












項目結構:

# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:03
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : structured_logger.py
import json
from datetime import datetime
class StructuredLogger:
"""
結構化日誌適配器
"""
@staticmethod
def info(msg: str, **kwargs):
"""
:param msg:
:param kwargs:
:return:
"""
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
log_entry = {"level": "info", "time": timestamp, "msg": msg, **kwargs}
print(json.dumps(log_entry, ensure_ascii=False))
def error(self, msg: str, **kwargs):
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
log_entry = {"level": "error", "time": timestamp, "msg": msg, **kwargs}
print(json.dumps(log_entry, ensure_ascii=False))
def warning(self, msg: str, **kwargs):
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
log_entry = {"level": "warning", "time": timestamp, "msg": msg, **kwargs}
print(json.dumps(log_entry, ensure_ascii=False))
def alert(self, msg: str, **kwargs):
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f+0800")[:-3]
log_entry = {"level": "alert", "time": timestamp, "msg": msg, **kwargs}
print(json.dumps(log_entry, ensure_ascii=False))
@staticmethod
def separator():
print("========================================")
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 20:52
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : registry.py
from typing import Dict, Callable, Optional
from dataclasses import dataclass
import time
@dataclass
class StepDefinition:
"""
步驟元數據定義(支持動態註冊)
"""
name: str
duration: float
executor: Callable # 步驟執行函數
stage: str # 所屬階段(生產/入庫/發貨)
class WorkflowRegistry:
"""
工作流注冊中心(單例模式)
"""
_instance = None
_steps: Dict[str, StepDefinition] = {}
_stage_steps: Dict[str, list] = {"production": [], "inventory": [], "shipping": []}
def __new__(cls):
"""
"""
if cls._instance is None:
cls._instance = super(WorkflowRegistry, cls).__new__(cls)
return cls._instance
def register_step(self, step: StepDefinition) -> None:
"""
動態註冊新步驟
:param step:
:return:
"""
self._steps[step.name] = step
self._stage_steps[step.stage].append(step)
print(f"[註冊中心] 已註冊步驟: {step.name} ({step.stage}) | 耗時: {step.duration}s")
def get_stage_steps(self, stage: str) -> list:
"""
獲取階段所有步驟(含動態新增)
:param stage:
:return:
"""
return self._stage_steps.get(stage, [])
def get_step_count(self, stage: str) -> int:
"""
實時計算階段步驟數量(支持動態擴展)
:param stage:
:return:
"""
return len(self._stage_steps.get(stage, []))
# === DYNAMIC STEP ENHANCEMENT ===
@classmethod
def init_default_steps(cls):
"""
初始化默認步驟(保留原有流程)
:return:
"""
registry = cls()
# 生產階段
registry.register_step(StepDefinition("款式設計", 3.0, lambda ctx: time.sleep(3.0), "production"))
registry.register_step(StepDefinition("原料採購", 2.0, lambda ctx: time.sleep(2.0), "production"))
registry.register_step(StepDefinition("工藝加工", 4.0, lambda ctx: time.sleep(4.0), "production"))
registry.register_step(StepDefinition("品質質檢", 2.0, lambda ctx: time.sleep(2.0), "production"))
registry.register_step(StepDefinition("成品包裝", 1.0, lambda ctx: time.sleep(1.0), "production"))
# 入庫階段
registry.register_step(StepDefinition("檔案歸檔", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
registry.register_step(StepDefinition("庫存更新", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
registry.register_step(StepDefinition("貼標入倉", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
registry.register_step(StepDefinition("入庫核驗", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
registry.register_step(StepDefinition("成品拍照", 1.0, lambda ctx: time.sleep(1.0), "inventory"))
# 發貨階段
registry.register_step(StepDefinition("批量發貨", 2.0, lambda ctx: time.sleep(2.0), "shipping"))
return registry
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 20:56
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : settings.py
class WorkflowConfig:
"""工作流全局配置"""
PRODUCTION_STEPS_COUNT = 5
INVENTORY_STEPS_COUNT = 5
SHIPPING_STEPS_COUNT = 1
# 模擬耗時(秒)
STEP_DURATIONS = {
"成品包裝": 1.0, "原料採購": 2.0, "款式設計": 3.0,
"工藝加工": 4.0, "品質質檢": 2.0, "檔案歸檔": 1.0,
"庫存更新": 1.0, "貼標入倉": 1.0, "入庫核驗": 1.0,
"成品拍照": 1.0, "批量發貨": 2.0
}
RETRY_POLICY = {
"strategy": "指數退避",
"max_retries": 2
}
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 20:54
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : dynamic_barrier.py
import threading
import time
from typing import Dict, List, Set
from NBarrierPattern.domain.exceptions import BarrierTimeoutError
class DynamicNBarrier:
"""
支持動態參與者的彈性屏障
"""
def __init__(self, barrier_id: str, timeout: float = 30.0):
self.barrier_id = barrier_id
self.timeout = timeout
self._lock = threading.RLock()
self._condition = threading.Condition(self._lock)
self._participants: Dict[str, bool] = {} # worker_id -> is_arrived
self._completion_time = None
self._reset_flag = False
def register(self, worker_id: str) -> None:
"""
動態註冊新參與者(運行時可調用)
:param worker_id:
:return:
"""
with self._lock:
if worker_id not in self._participants:
self._participants[worker_id] = False
print(f"[屏障] {self.barrier_id} 新註冊參與者: {worker_id}")
def wait(self, worker_id: str, context: dict) -> dict:
"""
帶超時控制的等待(支持動態參與者)
:param worker_id:
:param context:
:return:
"""
start_time = time.time()
with self._condition:
# 標記到達
if worker_id not in self._participants:
raise ValueError(f"參與者 {worker_id} 未註冊到屏障 {self.barrier_id}")
self._participants[worker_id] = True
# 檢查是否全部到達
while not all(self._participants.values()) and not self._reset_flag:
elapsed = time.time() - start_time
if elapsed > self.timeout:
raise BarrierTimeoutError(
self.barrier_id,
[w for w, arrived in self._participants.items() if not arrived]
)
self._condition.wait(self.timeout - elapsed)
# 記錄完成時間
self._completion_time = time.time() - start_time
return {
"barrier_id": self.barrier_id,
"synchronized_workers": len(self._participants),
"active_workers": list(self._participants.keys()),
"completion_time": round(self._completion_time, 4),
**context
}
def reset(self) -> None:
"""
重置屏障狀態(供新批次流程使用)
:return:
"""
with self._condition:
self._participants = {w: False for w in self._participants}
self._reset_flag = True
self._condition.notify_all()
self._reset_flag = False
def remove(self, worker_id: str) -> None:
"""
移除參與者(動態取消註冊)
:param worker_id:
:return:
"""
with self._condition:
if worker_id in self._participants:
del self._participants[worker_id]
print(f"[屏障] {self.barrier_id} 移除參與者: {worker_id}")
self._condition.notify_all()
def get_status(self) -> dict:
"""
實時獲取屏障狀態(用於監控)
:return:
"""
with self._lock:
return {
"barrier_id": self.barrier_id,
"total_participants": len(self._participants),
"arrived_count": sum(1 for arrived in self._participants.values() if arrived),
"pending_workers": [w for w, arrived in self._participants.items() if not arrived]
}
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:00
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : n_barrier.py
from threading import Barrier as ThreadBarrier, BrokenBarrierError
import time
from NBarrierPattern.domain.exceptions import BarrierTimeoutError
import threading
import time
from typing import Dict, List, Set
class NBarrier:
"""
企業級N-Barrier實現
"""
def __init__(self, parties: int, barrier_id: str, timeout: float = 30.0):
"""
:param parties:
:param barrier_id:
:param timeout:
"""
self._parties = parties
self._timeout = timeout
self._barrier = ThreadBarrier(parties, timeout=timeout)
self.barrier_id = barrier_id
self.participants: Dict[str, bool] = {}
self._completion_time = None
self._lock = threading.RLock()
def _reset_barrier(self):
"""
重置屏障(當屏障損壞時調用)
:return:
"""
with self._lock:
self._parties = len(self.participants) if self.participants else self._parties
self._barrier = ThreadBarrier(self._parties, timeout=self._timeout)
self.participants = {w: False for w in self.participants}
def register(self, worker_id: str) -> None:
"""
註冊參與方
:param worker_id:
:return:
"""
with self._lock:
if worker_id not in self.participants:
self.participants[worker_id] = False
# 動態調整屏障大小
if len(self.participants) > self._parties:
self._parties = len(self.participants)
self._barrier = ThreadBarrier(self._parties, timeout=self._timeout)
def wait(self, worker_id: str, context: dict) -> dict:
"""
帶上下文的等待與審計記錄生成
:param worker_id:
:param context:
:return:
"""
try:
self.participants[worker_id] = True
start = time.time()
self._barrier.wait()
self._completion_time = time.time() - start
return {
"barrier_id": self.barrier_id,
"synchronized_workers": len(self.participants),
"active_workers": [w for w, s in self.participants.items() if s],
"completion_time": round(self._completion_time, 4),
**context
}
except BrokenBarrierError:
self._reset_barrier()
raise BarrierTimeoutError(
self.barrier_id,
[w for w, s in self.participants.items() if not s]
)
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:54
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : workflow_engine.py
from NBarrierPattern.config.registry import WorkflowRegistry
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
from NBarrierPattern.services.stage_service import StageService
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
from NBarrierPattern.domain.events import QualityPassedEvent, ArchivingCompletedEvent, EventBus
from NBarrierPattern.services.shipping.service import ShippingServiceWithRetry
class WorkflowOrchestrator(object):
"""
全流程自動化編排引擎
"""
def __init__(self, registry: WorkflowRegistry, logger: StructuredLogger):
"""
:param registry:
:param logger:
"""
self.registry = registry
self.logger = logger
self.context = WorkflowContext()
self.shipping_service = ShippingServiceWithRetry(logger)
# 初始化所有階段的屏障和服務
self.services = {
"production": self._build_stage_service("production"),
"inventory": self._build_stage_service("inventory"),
"shipping": self._build_stage_service("shipping")
}
# 訂閱事件,實現全自動觸發
EventBus.subscribe(ArchivingCompletedEvent, self.on_archiving_completed)
def _build_stage_service(self, stage_name: str) -> StageService:
"""
:param stage_name:
:return:
"""
barrier = DynamicNBarrier(f"{stage_name.upper()}_BARRIER")
return StageService(stage_name, self.registry, barrier, self.logger)
def start_production(self):
"""
手動或外部API觸發生產流程的起點
:return:
"""
self.logger.info("=== 🏭 階段1:生產流程啟動 ===")
self.services["production"].execute(self.context)
def on_archiving_completed(self, event: ArchivingCompletedEvent):
"""
監聽歸檔完成事件,自動觸發後續物理流轉
:param event:
:return:
"""
self.logger.separator()
self.logger.info("🔗 接收到歸檔完成信號,準備跨入物理流轉階段...",
workflow_id=event.workflow_id,
archive_id=event.archive_id)
# 1. 自動觸發入庫流程
self.logger.info("=== 📦 階段2:自動觸發入庫流程 ===")
self.services["inventory"].execute(self.context)
# 2. 自動觸發送貨流程
self.logger.info("=== 🚚 階段3:自動觸發送貨流程 ===")
self.services["shipping"].execute(self.context)
# 3. 輸出最終全鏈路閉環結果
self.logger.separator()
self.logger.info("🎉 珠寶全生命週期管理流程完美閉環!",
workflow_id=self.context.workflow_id,
final_status="已發貨",
archive_id=event.archive_id)
# 觸發帶重試保護的發貨流程
self.logger.info("=== 🚚 階段3:啟動智能發貨流程 ===")
self.shipping_service.execute_shipping_with_retry(self.context)
# 根據最終狀態輸出結果
if self.context.shipping_status.name == 'SUCCESS':
self.logger.info("🎉 全鏈路閉環完成!")
else:
self.logger.alert("⚠️ 流程異常終止,請檢查工單系統",
final_state=self.context.shipping_status.value)
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:43
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : events.py
from dataclasses import dataclass, field
from typing import List, Callable
import uuid
from datetime import datetime
from threading import Thread
@dataclass
class QualityPassedEvent:
"""
質檢通過領域事件
"""
workflow_id: str
product_sku: str
passed_at: str = field(default_factory=lambda: datetime.now().isoformat())
metadata: dict = field(default_factory=dict)
@dataclass
class ArchivingCompletedEvent:
"""
歸檔完成領域事件(觸發發貨的開關)
"""
workflow_id: str
archive_id: str
storage_zone: str
archived_at: str = field(default_factory=lambda: datetime.now().isoformat())
class EventBus:
"""
輕量級內存事件總線
"""
_subscribers: dict = {}
@classmethod
def subscribe(cls, event_type: type, handler: Callable):
if event_type not in cls._subscribers:
cls._subscribers[event_type] = []
cls._subscribers[event_type].append(handler)
@classmethod
def publish(cls, event):
event_type = type(event)
if event_type in cls._subscribers:
for handler in cls._subscribers[event_type]:
Thread(target=handler, args=(event,), daemon=True).start()
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 20:58
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : exceptions.py
class BarrierTimeoutError(Exception):
"""
屏障超時異常
"""
def __init__(self, barrier_id: str, pending_workers: list):
"""
:param barrier_id:
:param pending_workers:
"""
self.barrier_id = barrier_id
self.pending_workers = pending_workers
super().__init__(f"屏障{barrier_id}超時,未到達節點:{pending_workers}")
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 20:57
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : models.py
from dataclasses import dataclass, field
from typing import Dict, Any
import uuid
from datetime import datetime
from enum import Enum
from NBarrierPattern.domain.QualityStatus import QualityStatus
class ShippingStatus(Enum):
PENDING = "待發貨"
DISPATCHING = "下發指令中"
SUCCESS = "已發貨"
FAILED_RECOVERABLE = "失敗(可重試)" # 如WMS接口超時
FAILED_PERMANENT = "失敗(需人工介入)" # 如參數校驗不通過
@dataclass
class WorkflowContext:
"""
工作流上下文載體
"""
workflow_id: str = field(default_factory=lambda: f"JW-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6].upper()}")
quality_status: QualityStatus = QualityStatus.PENDING
shipping_status: ShippingStatus = ShippingStatus.PENDING
rework_count: int = 0 # 記錄返工次數,防止死循環
max_rework_attempts: int = 3
metadata: Dict[str, Any] = field(default_factory=dict)
def mark_as_failed(self):
"""
:return:
"""
if self.rework_count >= self.max_rework_attempts:
raise RuntimeError(f"流程{self.workflow_id}返工次數已達上限({self.max_rework_attempts}),強制終止!")
QualityStatus.quality_status = QualityStatus.FAILED
self.rework_count += 1
def mark_as_reworked(self):
"""
:return:
"""
QualityStatus.quality_status = QualityStatus.REWORKED
# 自定義業務異常體系
class BaseShippingException(Exception):
"""
"""
def __init__(self, message: str, is_recoverable: bool):
super().__init__(message)
self.is_recoverable = is_recoverable
class WmsTimeoutError(BaseShippingException):
"""
倉庫系統連接超時(可恢復)
"""
def __init__(self):
super().__init__("調用WMS接口超時", True)
class InvalidAddressError(BaseShippingException):
"""
收貨地址非法(不可恢復)
"""
def __init__(self, address: str):
super().__init__(f"收貨地址無效: {address}", False)
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:29
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : QualityStatus.py
from enum import Enum
class QualityStatus(Enum):
"""質量檢驗狀態"""
PENDING = "待質檢"
PASSED = "合格"
FAILED = "不合格(需返工)"
REWORKED = "已返工"
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:06
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : BaseStageService.py
from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
import time
class BaseStageService:
"""
階段基類
"""
def __init__(self, logger: StructuredLogger, barrier: NBarrier, stage_name: str, steps_config: list):
"""
:param logger:
:param barrier:
:param stage_name:
:param steps_config:
"""
self.logger = logger
self.barrier = barrier
self.stage_name = stage_name
self.steps_config = steps_config
def _execute_step(self, step_name: str, duration: float, worker_id: str):
"""
隔離的步驟執行環境
:param step_name:
:param duration:
:param worker_id:
:return:
"""
self.logger.info("開始{}步驟".format(self.stage_name), step=step_name, worker_id=worker_id)
time.sleep(duration)
metrics = {"cost": round(duration + (duration * 0.0001), 7), "retry": 0}
self.logger.info("步驟完成", stage=self.stage_name, step=step_name, worker_id=worker_id, **metrics)
barrier_result = self.barrier.wait(worker_id, metrics)
self.logger.info("=== 【{}屏障通過】 ===".format(self.stage_name), **barrier_result)
def execute(self, context: WorkflowContext):
"""
:param context:
:return:
"""
threads = []
for step_name, duration in self.steps_config.items():
worker_id = f"{self.stage_name.upper()[:4]}-{list(self.steps_config.keys()).index(step_name) + 1}"
self.barrier.register(worker_id)
t = Thread(target=self._execute_step, args=(step_name, duration, worker_id))
threads.append(t)
t.start()
for t in threads:
t.join()
return context
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:22
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : stage_service.py
from threading import Thread
from NBarrierPattern.config.registry import WorkflowRegistry, StepDefinition
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
import time
from typing import Dict, List, Set
class StageService:
"""
支持動態步驟的階段服務
"""
def __init__(
self,
stage_name: str,
registry: WorkflowRegistry,
barrier: DynamicNBarrier,
logger: StructuredLogger
):
self.stage_name = stage_name
self.registry = registry
self.barrier = barrier
self.logger = logger
self._threads: List[Thread] = []
self._worker_id_counter = 0 # 用於生成唯一worker_id
def add_step(self, step: StepDefinition, context=None) -> None:
"""
運行時動態添加新步驟
:param step:
:param context:
:return:
"""
if step.stage != self.stage_name:
raise ValueError(f"步驟 {step.name} 不屬於 {self.stage_name} 階段")
self.registry.register_step(step)
# === DYNAMIC STEP ENHANCEMENT ===
# 關鍵:動態註冊新參與者到屏障
worker_id = self._generate_worker_id(step.name)
self.barrier.register(worker_id)
try:
step.executor(context) # 執行,傳入context
self.barrier.wait(worker_id, {}) # 執行完等待匯合,傳入空context
finally:
self.barrier.remove(worker_id) # 離開時移除
self.logger.info("動態添加新步驟",
stage=self.stage_name,
step=step.name,
worker_id=worker_id,
new_barrier_size=self.barrier.get_status()["total_participants"])
def _generate_worker_id(self, step_name: str) -> str:
"""
生成唯一工作ID(階段縮寫+序號)
:param step_name:
:return:
"""
self._worker_id_counter += 1
prefix = self.stage_name[:4].upper()
return f"{prefix}-{self._worker_id_counter}"
def _execute_step(self, step: StepDefinition, worker_id: str, context):
"""
執行步驟(帶錯誤重試)
:param step:
:param worker_id:
:param context:
:return:
"""
start_time = time.time()
try:
self.logger.info("開始步驟", stage=self.stage_name, step=step.name, worker_id=worker_id)
step.executor(context) # 執行實際業務邏輯,傳入context
duration = time.time() - start_time
metrics = {
"cost": round(duration, 7),
"retry": 0 # 後續可擴展重試邏輯
}
self.logger.info("步驟完成", stage=self.stage_name, step=step.name, **metrics)
# 通過屏障同步
barrier_result = self.barrier.wait(worker_id, metrics)
self.logger.info("=== 【{}屏障通過】 ===".format(self.stage_name), **barrier_result)
except Exception as e:
self.logger.error("步驟失敗", step=step.name, error=str(e))
raise
def execute(self, context: WorkflowContext) -> WorkflowContext:
"""
執行當前階段所有步驟(含動態新增)
:param context:
:return:
"""
steps = self.registry.get_stage_steps(self.stage_name)
self.logger.info("階段啟動",
stage=self.stage_name,
total_steps=len(steps),
steps=[s.name for s in steps])
# 啟動所有步驟線程
for step in steps:
worker_id = self._generate_worker_id(step.name)
# 關鍵:在線程啟動前註冊參與者到屏障
self.barrier.register(worker_id)
thread = Thread(
target=self._execute_step,
args=(step, worker_id, context),
name=f"{self.stage_name}-{step.name}"
)
self._threads.append(thread)
thread.start()
# 等待全部完成
for t in self._threads:
t.join()
return context
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:44
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : service.py
import json
import time
import uuid
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.events import QualityPassedEvent, ArchivingCompletedEvent, EventBus
class ArchivingService:
"""
自動化歸檔服務"
"""
def __init__(self, logger: StructuredLogger):
"""
:param logger:
"""
self.logger = logger
def handle_quality_passed(self, event: QualityPassedEvent):
"""
監聽質檢通過事件,執行自動歸檔
:param event:
:return:
"""
self.logger.separator()
self.logger.info("📥 觸發自動歸檔流水線", workflow_id=event.workflow_id, sku=event.product_sku)
# 模擬耗時較長的檔案處理操作
time.sleep(0.5)
# 1. 生成唯一的數字資產檔案ID
archive_id = f"ARC-{event.workflow_id.split('-')[-1]}-{uuid.uuid4().hex[:8].upper()}"
# 2. 構建標準化檔案數據
digital_archive = {
"archive_id": archive_id,
"workflow_id": event.workflow_id,
"product_sku": event.product_sku,
"quality_certified_at": event.passed_at,
"digital_twin_status": "ACTIVE",
"storage_zone": "A-03-GOLD" # 模擬分配倉庫貨位
}
# 3. 持久化到數據庫 / 區塊鏈存證 (此處用日誌模擬)
self.logger.info("✅ 數字檔案創建成功", **digital_archive)
# === 核心改動:發佈歸檔完成事件,通知下游系統 ===
archiving_done_event = ArchivingCompletedEvent(
workflow_id=event.workflow_id,
archive_id=archive_id,
storage_zone=digital_archive["storage_zone"]
)
EventBus.publish(archiving_done_event)
# 4. 通知下游系統(如ERP或WMS)更新庫存狀態
self._sync_to_warehouse_system(digital_archive)
return archive_id
def _sync_to_warehouse_system(self, archive_data: dict):
"""
模擬對接外部倉儲系統的API
:param archive_data:
:return:
"""
self.logger.info("【API調用】向WMS系統推送入庫預佔指令",
target="WMS_CORE",
payload=json.dumps(archive_data))
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:05
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : service.py
from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext
import time
from NBarrierPattern.services.BaseStageService import BaseStageService
class InventoryService(BaseStageService):
"""
"""
def __init__(self, logger: StructuredLogger, barrier: NBarrier):
"""
:param logger:
:param barrier:
"""
inv_steps = {k: v for k, v in WorkflowConfig.STEP_DURATIONS.items() if k in ["檔案歸檔", "庫存更新", "貼標入倉", "入庫核驗", "成品拍照"]}
super().__init__(logger, barrier, "入庫", inv_steps)
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:05
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : service.py
from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext, QualityStatus
import time
from NBarrierPattern.services.BaseStageService import BaseStageService
from threading import Thread
from NBarrierPattern.config.registry import WorkflowRegistry
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
class ProductionService(BaseStageService):
"""
"""
def __init__(self, logger: StructuredLogger, barrier: NBarrier):
"""
:param logger:
:param barrier:
"""
super().__init__(logger, barrier, "生產", WorkflowConfig.STEP_DURATIONS)
class ProductionServiceWithRework:
"""
具備自動返工能力的生產階段服務
"""
def __init__(self, stage_name: str, registry: WorkflowRegistry, barrier: DynamicNBarrier, logger: StructuredLogger):
self.stage_name = stage_name
self.registry = registry
self.barrier = barrier
self.logger = logger
def _execute_single_step(self, step_def, worker_id, context: WorkflowContext):
"""
單步執行器(包含異常冒泡)
:param step_def:
:param worker_id:
:param context:
:return:
"""
try:
self.logger.info("開始步驟", stage=self.stage_name, step=step_def.name, worker_id=worker_id)
step_def.executor(context) # 傳入context以便質檢步驟修改狀態
duration = 1.0 # 模擬耗時
metrics = {"cost": round(duration, 7), "retry": context.rework_count}
self.logger.info("步驟完成", stage=self.stage_name, step=step_def.name, **metrics)
# 只有非質檢步驟才需要通過屏障,質檢步驟單獨處理
if step_def.name != "品質質檢":
barrier_result = self.barrier.wait(worker_id, metrics)
self.logger.info("=== 【{}屏障通過】 ===".format(self.stage_name), **barrier_result)
except Exception as e:
self.logger.error("步驟執行異常", step=step_def.name, error=str(e))
raise
def execute_with_rework_loop(self, context: WorkflowContext) -> WorkflowContext:
"""
帶自動返工閉環的主執行方法
:param context:
:return:
"""
while True:
steps = self.registry.get_stage_steps(self.stage_name)
threads = []
# 如果是返工批次,需要重置屏障狀態,讓所有工人重新就位
if context.quality_status == QualityStatus.FAILED:
self.logger.separator()
self.logger.warning("⚠️ 觸發自動返工機制",
workflow_id=context.workflow_id,
current_attempt=context.rework_count)
self.barrier.reset() # 關鍵:重置屏障,準備迎接新一輪同步
# 模擬返工前的物理操作(如退回生產線)
self._trigger_physical_rework_action()
# 啟動本輪次的所有線程
for step in steps:
worker_id = f"{self.stage_name[:4].upper()}-{steps.index(step) + 1}"
# 動態註冊確保屏障參與者對齊
if worker_id not in self.barrier._participants:
self.barrier.register(worker_id)
t = Thread(target=self._execute_single_step, args=(step, worker_id, context))
threads.append(t)
t.start()
# 等待本輪次全部完成
for t in threads:
t.join()
# === 核心判斷:檢查質檢結果 ===
if context.quality_status == QualityStatus.PASSED or context.quality_status == QualityStatus.REWORKED:
self.logger.info("✅ 質檢通過,進入下一階段", status=context.quality_status.value)
break # 跳出循環,進入入庫階段
elif context.quality_status == QualityStatus.FAILED:
self.logger.warning("❌ 質檢未通過,將重新執行生產流程...")
continue # 繼續while循環,觸發返工
else:
raise RuntimeError("未知的質檢狀態")
return context
def _trigger_physical_rework_action(self):
"""模擬物理世界的返工操作"""
self.logger.info("【系統指令】下發返工單至車間終端...", action="rework_dispatch")
time.sleep(0.5)
self.logger.info("【系統指令】原料重新上料完成", action="material_reload")
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:05
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : service.py
from threading import Thread
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.domain.models import WorkflowContext, ShippingStatus, WmsTimeoutError, InvalidAddressError,BaseShippingException
import time
from NBarrierPattern.services.BaseStageService import BaseStageService
import random
class ShippingService(BaseStageService):
"""
"""
def __init__(self, logger: StructuredLogger, barrier: NBarrier):
"""
:param logger:
:param barrier:
"""
self.logger = logger
self.max_retries = 3 # 最大重試次數
self.base_delay = 1.0 # 基礎等待時間(秒)
ship_steps = {"批量發貨": WorkflowConfig.STEP_DURATIONS["批量發貨"]}
super().__init__(logger, barrier, "發貨", ship_steps)
class ShippingServiceWithRetry:
"""
"""
def __init__(self, logger: StructuredLogger):
self.logger = logger
self.max_retries = 3 # 最大重試次數
self.base_delay = 1.0 # 基礎等待時間(秒)
def _call_wms_api(self, context: WorkflowContext):
"""
模擬調用外部WMS/物流系統API
:param context:
:return:
"""
# 模擬:前兩次調用拋出超時異常,第三次成功
if context.metadata.get("wms_attempt", 0) < 2:
context.metadata["wms_attempt"] = context.metadata.get("wms_attempt", 0) + 1
raise WmsTimeoutError()
self.logger.info("【WMS接口】物流單號生成成功", tracking_no="SF123456789")
return {"tracking_no": "SF123456789"}
def execute_shipping_with_retry(self, context: WorkflowContext):
"""
帶自動重試機制的發貨執行器
:param context:
:return:
"""
attempt = 0
while attempt <= self.max_retries:
try:
attempt += 1
self.logger.info(f"🚚 發起發貨請求 (第{attempt}次嘗試)", workflow_id=context.workflow_id)
# === 核心業務動作 ===
result = self._call_wms_api(context)
# 成功則更新狀態並退出循環
context.metadata.update(result)
ShippingStatus.shipping_status = ShippingStatus.SUCCESS
self.logger.info("✅ 發貨流程最終完成", status=ShippingStatus.SUCCESS.value)
return
except BaseShippingException as e:
if not e.is_recoverable:
# 遇到不可恢復異常(如地址錯誤),直接終止,記錄日誌等待人工處理
ShippingStatus.shipping_status = ShippingStatus.FAILED_PERMANENT
self.logger.error("❌ 發生不可恢復錯誤,停止重試", error=str(e))
return
# 遇到可恢復異常,判斷是否還有重試機會
if attempt >= self.max_retries:
ShippingStatus.shipping_status = ShippingStatus.FAILED_RECOVERABLE
self.logger.warning("⚠️ 達到最大重試次數,轉入死信隊列/人工干預",
workflow_id=context.workflow_id)
return
# === 指數退避 + 隨機抖動 (Jitter) ===
# 公式:delay = base_delay * (2 ^ (attempt - 1)) + random_jitter
delay = self.base_delay * (2 ** (attempt - 1))
jitter = random.uniform(0, 0.5) # 增加隨機性,防止多客戶端同步重試導致雪崩
total_wait = delay + jitter
self.logger.warning(f"⏳ 檢測到瞬時故障,將在 {total_wait:.2f}s 後重試...",
next_attempt=attempt + 1, reason=str(e))
time.sleep(total_wait)
調用:
# encoding: utf-8
# 版權所有 2026 ©塗聚文有限公司™ ®
# 許可信息查看:言語成了邀功盡責的功臣,還需要行為每日來值班嗎
# 描述:N-Barrier Pattern 屏障模式
# Author : geovindu,Geovin Du 塗聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/5/27 21:13
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : NBarrierBll.py
from NBarrierPattern.config.registry import WorkflowRegistry,StepDefinition
from NBarrierPattern.config.settings import WorkflowConfig
from NBarrierPattern.core.barrier.n_barrier import NBarrier
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.services.production.service import ProductionService,ProductionServiceWithRework
from NBarrierPattern.services.inventory.service import InventoryService
from NBarrierPattern.services.shipping.service import ShippingService
from NBarrierPattern.domain.models import WorkflowContext, QualityStatus
from NBarrierPattern.services.stage_service import StageService
from threading import Thread
import time
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
import random
from NBarrierPattern.domain.events import EventBus, QualityPassedEvent
from NBarrierPattern.services.archiving.service import ArchivingService
from NBarrierPattern.core.orchestrator.workflow_engine import WorkflowOrchestrator
class NBarrierBll(object):
"""
"""
@staticmethod
def run_basic_flow():
"""
基礎流程:無返工/無歸檔
:return:
"""
logger = StructuredLogger()
logger.title("基礎生產流程演示(無返工)")
# 1. 初始化註冊表
registry = WorkflowRegistry.init_default_steps()
# 2. 綁定基礎質檢邏輯
for step in registry.get_steps("production"):
if step.name == "品質質檢":
step.executor = NBarrierBll.quality_always_pass
# 3. 啟動編排器
orchestrator = WorkflowOrchestrator(registry, logger)
orchestrator.start_production()
@staticmethod
def run_archiving_flow():
"""
歸檔流程:質檢觸發歸檔事件
:return:
"""
logger = StructuredLogger()
logger.title("歸檔生產流程演示(質檢觸發歸檔)")
# 1. 初始化註冊表
registry = WorkflowRegistry.init_default_steps()
# 2. 綁定歸檔版質檢
for step in registry.get_steps("production"):
if step.name == "品質質檢":
step.executor = NBarrierBll.quality_with_archiving
# 3. 初始化歸檔服務(自動訂閱事件)
ArchivingService(logger)
# 4. 啟動編排器
orchestrator = WorkflowOrchestrator(registry, logger)
orchestrator.start_production()
@staticmethod
def run_rework_flow():
"""
返工流程:模擬質檢失敗重試
:return:
"""
logger = StructuredLogger()
logger.title("返工生產流程演示(30%失敗率)")
# 1. 初始化註冊表
registry = WorkflowRegistry.init_default_steps()
# 2. 綁定返工版質檢
for step in registry.get_steps("production"):
if step.name == "品質質檢":
step.executor = NBarrierBll.quality_with_rework
# 3. 初始化返工服務
barrier = DynamicNBarrier("PRODUCTION_BARRIER")
production = ProductionServiceWithRework(
"production",
registry,
barrier,
logger
)
# 4. 執行帶返工的生產階段
context = WorkflowContext()
production.execute_with_rework_loop(context)
# 5. 繼續後續流程(入庫/發貨)
inventory = StageService("inventory", registry, DynamicNBarrier("INVENTORY_BARRIER"), logger)
shipping = StageService("shipping", registry, DynamicNBarrier("SHIPPING_BARRIER"), logger)
inventory.execute(context)
shipping.execute(context)
logger.success("✅ 返工流程完整結束", workflow_id=context.workflow_id)
@staticmethod
def mock_design(context: WorkflowContext):
"""
模擬款式設計
:param context:
:return:
"""
print(f"[{context.workflow_id}] 🎨 款式設計完成")
time.sleep(1)
@staticmethod
def mock_material_procurement(context: WorkflowContext):
"""
模擬原料採購
:param context:
:return:
"""
print(f"[{context.workflow_id}] 💎 原料採購完成")
time.sleep(1)
# --- 質檢邏輯變體 ---
@staticmethod
def mock_quality_inspection_always_pass(context: WorkflowContext):
"""
模擬必定通過的質檢(用於基礎測試)
"""
print("--- 正在進行精密儀器質檢 (基礎版) ---")
time.sleep(1)
QualityStatus.quality_status = QualityStatus.PASSED
print("判定:合格 ")
@staticmethod
def mock_quality_inspection_with_archiving(context: WorkflowContext):
"""
模擬質檢通過並觸發歸檔事件
"""
print("--- 正在進行精密儀器質檢 (歸檔版) ---")
time.sleep(1)
QualityStatus.quality_status = QualityStatus.PASSED
# 發佈事件
event = QualityPassedEvent(
workflow_id=context.workflow_id,
product_sku="JW-RING-DIAMOND-001",
metadata={"carat": 1.5, "clarity": "VVS1"}
)
EventBus.publish(event)
print("判定:合格 (已觸發歸檔事件)")
@staticmethod
def mock_quality_inspection_with_rework(context: WorkflowContext):
"""
模擬帶返工邏輯的質檢(前兩次失敗,第三次成功)
注意:這裡假設 context 是可變的,或者你有辦法更新狀態
"""
print("--- 正在進行精密儀器質檢 (返工版) ---")
time.sleep(1)
# 簡單的返工邏輯模擬
if random.random() < 0.3: # 30% 概率失敗
print("質檢員發現瑕疵!判定:不合格 ")
# 這裡應該拋出異常或設置標誌位,由外層循環捕獲
# 假設 context 有一個屬性記錄狀態
QualityStatus.quality_status = QualityStatus.FAILED
else:
print("複檢通過。判定:合格 ")
QualityStatus.quality_status = QualityStatus.PASSED
@staticmethod
def mock_quality_inspection(context):
"""模擬必定通過的質檢"""
print("--- 正在進行精密儀器質檢 ---")
time.sleep(1.0)
QualityStatus.quality_status = QualityStatus.PASSED
# 發佈質檢通過事件,交給歸檔服務處理
from NBarrierPattern.domain.events import QualityPassedEvent
EventBus.publish(QualityPassedEvent(
workflow_id=context.workflow_id,
product_sku="JW-RING-DIAMOND-001",
metadata={"carat": 1.5}
))
# --- 演示用的 demo 方法(建議移至 main.py,但為了方便你測試先留著)---
@staticmethod
def demo_execution(self):
"""
演示執行流程(修復版)
注意:這裡只演示一種模式,推薦使用 Orchestrator 模式
"""
from NBarrierPattern.adapters.logging.structured_logger import StructuredLogger
from NBarrierPattern.config.registry import WorkflowRegistry
from NBarrierPattern.core.orchestrator.workflow_engine import WorkflowOrchestrator
logger = StructuredLogger()
logger.info("=== 珠寶全流程生產系統啟動 ===")
# 1. 初始化註冊中心
registry = WorkflowRegistry.init_default_steps()
# --- 場景1:演示基礎流程 ---
logger.info("=== 場景1:基礎生產流程(無返工)===")
# 綁定基礎質檢邏輯
for step in registry.get_steps("production"):
if step.name == "品質質檢":
step.executor = self.mock_quality_inspection_always_pass
# 初始化編排器並啟動
orchestrator = WorkflowOrchestrator(registry, logger)
# 注意:start_production 可能是異步的,如果需要阻塞等待,看你的 Orchestrator 實現
orchestrator.start_production()
# 簡單等待,實際應使用事件或 Future 等待完成
time.sleep(10)
# --- 場景2:演示返工流程 ---
# 注意:這裡需要重新初始化 registry 或重置步驟,否則質檢邏輯還是上面的那個
logger.info("=== 場景2:帶返工的生產流程 ===")
registry2 = WorkflowRegistry.init_default_steps()
for step in registry2.get_steps("production"):
if step.name == "品質質檢":
step.executor = self.mock_quality_inspection_with_rework
# 假設你有一個專門處理返工的服務
from NBarrierPattern.services.production.service import ProductionServiceWithRework
from NBarrierPattern.core.barrier.dynamic_barrier import DynamicNBarrier
barrier = DynamicNBarrier("PROD_REWORK")
service = ProductionServiceWithRework("production", registry2, barrier, logger)
context = WorkflowContext()
# 執行帶返工的流程
service.execute_with_rework_loop(context)
logger.info("所有演示場景完成")
def demo(self):
"""
:return:
"""
logger = StructuredLogger()
logger.info("=== 珠寶全流程生產系統啟動 ===")
logger.info("配置加載完成",
生產步驟=WorkflowConfig.PRODUCTION_STEPS_COUNT,
入庫步驟=WorkflowConfig.INVENTORY_STEPS_COUNT,
重試策略=WorkflowConfig.RETRY_POLICY["strategy"])
context = WorkflowContext()
registry = WorkflowRegistry.init_default_steps() # 初始化默認步驟
# 綁定自定義質檢邏輯
for step in registry._stage_steps["production"]:
if step.name == "品質質檢":
step.executor = NBarrierBll.mock_quality_inspection
# 初始化歸檔服務(內部會自動訂閱質檢通過事件)
ArchivingService(logger)
# 初始化編排引擎(內部會自動訂閱歸檔完成事件)
orchestrator = WorkflowOrchestrator(registry, logger)
logger.info("=== 💎 珠寶全流程智能生產系統就緒 ===")
# 僅需一行代碼觸發生產,後續所有環節全自動流轉
orchestrator.start_production()
# 初始化歸檔服務並註冊事件監聽器
archiving_service = ArchivingService(logger)
EventBus.subscribe(QualityPassedEvent, archiving_service.handle_quality_passed)
# 替換原有的品質質檢執行器
for step in registry._stage_steps["production"]:
if step.name == "品質質檢":
step.executor = NBarrierBll.mock_quality_inspection_with_archiving
prod_barrier = DynamicNBarrier("PRODUCTION_BARRIER")
production = StageService("production", registry, prod_barrier, logger)
context = WorkflowContext()
logger.info("=== 珠寶全流程生產系統啟動(含自動歸檔機制)===")
# 執行生產階段(包含質檢和觸發的自動歸檔)
production.execute(context)
logger.separator()
logger.info("🎉 質檢歸檔完成,準備進入物理入庫環節...", workflow_id=context.workflow_id)
# 替換原有的品質質檢執行器為我們自定義的模擬函數
for step in registry._stage_steps["production"]:
if step.name == "品質質檢":
step.executor = NBarrierBll.mock_quality_inspection
prod_barrier = DynamicNBarrier("PRODUCTION_BARRIER")
production = ProductionServiceWithRework("production", registry, prod_barrier, logger)
# 創建屏障(數量由註冊中心動態決定)
prod_barrier = DynamicNBarrier("PRODUCTION_BARRIER")
inv_barrier = DynamicNBarrier("INVENTORY_BARRIER")
ship_barrier = DynamicNBarrier("SHIPPING_BARRIER")
# 創建階段服務
production = StageService("production", registry, prod_barrier, logger)
inventory = StageService("inventory", registry, inv_barrier, logger)
shipping = StageService("shipping", registry, ship_barrier, logger)
context = WorkflowContext()
# === 動態步驟演示:在生產階段運行中插入VIP定製 ===
def vip_engraving_task(context=None):
logger.info("【VIP專屬】開始激光刻字", duration=1.5)
time.sleep(1.5)
logger.info("【VIP專屬】刻字完成", content="永恆之約")
# 模擬生產階段執行到一半時動態添加步驟
def simulate_dynamic_addition():
time.sleep(4.0) # 等待部分步驟完成
logger.separator()
logger.info("⚠️ 檢測到VIP訂單,動態插入定製步驟...")
# 創建新步驟定義
vip_step = StepDefinition(
name="VIP定製刻字",
duration=1.5,
executor=vip_engraving_task,
stage="production" # 關鍵:必須指定所屬階段
)
production.add_step(vip_step) # 動態註冊到生產階段
# 啟動動態添加線程(模擬運行時事件)
dynamic_thread = Thread(target=simulate_dynamic_addition, daemon=True)
dynamic_thread.start()
# 執行全流程
logger.info("=== 珠寶全流程生產系統啟動 ===")
logger.info("當前配置",
production_steps=registry.get_step_count("production"),
inventory_steps=registry.get_step_count("inventory"))
logger.info("=== 階段1:生產流程 ===")
production.execute(context)
logger.info("=== 階段2:入庫流程 ===")
inventory.execute(context)
logger.info("=== 階段3:發貨流程 ===")
shipping.execute(context)
# 輸出最終屏障狀態
logger.separator()
logger.info("✅ 全流程完成",
workflow_id=context.workflow_id,
production_workers=prod_barrier.get_status()["total_participants"],
inventory_workers=inv_barrier.get_status()["total_participants"])
# 1. 初始化屏障與服務
prod_barrier = NBarrier(WorkflowConfig.PRODUCTION_STEPS_COUNT, "PRODUCTION_BARRIER")
prod_service = ProductionService(logger, prod_barrier)
inv_barrier = NBarrier(WorkflowConfig.INVENTORY_STEPS_COUNT, "INVENTORY_BARRIER")
inv_service = InventoryService(logger, inv_barrier)
ship_barrier = NBarrier(WorkflowConfig.SHIPPING_STEPS_COUNT, "SHIPPING_BARRIER")
ship_service = ShippingService(logger, ship_barrier)
# 2. 順序執行全流程(屏障保證內部併發安全)
logger.info("=== 等待所有生產步驟完成 ===")
prod_service.execute(context)
logger.info("=== 等待所有入庫步驟完成 ===")
inv_service.execute(context)
logger.info("=== 等待發貨完成 ===")
ship_service.execute(context)
# 3. 彙總結果
total_steps = WorkflowConfig.PRODUCTION_STEPS_COUNT + WorkflowConfig.INVENTORY_STEPS_COUNT + WorkflowConfig.SHIPPING_STEPS_COUNT
logger.separator()
logger.info("✅ 全流程完成", 總任務數=total_steps, 成功數=total_steps, 流程ID=context.workflow_id)
logger.info("✅ 珠寶成品 → 生產 → 入庫 → 發貨 全流程完成")
logger.separator()
輸出:

哲學管理(學)人生, 文學藝術生活, 自動(計算機學)物理(學)工作, 生物(學)化學逆境, 歷史(學)測繪(學)時間, 經濟(學)數學金錢(理財), 心理(學)醫學情緒, 詩詞美容情感, 美學建築(學)家園, 解構建構(分析)整合學習, 智商情商(IQ、EQ)運籌(學)生存.---Geovin Du(塗聚文)
此內容由慣性聚合(RSS閱讀器)自動聚合整理,僅供閱讀參考。 原文來自 — 版權歸原作者所有。