慣性聚合 高效追蹤和閱讀你感興趣的部落格、新聞、科技資訊
閱讀原文 在慣性聚合中打開

推薦訂閱源

小众软件
小众软件
博客园 - 叶小钗
有赞技术团队
有赞技术团队
大猫的无限游戏
大猫的无限游戏
博客园_首页
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
L
LangChain Blog
Hugging Face - Blog
Hugging Face - Blog
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
aimingoo的专栏
aimingoo的专栏
Blog — PlanetScale
Blog — PlanetScale
爱范儿
爱范儿
T
Tailwind CSS Blog
Jina AI
Jina AI
量子位
Stack Overflow Blog
Stack Overflow Blog
人人都是产品经理
人人都是产品经理
J
Java Code Geeks
V
Visual Studio Blog
月光博客
月光博客

博客园 - ®Geovin Du Dream Park™

go: Coroutines Pattern python:Coroutines Pattern go: Broadcast Pattern python: Broadcast Pattern python: Bounded Parallelism Pattern go: Bounded Parallelism Pattern go: N-Barrier Pattern python: Semaphore Pattern go: Semaphore Pattern python: Read-Write Lock Pattern go: Read-Write Lock Pattern python: Monitor Pattern go: Monitor Pattern python: Mutex Pattern go: Lock/Mutex Pattern Python: Condition Variable Pattern go:Condition Variable Pattern python: Registry Pattern python: Interpreter Pattern go: Interpreter Pattern go: Registry Pattern go: Memento Pattern go: Command Pattern go: State Pattern - ®Geovin Du Dream Park™ go:Template Method Pattern go: Strategy Pattern go: Visitor Pattern go: Observer Pattern go: Mediator Pattern go: Iterator Pattern go: Chain of Responsibility Pattern go: Proxy Pattern go:Decorator Pattern go: Facade Pattern go: Flyweight Pattern go: Composite Pattern go: Singleton Pattern go: Prototype Pattern go: Bridge Pattern go: Adapter Pattern go: Builder Pattern 密码进行加盐哈希 using CSharp,Python,Go,Java go: Abstract Factory Pattern go: Model,Interface,DAL ,Factory,BLL using mysql go: Simple Factory Pattern - ®Geovin Du Dream Park™ go: Factory Method Pattern go: goLang 在Windows环境搭建Go语言开发环境 CSharp: Parallel Extensions cpp: class python: 蝴蝶跟随鼠标
python: N-Barrier Pattern
®Geovin Du D · 2026-05-27 · via 博客园 - ®Geovin Du Dream Park™

項目結構:

5d6a937c-3790-4a4d-b647-c667214a8f90

# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:03 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : structured_logger.py

import json
from datetime import datetime


class StructuredLogger:
    """
    結構化日誌適配器
    """

    @staticmethod
    def info(msg: str, **kwargs):
        """

        :param msg:
        :param kwargs:
        :return:
        """
        timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
        log_entry = {"level": "info", "time": timestamp, "msg": msg, **kwargs}
        print(json.dumps(log_entry, ensure_ascii=False))

    def error(self, msg: str, **kwargs):
        timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
        log_entry = {"level": "error", "time": timestamp, "msg": msg, **kwargs}
        print(json.dumps(log_entry, ensure_ascii=False))

    def warning(self, msg: str, **kwargs):
        timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
        log_entry = {"level": "warning", "time": timestamp, "msg": msg, **kwargs}
        print(json.dumps(log_entry, ensure_ascii=False))

    def alert(self, msg: str, **kwargs):
        timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
        log_entry = {"level": "alert", "time": timestamp, "msg": msg, **kwargs}
        print(json.dumps(log_entry, ensure_ascii=False))


    @staticmethod
    def separator():
        print("========================================")





# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 20:52 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : registry.py

from typing import Dict, Callable, Optional
from dataclasses import dataclass
import time


@dataclass
class StepDefinition:
    """
    步驟元數據定義(支持動態註冊)
    """
    name: str
    duration: float
    executor: Callable  # 步驟執行函數
    stage: str  # 所屬階段(生產/入庫/發貨)


class WorkflowRegistry:
    """
    工作流注冊中心(單例模式)
    """
    _instance = None
    _steps: Dict[str, StepDefinition] = {}
    _stage_steps: Dict[str, list] = {"production": [], "inventory": [], "shipping": []}

    def __new__(cls):
        """

        """
        if cls._instance is None:
            cls._instance = super(WorkflowRegistry, cls).__new__(cls)
        return cls._instance

    def register_step(self, step: StepDefinition) -> None:
        """
        動態註冊新步驟
        :param step:
        :return:
        """
        self._steps[step.name] = step
        self._stage_steps[step.stage].append(step)
        print(f"[註冊中心] 已註冊步驟: {step.name} ({step.stage}) | 耗時: {step.duration}s")

    def get_stage_steps(self, stage: str) -> list:
        """
        獲取階段所有步驟(含動態新增)
        :param stage:
        :return:
        """
        return self._stage_steps.get(stage, [])

    def get_step_count(self, stage: str) -> int:
        """
        實時計算階段步驟數量(支持動態擴展)
        :param stage:
        :return:
        """
        return len(self._stage_steps.get(stage, []))

    # === DYNAMIC STEP ENHANCEMENT ===
    @classmethod
    def init_default_steps(cls):
        """
        初始化默認步驟(保留原有流程)
        :return:
        """
        registry = cls()
        # 生產階段
        registry.register_step(StepDefinition("款式設計", 3.0, lambda ctx: time.sleep(3.0), "production"))
        registry.register_step(StepDefinition("原料採購", 2.0, lambda ctx: time.sleep(2.0), "production"))
        registry.register_step(StepDefinition("工藝加工", 4.0, lambda ctx: time.sleep(4.0), "production"))
        registry.register_step(StepDefinition("品質質檢", 2.0, lambda ctx: time.sleep(2.0), "production"))
        registry.register_step(StepDefinition("成品包裝", 1.0, lambda ctx: time.sleep(1.0), "production"))

        # 入庫階段
        registry.register_step(StepDefinition("檔案歸檔", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
        registry.register_step(StepDefinition("庫存更新", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
        registry.register_step(StepDefinition("貼標入倉", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
        registry.register_step(StepDefinition("入庫核驗", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
        registry.register_step(StepDefinition("成品拍照", 1.0, lambda ctx: time.sleep(1.0), "inventory"))

        # 發貨階段
        registry.register_step(StepDefinition("批量發貨", 2.0, lambda ctx: time.sleep(2.0), "shipping"))
        return registry



# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 20:56 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : settings.py

class WorkflowConfig:
    """工作流全局配置"""
    PRODUCTION_STEPS_COUNT = 5
    INVENTORY_STEPS_COUNT = 5
    SHIPPING_STEPS_COUNT = 1

    # 模擬耗時(秒)
    STEP_DURATIONS = {
        "成品包裝": 1.0, "原料採購": 2.0, "款式設計": 3.0,
        "工藝加工": 4.0, "品質質檢": 2.0, "檔案歸檔": 1.0,
        "庫存更新": 1.0, "貼標入倉": 1.0, "入庫核驗": 1.0,
        "成品拍照": 1.0, "批量發貨": 2.0
    }

    RETRY_POLICY = {
        "strategy": "指數退避",
        "max_retries": 2
    }
# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 20:54 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : dynamic_barrier.py

import threading
import time
from typing import Dict, List, Set
from NBarrierPattern.domain.exceptions import BarrierTimeoutError


class DynamicNBarrier:
    """
    支持動態參與者的彈性屏障
    """

    def __init__(self, barrier_id: str, timeout: float = 30.0):
        self.barrier_id = barrier_id
        self.timeout = timeout
        self._lock = threading.RLock()
        self._condition = threading.Condition(self._lock)
        self._participants: Dict[str, bool] = {}  # worker_id -> is_arrived
        self._completion_time = None
        self._reset_flag = False

    def register(self, worker_id: str) -> None:
        """
        動態註冊新參與者(運行時可調用)
        :param worker_id:
        :return:
        """
        with self._lock:
            if worker_id not in self._participants:
                self._participants[worker_id] = False
                print(f"[屏障] {self.barrier_id} 新註冊參與者: {worker_id}")

    def wait(self, worker_id: str, context: dict) -> dict:
        """
        帶超時控制的等待(支持動態參與者)
        :param worker_id:
        :param context:
        :return:
        """
        start_time = time.time()

        with self._condition:
            # 標記到達
            if worker_id not in self._participants:
                raise ValueError(f"參與者 {worker_id} 未註冊到屏障 {self.barrier_id}")
            self._participants[worker_id] = True

            # 檢查是否全部到達
            while not all(self._participants.values()) and not self._reset_flag:
                elapsed = time.time() - start_time
                if elapsed > self.timeout:
                    raise BarrierTimeoutError(
                        self.barrier_id,
                        [w for w, arrived in self._participants.items() if not arrived]
                    )
                self._condition.wait(self.timeout - elapsed)

            # 記錄完成時間
            self._completion_time = time.time() - start_time
            return {
                "barrier_id": self.barrier_id,
                "synchronized_workers": len(self._participants),
                "active_workers": list(self._participants.keys()),
                "completion_time": round(self._completion_time, 4),
                **context
            }

    def reset(self) -> None:
        """
        重置屏障狀態(供新批次流程使用)
        :return:
        """
        with self._condition:
            self._participants = {w: False for w in self._participants}
            self._reset_flag = True
            self._condition.notify_all()
            self._reset_flag = False

    def remove(self, worker_id: str) -> None:
        """
        移除參與者(動態取消註冊)
        :param worker_id:
        :return:
        """
        with self._condition:
            if worker_id in self._participants:
                del self._participants[worker_id]
                print(f"[屏障] {self.barrier_id} 移除參與者: {worker_id}")
                self._condition.notify_all()

    def get_status(self) -> dict:
        """
        實時獲取屏障狀態(用於監控)
        :return:
        """
        with self._lock:
            return {
                "barrier_id": self.barrier_id,
                "total_participants": len(self._participants),
                "arrived_count": sum(1 for arrived in self._participants.values() if arrived),
                "pending_workers": [w for w, arrived in self._participants.items() if not arrived]
            }



# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:00 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : n_barrier.py


from threading import Barrier as ThreadBarrier, BrokenBarrierError
import time
from NBarrierPattern.domain.exceptions import BarrierTimeoutError
import threading
import time
from typing import Dict, List, Set


class NBarrier:
    """
    企業級N-Barrier實現
    """

    def __init__(self, parties: int, barrier_id: str, timeout: float = 30.0):
        """

        :param parties:
        :param barrier_id:
        :param timeout:
        """
        self._parties = parties
        self._timeout = timeout
        self._barrier = ThreadBarrier(parties, timeout=timeout)
        self.barrier_id = barrier_id
        self.participants: Dict[str, bool] = {}
        self._completion_time = None
        self._lock = threading.RLock()

    def _reset_barrier(self):
        """
        重置屏障(當屏障損壞時調用)
        :return:
        """
        with self._lock:
            self._parties = len(self.participants) if self.participants else self._parties
            self._barrier = ThreadBarrier(self._parties, timeout=self._timeout)
            self.participants = {w: False for w in self.participants}

    def register(self, worker_id: str) -> None:
        """
        註冊參與方
        :param worker_id:
        :return:
        """
        with self._lock:
            if worker_id not in self.participants:
                self.participants[worker_id] = False
                # 動態調整屏障大小
                if len(self.participants) > self._parties:
                    self._parties = len(self.participants)
                    self._barrier = ThreadBarrier(self._parties, timeout=self._timeout)

    def wait(self, worker_id: str, context: dict) -> dict:
        """
        帶上下文的等待與審計記錄生成
        :param worker_id:
        :param context:
        :return:
        """
        try:
            self.participants[worker_id] = True
            start = time.time()
            self._barrier.wait()
            self._completion_time = time.time() - start

            return {
                "barrier_id": self.barrier_id,
                "synchronized_workers": len(self.participants),
                "active_workers": [w for w, s in self.participants.items() if s],
                "completion_time": round(self._completion_time, 4),
                **context
            }
        except BrokenBarrierError:
            self._reset_barrier()
            raise BarrierTimeoutError(
                self.barrier_id,
                [w for w, s in self.participants.items() if not s]
            )


# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:54 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : workflow_engine.py
from NBarrierPattern.config.registry import WorkflowRegistry
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
from NBarrierPattern.services.stage_service import StageService
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
from NBarrierPattern.domain.events import QualityPassedEvent, ArchivingCompletedEvent, EventBus
from NBarrierPattern.services.shipping.service import ShippingServiceWithRetry

class WorkflowOrchestrator(object):
    """
    全流程自動化編排引擎
    """

    def __init__(self, registry: WorkflowRegistry, logger: StructuredLogger):
        """

        :param registry:
        :param logger:
        """
        self.registry = registry
        self.logger = logger
        self.context = WorkflowContext()
        self.shipping_service = ShippingServiceWithRetry(logger)
        # 初始化所有階段的屏障和服務
        self.services = {
            "production": self._build_stage_service("production"),
            "inventory": self._build_stage_service("inventory"),
            "shipping": self._build_stage_service("shipping")
        }

        # 訂閱事件,實現全自動觸發
        EventBus.subscribe(ArchivingCompletedEvent, self.on_archiving_completed)

    def _build_stage_service(self, stage_name: str) -> StageService:
        """

        :param stage_name:
        :return:
        """
        barrier = DynamicNBarrier(f"{stage_name.upper()}_BARRIER")
        return StageService(stage_name, self.registry, barrier, self.logger)

    def start_production(self):
        """
        手動或外部API觸發生產流程的起點
        :return:
        """
        self.logger.info("=== 🏭 階段1:生產流程啟動 ===")
        self.services["production"].execute(self.context)

    def on_archiving_completed(self, event: ArchivingCompletedEvent):
        """
        監聽歸檔完成事件,自動觸發後續物理流轉
        :param event:
        :return:
        """
        self.logger.separator()
        self.logger.info("🔗 接收到歸檔完成信號,準備跨入物理流轉階段...",
                         workflow_id=event.workflow_id,
                         archive_id=event.archive_id)

        # 1. 自動觸發入庫流程
        self.logger.info("=== 📦 階段2:自動觸發入庫流程 ===")
        self.services["inventory"].execute(self.context)

        # 2. 自動觸發送貨流程
        self.logger.info("=== 🚚 階段3:自動觸發送貨流程 ===")
        self.services["shipping"].execute(self.context)

        # 3. 輸出最終全鏈路閉環結果
        self.logger.separator()
        self.logger.info("🎉 珠寶全生命週期管理流程完美閉環!",
                         workflow_id=self.context.workflow_id,
                         final_status="已發貨",
                         archive_id=event.archive_id)

        # 觸發帶重試保護的發貨流程
        self.logger.info("=== 🚚 階段3:啟動智能發貨流程 ===")
        self.shipping_service.execute_shipping_with_retry(self.context)

        # 根據最終狀態輸出結果
        if self.context.shipping_status.name == 'SUCCESS':
            self.logger.info("🎉 全鏈路閉環完成!")
        else:
            self.logger.alert("⚠️ 流程異常終止,請檢查工單系統",
                              final_state=self.context.shipping_status.value)
# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:43 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : events.py

from dataclasses import dataclass, field
from typing import List, Callable
import uuid
from datetime import datetime
from threading import Thread

@dataclass
class QualityPassedEvent:
    """
    質檢通過領域事件
    """
    workflow_id: str
    product_sku: str
    passed_at: str = field(default_factory=lambda: datetime.now().isoformat())
    metadata: dict = field(default_factory=dict)

@dataclass
class ArchivingCompletedEvent:
    """
    歸檔完成領域事件(觸發發貨的開關)
    """
    workflow_id: str
    archive_id: str
    storage_zone: str
    archived_at: str = field(default_factory=lambda: datetime.now().isoformat())

class EventBus:
    """
    輕量級內存事件總線
    """
    _subscribers: dict = {}

    @classmethod
    def subscribe(cls, event_type: type, handler: Callable):
        if event_type not in cls._subscribers:
            cls._subscribers[event_type] = []
        cls._subscribers[event_type].append(handler)

    @classmethod
    def publish(cls, event):
        event_type = type(event)
        if event_type in cls._subscribers:
            for handler in cls._subscribers[event_type]:
                Thread(target=handler, args=(event,), daemon=True).start()



# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 20:58 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : exceptions.py

class BarrierTimeoutError(Exception):
    """
    屏障超時異常
    """
    def __init__(self, barrier_id: str, pending_workers: list):
        """

        :param barrier_id:
        :param pending_workers:
        """
        self.barrier_id = barrier_id
        self.pending_workers = pending_workers
        super().__init__(f"屏障{barrier_id}超時,未到達節點:{pending_workers}")



# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 20:57 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : models.py

from dataclasses import dataclass, field
from typing import Dict, Any
import uuid
from datetime import datetime
from enum import Enum
from NBarrierPattern.domain.QualityStatus import QualityStatus


class ShippingStatus(Enum):
    PENDING = "待發貨"
    DISPATCHING = "下發指令中"
    SUCCESS = "已發貨"
    FAILED_RECOVERABLE = "失敗(可重試)"  # 如WMS接口超時
    FAILED_PERMANENT = "失敗(需人工介入)" # 如參數校驗不通過


@dataclass
class WorkflowContext:
    """
    工作流上下文載體
    """
    workflow_id: str = field(default_factory=lambda: f"JW-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}")
    quality_status: QualityStatus = QualityStatus.PENDING
    shipping_status: ShippingStatus = ShippingStatus.PENDING
    rework_count: int = 0  # 記錄返工次數,防止死循環
    max_rework_attempts: int = 3
    metadata: Dict[str, Any] = field(default_factory=dict)

    def mark_as_failed(self):
        """

        :return:
        """
        if self.rework_count >= self.max_rework_attempts:
            raise RuntimeError(f"流程{self.workflow_id}返工次數已達上限({self.max_rework_attempts}),強制終止!")
        QualityStatus.quality_status = QualityStatus.FAILED
        self.rework_count += 1

    def mark_as_reworked(self):
        """

        :return:
        """
        QualityStatus.quality_status = QualityStatus.REWORKED


# 自定義業務異常體系
class BaseShippingException(Exception):
    """

    """
    def __init__(self, message: str, is_recoverable: bool):
        super().__init__(message)
        self.is_recoverable = is_recoverable

class WmsTimeoutError(BaseShippingException):
    """
    倉庫系統連接超時(可恢復)
    """
    def __init__(self):
        super().__init__("調用WMS接口超時", True)

class InvalidAddressError(BaseShippingException):
    """
    收貨地址非法(不可恢復)
    """
    def __init__(self, address: str):
        super().__init__(f"收貨地址無效: {address}", False)


# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:29 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : QualityStatus.py

from enum import Enum


class QualityStatus(Enum):
    """質量檢驗狀態"""
    PENDING = "待質檢"
    PASSED = "合格"
    FAILED = "不合格(需返工)"
    REWORKED = "已返工"
# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:06 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : BaseStageService.py

from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
import time


class BaseStageService:
    """
    階段基類
    """

    def __init__(self, logger: StructuredLogger, barrier: NBarrier, stage_name: str, steps_config: list):
        """

        :param logger:
        :param barrier:
        :param stage_name:
        :param steps_config:
        """
        self.logger = logger
        self.barrier = barrier
        self.stage_name = stage_name
        self.steps_config = steps_config

    def _execute_step(self, step_name: str, duration: float, worker_id: str):
        """
        隔離的步驟執行環境
        :param step_name:
        :param duration:
        :param worker_id:
        :return:
        """
        self.logger.info("開始{}步驟".format(self.stage_name), step=step_name, worker_id=worker_id)
        time.sleep(duration)

        metrics = {"cost": round(duration + (duration * 0.0001), 7), "retry": 0}
        self.logger.info("步驟完成", stage=self.stage_name, step=step_name, worker_id=worker_id, **metrics)

        barrier_result = self.barrier.wait(worker_id, metrics)
        self.logger.info("=== 【{}屏障通過】 ===".format(self.stage_name), **barrier_result)

    def execute(self, context: WorkflowContext):
        """

        :param context:
        :return:
        """
        threads = []
        for step_name, duration in self.steps_config.items():
            worker_id = f"{self.stage_name.upper()[:4]}-{list(self.steps_config.keys()).index(step_name) + 1}"
            self.barrier.register(worker_id)
            t = Thread(target=self._execute_step, args=(step_name, duration, worker_id))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()
        return context



# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:22 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : stage_service.py

from threading import Thread
from NBarrierPattern.config.registry import WorkflowRegistry, StepDefinition
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
import time
from typing import Dict, List, Set

class StageService:
    """
    支持動態步驟的階段服務
    """

    def __init__(
            self,
            stage_name: str,
            registry: WorkflowRegistry,
            barrier: DynamicNBarrier,
            logger: StructuredLogger
    ):
        self.stage_name = stage_name
        self.registry = registry
        self.barrier = barrier
        self.logger = logger
        self._threads: List[Thread] = []
        self._worker_id_counter = 0  # 用於生成唯一worker_id

    def add_step(self, step: StepDefinition, context=None) -> None:
        """
        運行時動態添加新步驟
        :param step:
        :param context:
        :return:
        """
        if step.stage != self.stage_name:
            raise ValueError(f"步驟 {step.name} 不屬於 {self.stage_name} 階段")

        self.registry.register_step(step)
        # === DYNAMIC STEP ENHANCEMENT ===
        # 關鍵:動態註冊新參與者到屏障
        worker_id = self._generate_worker_id(step.name)
        self.barrier.register(worker_id)
        try:
            step.executor(context)  # 執行,傳入context
            self.barrier.wait(worker_id, {})  # 執行完等待匯合,傳入空context
        finally:
            self.barrier.remove(worker_id)  # 離開時移除

        self.logger.info("動態添加新步驟",
                         stage=self.stage_name,
                         step=step.name,
                         worker_id=worker_id,
                         new_barrier_size=self.barrier.get_status()["total_participants"])

    def _generate_worker_id(self, step_name: str) -> str:
        """
        生成唯一工作ID(階段縮寫+序號)
        :param step_name:
        :return:
        """
        self._worker_id_counter += 1
        prefix = self.stage_name[:4].upper()
        return f"{prefix}-{self._worker_id_counter}"

    def _execute_step(self, step: StepDefinition, worker_id: str, context):
        """
        執行步驟(帶錯誤重試)
        :param step:
        :param worker_id:
        :param context:
        :return:
        """
        start_time = time.time()
        try:
            self.logger.info("開始步驟", stage=self.stage_name, step=step.name, worker_id=worker_id)
            step.executor(context)  # 執行實際業務邏輯,傳入context

            duration = time.time() - start_time
            metrics = {
                "cost": round(duration, 7),
                "retry": 0  # 後續可擴展重試邏輯
            }
            self.logger.info("步驟完成", stage=self.stage_name, step=step.name, **metrics)

            # 通過屏障同步
            barrier_result = self.barrier.wait(worker_id, metrics)
            self.logger.info("=== 【{}屏障通過】 ===".format(self.stage_name), **barrier_result)

        except Exception as e:
            self.logger.error("步驟失敗", step=step.name, error=str(e))
            raise

    def execute(self, context: WorkflowContext) -> WorkflowContext:
        """
        執行當前階段所有步驟(含動態新增)
        :param context:
        :return:
        """
        steps = self.registry.get_stage_steps(self.stage_name)
        self.logger.info("階段啟動",
                         stage=self.stage_name,
                         total_steps=len(steps),
                         steps=[s.name for s in steps])

        # 啟動所有步驟線程
        for step in steps:
            worker_id = self._generate_worker_id(step.name)
            # 關鍵:在線程啟動前註冊參與者到屏障
            self.barrier.register(worker_id)
            thread = Thread(
                target=self._execute_step,
                args=(step, worker_id, context),
                name=f"{self.stage_name}-{step.name}"
            )
            self._threads.append(thread)
            thread.start()

        # 等待全部完成
        for t in self._threads:
            t.join()

        return context



# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:44 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : service.py

import json
import time
import uuid
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.events import QualityPassedEvent, ArchivingCompletedEvent, EventBus


class ArchivingService:
    """
    自動化歸檔服務"
    """

    def __init__(self, logger: StructuredLogger):
        """

        :param logger:
        """
        self.logger = logger

    def handle_quality_passed(self, event: QualityPassedEvent):
        """
        監聽質檢通過事件,執行自動歸檔
        :param event:
        :return:
        """
        self.logger.separator()
        self.logger.info("📥 觸發自動歸檔流水線", workflow_id=event.workflow_id, sku=event.product_sku)

        # 模擬耗時較長的檔案處理操作
        time.sleep(0.5)

        # 1. 生成唯一的數字資產檔案ID
        archive_id = f"ARC-{event.workflow_id.split('-')[-1]}-{uuid.uuid4().hex[:8].upper()}"

        # 2. 構建標準化檔案數據
        digital_archive = {
            "archive_id": archive_id,
            "workflow_id": event.workflow_id,
            "product_sku": event.product_sku,
            "quality_certified_at": event.passed_at,
            "digital_twin_status": "ACTIVE",
            "storage_zone": "A-03-GOLD"  # 模擬分配倉庫貨位
        }

        # 3. 持久化到數據庫 / 區塊鏈存證 (此處用日誌模擬)
        self.logger.info("✅ 數字檔案創建成功", **digital_archive)
        # === 核心改動:發佈歸檔完成事件,通知下游系統 ===
        archiving_done_event = ArchivingCompletedEvent(
            workflow_id=event.workflow_id,
            archive_id=archive_id,
            storage_zone=digital_archive["storage_zone"]
        )
        EventBus.publish(archiving_done_event)

        # 4. 通知下游系統(如ERP或WMS)更新庫存狀態
        self._sync_to_warehouse_system(digital_archive)

        return archive_id

    def _sync_to_warehouse_system(self, archive_data: dict):
        """
        模擬對接外部倉儲系統的API
        :param archive_data:
        :return:
        """
        self.logger.info("【API調用】向WMS系統推送入庫預佔指令",
                         target="WMS_CORE",
                         payload=json.dumps(archive_data))


# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:05 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : service.py
from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
import time
from NBarrierPattern.services.BaseStageService import BaseStageService


class InventoryService(BaseStageService):
    """

    """
    def __init__(self, logger: StructuredLogger, barrier: NBarrier):
        """

        :param logger:
        :param barrier:
        """
        inv_steps = {k: v for k, v in WorkflowConfig.STEP_DURATIONS.items() if k in ["檔案歸檔", "庫存更新", "貼標入倉", "入庫核驗", "成品拍照"]}
        super().__init__(logger, barrier, "入庫", inv_steps)


# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:05 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : service.py
from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext, QualityStatus
import time
from NBarrierPattern.services.BaseStageService import BaseStageService
from threading import Thread
from NBarrierPattern.config.registry import WorkflowRegistry
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier


class ProductionService(BaseStageService):
    """

    """
    def __init__(self, logger: StructuredLogger, barrier: NBarrier):
        """

        :param logger:
        :param barrier:
        """
        super().__init__(logger, barrier, "生產", WorkflowConfig.STEP_DURATIONS)


class ProductionServiceWithRework:
    """
    具備自動返工能力的生產階段服務
    """

    def __init__(self, stage_name: str, registry: WorkflowRegistry, barrier: DynamicNBarrier, logger: StructuredLogger):
        self.stage_name = stage_name
        self.registry = registry
        self.barrier = barrier
        self.logger = logger

    def _execute_single_step(self, step_def, worker_id, context: WorkflowContext):
        """
        單步執行器(包含異常冒泡)
        :param step_def:
        :param worker_id:
        :param context:
        :return:
        """
        try:
            self.logger.info("開始步驟", stage=self.stage_name, step=step_def.name, worker_id=worker_id)
            step_def.executor(context)  # 傳入context以便質檢步驟修改狀態

            duration = 1.0  # 模擬耗時
            metrics = {"cost": round(duration, 7), "retry": context.rework_count}
            self.logger.info("步驟完成", stage=self.stage_name, step=step_def.name, **metrics)

            # 只有非質檢步驟才需要通過屏障,質檢步驟單獨處理
            if step_def.name != "品質質檢":
                barrier_result = self.barrier.wait(worker_id, metrics)
                self.logger.info("=== 【{}屏障通過】 ===".format(self.stage_name), **barrier_result)

        except Exception as e:
            self.logger.error("步驟執行異常", step=step_def.name, error=str(e))
            raise

    def execute_with_rework_loop(self, context: WorkflowContext) -> WorkflowContext:
        """
        帶自動返工閉環的主執行方法
        :param context:
        :return:
        """

        while True:
            steps = self.registry.get_stage_steps(self.stage_name)
            threads = []

            # 如果是返工批次,需要重置屏障狀態,讓所有工人重新就位
            if context.quality_status == QualityStatus.FAILED:
                self.logger.separator()
                self.logger.warning("⚠️ 觸發自動返工機制",
                                    workflow_id=context.workflow_id,
                                    current_attempt=context.rework_count)
                self.barrier.reset()  # 關鍵:重置屏障,準備迎接新一輪同步

                # 模擬返工前的物理操作(如退回生產線)
                self._trigger_physical_rework_action()

            # 啟動本輪次的所有線程
            for step in steps:
                worker_id = f"{self.stage_name[:4].upper()}-{steps.index(step) + 1}"
                # 動態註冊確保屏障參與者對齊
                if worker_id not in self.barrier._participants:
                    self.barrier.register(worker_id)

                t = Thread(target=self._execute_single_step, args=(step, worker_id, context))
                threads.append(t)
                t.start()

            # 等待本輪次全部完成
            for t in threads:
                t.join()

            # === 核心判斷:檢查質檢結果 ===
            if context.quality_status == QualityStatus.PASSED or context.quality_status == QualityStatus.REWORKED:
                self.logger.info("✅ 質檢通過,進入下一階段", status=context.quality_status.value)
                break  # 跳出循環,進入入庫階段
            elif context.quality_status == QualityStatus.FAILED:
                self.logger.warning("❌ 質檢未通過,將重新執行生產流程...")
                continue  # 繼續while循環,觸發返工
            else:
                raise RuntimeError("未知的質檢狀態")

        return context

    def _trigger_physical_rework_action(self):
        """模擬物理世界的返工操作"""
        self.logger.info("【系統指令】下發返工單至車間終端...", action="rework_dispatch")
        time.sleep(0.5)
        self.logger.info("【系統指令】原料重新上料完成", action="material_reload")


# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:05 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : service.py
from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext, ShippingStatus, WmsTimeoutError, InvalidAddressError,BaseShippingException
import time
from NBarrierPattern.services.BaseStageService import BaseStageService
import random



class ShippingService(BaseStageService):
    """

    """
    def __init__(self, logger: StructuredLogger, barrier: NBarrier):
        """

        :param logger:
        :param barrier:
        """
        self.logger = logger
        self.max_retries = 3  # 最大重試次數
        self.base_delay = 1.0  # 基礎等待時間(秒)

        ship_steps = {"批量發貨": WorkflowConfig.STEP_DURATIONS["批量發貨"]}
        super().__init__(logger, barrier, "發貨", ship_steps)


class ShippingServiceWithRetry:
    """

    """
    def __init__(self, logger: StructuredLogger):
        self.logger = logger
        self.max_retries = 3  # 最大重試次數
        self.base_delay = 1.0  # 基礎等待時間(秒)

    def _call_wms_api(self, context: WorkflowContext):
        """
        模擬調用外部WMS/物流系統API
        :param context:
        :return:
        """
        # 模擬:前兩次調用拋出超時異常,第三次成功
        if context.metadata.get("wms_attempt", 0) < 2:
            context.metadata["wms_attempt"] = context.metadata.get("wms_attempt", 0) + 1
            raise WmsTimeoutError()

        self.logger.info("【WMS接口】物流單號生成成功", tracking_no="SF123456789")
        return {"tracking_no": "SF123456789"}

    def execute_shipping_with_retry(self, context: WorkflowContext):
        """
        帶自動重試機制的發貨執行器
        :param context:
        :return:
        """
        attempt = 0

        while attempt <= self.max_retries:
            try:
                attempt += 1
                self.logger.info(f"🚚 發起發貨請求 (第{attempt}次嘗試)", workflow_id=context.workflow_id)

                # === 核心業務動作 ===
                result = self._call_wms_api(context)

                # 成功則更新狀態並退出循環
                context.metadata.update(result)
                ShippingStatus.shipping_status = ShippingStatus.SUCCESS
                self.logger.info("✅ 發貨流程最終完成", status=ShippingStatus.SUCCESS.value)
                return

            except BaseShippingException as e:
                if not e.is_recoverable:
                    # 遇到不可恢復異常(如地址錯誤),直接終止,記錄日誌等待人工處理
                    ShippingStatus.shipping_status = ShippingStatus.FAILED_PERMANENT
                    self.logger.error("❌ 發生不可恢復錯誤,停止重試", error=str(e))
                    return

                # 遇到可恢復異常,判斷是否還有重試機會
                if attempt >= self.max_retries:
                    ShippingStatus.shipping_status = ShippingStatus.FAILED_RECOVERABLE
                    self.logger.warning("⚠️ 達到最大重試次數,轉入死信隊列/人工干預",
                                        workflow_id=context.workflow_id)
                    return

                # === 指數退避 + 隨機抖動 (Jitter) ===
                # 公式:delay = base_delay * (2 ^ (attempt - 1)) + random_jitter
                delay = self.base_delay * (2 ** (attempt - 1))
                jitter = random.uniform(0, 0.5)  # 增加隨機性,防止多客戶端同步重試導致雪崩
                total_wait = delay + jitter

                self.logger.warning(f"⏳ 檢測到瞬時故障,將在 {total_wait:.2f}s 後重試...",
                                    next_attempt=attempt + 1, reason=str(e))
                time.sleep(total_wait)

調用:

# encoding: utf-8 
# 版權所有  2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author    : geovindu,Geovin Du 塗聚文.
# IDE       : PyCharm 2024.3.6 python 3.11
# os        : windows 10
# database  : mysql 9.0 sql server 2019, postgreSQL 17.0  Oracle 21c Neo4j
# Datetime  : 2026/5/27 21:13 
# User      :  geovindu
# Product   : PyCharm
# Project   : pydesginpattern
# File      : NBarrierBll.py

from NBarrierPattern.config.registry import WorkflowRegistry,StepDefinition
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.services.production.service import ProductionService,ProductionServiceWithRework
from NBarrierPattern.services.inventory.service import InventoryService
from NBarrierPattern.services.shipping.service import ShippingService
from NBarrierPattern.domain.models import WorkflowContext, QualityStatus
from NBarrierPattern.services.stage_service import StageService
from threading import Thread
import time
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
import random
from NBarrierPattern.domain.events import EventBus, QualityPassedEvent
from NBarrierPattern.services.archiving.service import ArchivingService
from NBarrierPattern.core.orchestrator.workflow_engine import WorkflowOrchestrator


class NBarrierBll(object):
    """

    """

    @staticmethod
    def run_basic_flow():
        """
        基礎流程:無返工/無歸檔
        :return:
        """
        logger = StructuredLogger()
        logger.title("基礎生產流程演示(無返工)")

        # 1. 初始化註冊表
        registry = WorkflowRegistry.init_default_steps()

        # 2. 綁定基礎質檢邏輯
        for step in registry.get_steps("production"):
            if step.name == "品質質檢":
                step.executor = NBarrierBll.quality_always_pass

        # 3. 啟動編排器
        orchestrator = WorkflowOrchestrator(registry, logger)
        orchestrator.start_production()

    @staticmethod
    def run_archiving_flow():
        """
        歸檔流程:質檢觸發歸檔事件
        :return:
        """
        logger = StructuredLogger()
        logger.title("歸檔生產流程演示(質檢觸發歸檔)")

        # 1. 初始化註冊表
        registry = WorkflowRegistry.init_default_steps()

        # 2. 綁定歸檔版質檢
        for step in registry.get_steps("production"):
            if step.name == "品質質檢":
                step.executor = NBarrierBll.quality_with_archiving

        # 3. 初始化歸檔服務(自動訂閱事件)
        ArchivingService(logger)

        # 4. 啟動編排器
        orchestrator = WorkflowOrchestrator(registry, logger)
        orchestrator.start_production()

    @staticmethod
    def run_rework_flow():
        """
        返工流程:模擬質檢失敗重試
        :return:
        """
        logger = StructuredLogger()
        logger.title("返工生產流程演示(30%失敗率)")

        # 1. 初始化註冊表
        registry = WorkflowRegistry.init_default_steps()

        # 2. 綁定返工版質檢
        for step in registry.get_steps("production"):
            if step.name == "品質質檢":
                step.executor = NBarrierBll.quality_with_rework

        # 3. 初始化返工服務
        barrier = DynamicNBarrier("PRODUCTION_BARRIER")
        production = ProductionServiceWithRework(
            "production",
            registry,
            barrier,
            logger
        )

        # 4. 執行帶返工的生產階段
        context = WorkflowContext()
        production.execute_with_rework_loop(context)

        # 5. 繼續後續流程(入庫/發貨)
        inventory = StageService("inventory", registry, DynamicNBarrier("INVENTORY_BARRIER"), logger)
        shipping = StageService("shipping", registry, DynamicNBarrier("SHIPPING_BARRIER"), logger)

        inventory.execute(context)
        shipping.execute(context)

        logger.success("✅ 返工流程完整結束", workflow_id=context.workflow_id)



    @staticmethod
    def mock_design(context: WorkflowContext):
        """
        模擬款式設計
        :param context:
        :return:
        """
        print(f"[{context.workflow_id}] 🎨 款式設計完成")
        time.sleep(1)

    @staticmethod
    def mock_material_procurement(context: WorkflowContext):
        """
        模擬原料採購
        :param context:
        :return:
        """
        print(f"[{context.workflow_id}] 💎 原料採購完成")
        time.sleep(1)

    # --- 質檢邏輯變體 ---

    @staticmethod
    def mock_quality_inspection_always_pass(context: WorkflowContext):
        """
        模擬必定通過的質檢(用於基礎測試)
        """
        print("--- 正在進行精密儀器質檢 (基礎版) ---")
        time.sleep(1)
        QualityStatus.quality_status = QualityStatus.PASSED
        print("判定:合格 ")

    @staticmethod
    def mock_quality_inspection_with_archiving(context: WorkflowContext):
        """
        模擬質檢通過並觸發歸檔事件
        """
        print("--- 正在進行精密儀器質檢 (歸檔版) ---")
        time.sleep(1)
        QualityStatus.quality_status = QualityStatus.PASSED

        # 發佈事件
        event = QualityPassedEvent(
            workflow_id=context.workflow_id,
            product_sku="JW-RING-DIAMOND-001",
            metadata={"carat": 1.5, "clarity": "VVS1"}
        )
        EventBus.publish(event)
        print("判定:合格  (已觸發歸檔事件)")

    @staticmethod
    def mock_quality_inspection_with_rework(context: WorkflowContext):
        """
        模擬帶返工邏輯的質檢(前兩次失敗,第三次成功)
        注意:這裡假設 context 是可變的,或者你有辦法更新狀態
        """
        print("--- 正在進行精密儀器質檢 (返工版) ---")
        time.sleep(1)

        # 簡單的返工邏輯模擬
        if random.random() < 0.3:  # 30% 概率失敗
            print("質檢員發現瑕疵!判定:不合格 ")
            # 這裡應該拋出異常或設置標誌位,由外層循環捕獲
            # 假設 context 有一個屬性記錄狀態
            QualityStatus.quality_status = QualityStatus.FAILED
        else:
            print("複檢通過。判定:合格 ")
            QualityStatus.quality_status = QualityStatus.PASSED

    @staticmethod
    def mock_quality_inspection(context):
        """模擬必定通過的質檢"""
        print("--- 正在進行精密儀器質檢 ---")
        time.sleep(1.0)
        QualityStatus.quality_status = QualityStatus.PASSED

        # 發佈質檢通過事件,交給歸檔服務處理
        from NBarrierPattern.domain.events import QualityPassedEvent
        EventBus.publish(QualityPassedEvent(
            workflow_id=context.workflow_id,
            product_sku="JW-RING-DIAMOND-001",
            metadata={"carat": 1.5}
        ))

    # --- 演示用的 demo 方法(建議移至 main.py,但為了方便你測試先留著)---
    @staticmethod
    def demo_execution(self):
        """
        演示執行流程(修復版)
        注意:這裡只演示一種模式,推薦使用 Orchestrator 模式
        """
        from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
        from NBarrierPattern.config.registry import WorkflowRegistry
        from NBarrierPattern.core.orchestrator.workflow_engine import WorkflowOrchestrator

        logger = StructuredLogger()
        logger.info("=== 珠寶全流程生產系統啟動 ===")

        # 1. 初始化註冊中心
        registry = WorkflowRegistry.init_default_steps()

        # --- 場景1:演示基礎流程 ---
        logger.info("=== 場景1:基礎生產流程(無返工)===")

        # 綁定基礎質檢邏輯
        for step in registry.get_steps("production"):
            if step.name == "品質質檢":
                step.executor = self.mock_quality_inspection_always_pass

        # 初始化編排器並啟動
        orchestrator = WorkflowOrchestrator(registry, logger)
        # 注意:start_production 可能是異步的,如果需要阻塞等待,看你的 Orchestrator 實現
        orchestrator.start_production()

        # 簡單等待,實際應使用事件或 Future 等待完成
        time.sleep(10)

        # --- 場景2:演示返工流程 ---
        # 注意:這裡需要重新初始化 registry 或重置步驟,否則質檢邏輯還是上面的那個
        logger.info("=== 場景2:帶返工的生產流程 ===")

        registry2 = WorkflowRegistry.init_default_steps()
        for step in registry2.get_steps("production"):
            if step.name == "品質質檢":
                step.executor = self.mock_quality_inspection_with_rework

        # 假設你有一個專門處理返工的服務
        from NBarrierPattern.services.production.service import ProductionServiceWithRework
        from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier

        barrier = DynamicNBarrier("PROD_REWORK")
        service = ProductionServiceWithRework("production", registry2, barrier, logger)
        context = WorkflowContext()

        # 執行帶返工的流程
        service.execute_with_rework_loop(context)

        logger.info("所有演示場景完成")

    def demo(self):
        """

        :return:
        """

        logger = StructuredLogger()
        logger.info("=== 珠寶全流程生產系統啟動 ===")
        logger.info("配置加載完成",
                    生產步驟=WorkflowConfig.PRODUCTION_STEPS_COUNT,
                    入庫步驟=WorkflowConfig.INVENTORY_STEPS_COUNT,
                    重試策略=WorkflowConfig.RETRY_POLICY["strategy"])

        context = WorkflowContext()

        registry = WorkflowRegistry.init_default_steps()  # 初始化默認步驟
        # 綁定自定義質檢邏輯
        for step in registry._stage_steps["production"]:
            if step.name == "品質質檢":
                step.executor = NBarrierBll.mock_quality_inspection

                # 初始化歸檔服務(內部會自動訂閱質檢通過事件)
        ArchivingService(logger)

        # 初始化編排引擎(內部會自動訂閱歸檔完成事件)
        orchestrator = WorkflowOrchestrator(registry, logger)

        logger.info("=== 💎 珠寶全流程智能生產系統就緒 ===")

        # 僅需一行代碼觸發生產,後續所有環節全自動流轉
        orchestrator.start_production()

        # 初始化歸檔服務並註冊事件監聽器
        archiving_service = ArchivingService(logger)
        EventBus.subscribe(QualityPassedEvent, archiving_service.handle_quality_passed)
        # 替換原有的品質質檢執行器
        for step in registry._stage_steps["production"]:
            if step.name == "品質質檢":
                step.executor = NBarrierBll.mock_quality_inspection_with_archiving

        prod_barrier = DynamicNBarrier("PRODUCTION_BARRIER")
        production = StageService("production", registry, prod_barrier, logger)

        context = WorkflowContext()

        logger.info("=== 珠寶全流程生產系統啟動(含自動歸檔機制)===")

        # 執行生產階段(包含質檢和觸發的自動歸檔)
        production.execute(context)

        logger.separator()
        logger.info("🎉 質檢歸檔完成,準備進入物理入庫環節...", workflow_id=context.workflow_id)

        # 替換原有的品質質檢執行器為我們自定義的模擬函數
        for step in registry._stage_steps["production"]:
            if step.name == "品質質檢":
                step.executor = NBarrierBll.mock_quality_inspection

        prod_barrier = DynamicNBarrier("PRODUCTION_BARRIER")
        production = ProductionServiceWithRework("production", registry, prod_barrier, logger)


        # 創建屏障(數量由註冊中心動態決定)
        prod_barrier = DynamicNBarrier("PRODUCTION_BARRIER")
        inv_barrier = DynamicNBarrier("INVENTORY_BARRIER")
        ship_barrier = DynamicNBarrier("SHIPPING_BARRIER")

        # 創建階段服務
        production = StageService("production", registry, prod_barrier, logger)
        inventory = StageService("inventory", registry, inv_barrier, logger)
        shipping = StageService("shipping", registry, ship_barrier, logger)

        context = WorkflowContext()

        # === 動態步驟演示:在生產階段運行中插入VIP定製 ===
        def vip_engraving_task(context=None):
            logger.info("【VIP專屬】開始激光刻字", duration=1.5)
            time.sleep(1.5)
            logger.info("【VIP專屬】刻字完成", content="永恆之約")

        # 模擬生產階段執行到一半時動態添加步驟
        def simulate_dynamic_addition():
            time.sleep(4.0)  # 等待部分步驟完成
            logger.separator()
            logger.info("⚠️ 檢測到VIP訂單,動態插入定製步驟...")

            # 創建新步驟定義
            vip_step = StepDefinition(
                name="VIP定製刻字",
                duration=1.5,
                executor=vip_engraving_task,
                stage="production"  # 關鍵:必須指定所屬階段
            )
            production.add_step(vip_step)  # 動態註冊到生產階段

        # 啟動動態添加線程(模擬運行時事件)
        dynamic_thread = Thread(target=simulate_dynamic_addition, daemon=True)
        dynamic_thread.start()

        # 執行全流程
        logger.info("=== 珠寶全流程生產系統啟動 ===")
        logger.info("當前配置",
                    production_steps=registry.get_step_count("production"),
                    inventory_steps=registry.get_step_count("inventory"))

        logger.info("=== 階段1:生產流程 ===")
        production.execute(context)

        logger.info("=== 階段2:入庫流程 ===")
        inventory.execute(context)

        logger.info("=== 階段3:發貨流程 ===")
        shipping.execute(context)

        # 輸出最終屏障狀態
        logger.separator()
        logger.info("✅ 全流程完成",
                    workflow_id=context.workflow_id,
                    production_workers=prod_barrier.get_status()["total_participants"],
                    inventory_workers=inv_barrier.get_status()["total_participants"])

        # 1. 初始化屏障與服務
        prod_barrier = NBarrier(WorkflowConfig.PRODUCTION_STEPS_COUNT, "PRODUCTION_BARRIER")
        prod_service = ProductionService(logger, prod_barrier)

        inv_barrier = NBarrier(WorkflowConfig.INVENTORY_STEPS_COUNT, "INVENTORY_BARRIER")
        inv_service = InventoryService(logger, inv_barrier)

        ship_barrier = NBarrier(WorkflowConfig.SHIPPING_STEPS_COUNT, "SHIPPING_BARRIER")
        ship_service = ShippingService(logger, ship_barrier)

        # 2. 順序執行全流程(屏障保證內部併發安全)
        logger.info("=== 等待所有生產步驟完成 ===")
        prod_service.execute(context)

        logger.info("=== 等待所有入庫步驟完成 ===")
        inv_service.execute(context)

        logger.info("=== 等待發貨完成 ===")
        ship_service.execute(context)

        # 3. 彙總結果
        total_steps = WorkflowConfig.PRODUCTION_STEPS_COUNT + WorkflowConfig.INVENTORY_STEPS_COUNT + WorkflowConfig.SHIPPING_STEPS_COUNT
        logger.separator()
        logger.info("✅ 全流程完成", 總任務數=total_steps, 成功數=total_steps, 流程ID=context.workflow_id)
        logger.info("✅ 珠寶成品 → 生產 → 入庫 → 發貨 全流程完成")
        logger.separator()

輸出:

image

哲學管理(學)人生, 文學藝術生活, 自動(計算機學)物理(學)工作, 生物(學)化學逆境, 歷史(學)測繪(學)時間, 經濟(學)數學金錢(理財), 心理(學)醫學情緒, 詩詞美容情感, 美學建築(學)家園, 解構建構(分析)整合學習, 智商情商(IQ、EQ)運籌(學)生存.---Geovin Du(塗聚文)