慣性聚合 高效追讀感興趣之博客、新聞、科技資訊
閱原文 以慣性聚合開啟

推薦訂閱源

博客园 - 司徒正美
V
V2EX
T
Tailwind CSS Blog
有赞技术团队
有赞技术团队
aimingoo的专栏
aimingoo的专栏
Apple Machine Learning Research
Apple Machine Learning Research
IT之家
IT之家
Blog — PlanetScale
Blog — PlanetScale
A
About on SuperTechFans
月光博客
月光博客
T
The Blog of Author Tim Ferriss
宝玉的分享
宝玉的分享
Martin Fowler
Martin Fowler
博客园 - 聂微东
The GitHub Blog
The GitHub Blog
V
Visual Studio Blog
WordPress大学
WordPress大学
酷 壳 – CoolShell
酷 壳 – CoolShell
Engineering at Meta
Engineering at Meta
GbyAI
GbyAI

DEV Community

Authentication Security Deep Dive: From Brute Force to Salted Hashing (With Java Examples) Why AI Systems Don’t Fail — They Drift Spilling beans for how i learn for exam😁"Reinforcement Learning Cheat Sheet" I Replaced Chrome with Safari for AI Browser Automation. Here's What Broke (and What Finally Worked) How Python Borrows Other People's Work The $40 Architecture: Processing 1 Billion API Requests with 99.99% Uptime Vibe Coding: A Workflow Guide (From Zero to SaaS) Most webhook security guides protect the wrong side. The scary part is delivery. Headless CMS for TanStack Start: Build a Blog with Cosmic EU Age Verification App "Hacked in 2 Minutes" — What Actually Happened Comfy Cloud’s delete function does not actually remove files Running AI Models on GPU Cloud Servers: A Beginner Guide Event-driven media intelligence with AWS Step Functions and Bedrock I scored 500 AI prompts across 8 quality dimensions — here's what broke How to Call Google Gemini API from Next.js (Free Tier, No Backend Needed) The Portal Protocol: Reclaiming Human Connection in the Age of AI How to Fix Your Team's Scattered Knowledge Problem With a Self-Hosted Forum Intro to tc Cloud Functors: A Graph-First Mental Model for the Modern Cloud Designing Multi-Tenant Backends With Both Ownership and Team Access I Built a Neumorphic CSS Library with 77+ Components — Here's What I Learned PostgreSQL Performance Optimization: Why Connection Pooling Is Critical at Scale Cómo construí un SaaS multi-rubro para gestionar expensas en Argentina con FastAPI + Vue 3 🚀 I Built an Ethical Hacking Scanner Tool – Open Source Project I Replaced /usage and /context in Claude Code With a Single Statusline A Pythonic Way to Handle Emails (IMAP/SMTP) with Auto-Discovery and AI-Ready Design I Collected 8.9 Million Polymarket Price Points — Here's What I Found About How Markets Really Move EcoTrack AI — Carbon Footprint Tracker & Dashboard Everyone's Using AI. No One Agrees How. 5 self-hosted ebook managers worth trying in 2026 Building Your First AI Agent with LangChain: From Chatbot to Autonomous Assistant Common SOC 2 Failures (Real World) Stop Vibe-Checking Your AI App: A Practical Guide to Evals How to Use SonarQube and SonarScanner Locally to Level Up Your Code Quality Your Next To-Do App Is Dead — I Replaced Mine with an OpenClaw AI Sign a Nostr event in 60 lines of Python using coincurve — no nostr-sdk, no nbxplorer, no rust toolchain ITGC Audit Explained Like You’re in Big 4 Patch Tuesday abril 2026: Microsoft parcha 163 vulnerabilidades y un zero-day en SharePoint Stop scraping everything: a better way to track competitor price changes Listing on MCPize + the Official MCP Registry while routing payments OUTSIDE the marketplace — how I kept 100% of my x402 revenue Building an AI-Powered Risk Intelligence System Using Serverless Architecture Why We Ripped Function Overloading Out of Our AI Toolchain Testing AI-Generated Code: How to Actually Know If It Works SaaS Churn Is Killing Your Business. Here Is What to Do About It (Without a Support Team) The Speed of AI Is No Longer Linear - And Self-Improving Models Are Why How to Implement RBAC for MCP Tools: A Practical Guide for Engineering Teams From Standard Quote to Persuasive Proposal: AI Automation for Arborists I built a CLI that scaffolds complete multi-tenant SaaS apps Axios CVE-2025–62718: The Silent SSRF Bug That Could Be Hiding in Your Node.js App Right Now The dashboard that ended our friendship Data Pipelines Explained Simply (and How to Build Them with Python)
一以灰烬为启之DDD框架于Python:万法皆源于域模型
Sahil Pohare · 2026-05-24 · via DEV Community

吾尝筑之鸣者使— 一多租户AI代理编排平台。途中,吾终成一事,觉其可独立成文:乃为Python之领域驱动设计框架,此框架取持久化、持久执行处理器及租户隔离诸利,皆源于单一资源类。

此乃受 Elixir(艾勒克斯) 启发而作也。Ash框架(Ash framework)此乃记其事、述其理、录其失之文也.


而后端之胶合,其弊何在?

  • 凡后端之码,非皆业务之理。惟接线耳。尔撰一域之象,复撰:
  • 一SQLAlchemy之模以存之
  • 一库以行插入更新之术__JHSNS_SEG_8e68e3c3_11__一HTTP之应或消息队列之食以纳令
  • 每询皆滤租客
  • 重试之术,幂等之验,竞发之控

复如是于诸实体。商理之要——实所恃者——乃薄衣于厚壳之基构胶合间.

此框架所恃之见:若域模足表,万般皆可推演.


框架

资源

Resource子类同时为 SQLAlchemy 模型、upsert 仓库及 Restate 虚拟对象。汝但书一类耳。

from ironbridge.shared.framework import Resource, ActionKind, action
from ironbridge.shared.framework.effects import ActionContext

class Widget(Resource):
    class Meta:
        tenant_scoped  = True   # inject tenant_id, enforce via Postgres RLS
        restate_object = True   # derive a Restate VirtualObject

    __tablename__ = "widgets"

    id         : Mapped[str]      = mapped_column(String, primary_key=True, default=_cuid)
    name       : Mapped[str]      = mapped_column(String, nullable=False)
    status     : Mapped[str]      = mapped_column(String, default="ACTIVE")
    created_at : Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow)

    @action(kind=ActionKind.CREATE)
    def create(self, name: str) -> "Widget":
        self.name = name
        return self

    @action(kind=ActionKind.UPDATE)
    def deactivate(self) -> "Widget":
        if self.status == "INACTIVE":
            raise ValueError("Already inactive")
        self.status = "INACTIVE"
        return self

    @action(kind=ActionKind.READ)
    def get(self) -> WidgetView:
        return WidgetView(id=self.id, name=self.name, status=self.status)

入全景模式 出全屏模式

由此,框架于启动时衍生之:

遗物 本源
SQLAlchemy ORM模型 Mapped[] 列声明——标准 SQLAlchemy,无定制包装器
tenant_id 列 + Postgres RLS 政策 Meta.tenant_scoped = True
Upsert 仓库 ORM 模型
Restate 虚拟对象 + 每个动作一处理器 Meta.restate_object = True + @action
独占与共享处理器并发 ActionKind
数据库写入后效果执行 ActionContext

无码生成,无别立之模式文书。一统类,万法皆于引时得之。

资源元:元类也

ResourceMeta延伸 SQLAlchemy 之DeclarativeBase元类者,于类之定义时运行。其有三事焉:

  1. 注入租户列。Meta.tenant_scoped = True,则注入之。tenant_id(或无论何物)Meta.tenancy_key 者,mapped_column 之属也,与 server_default = current_setting('app.tenant_id', true) 相协。域码未尝自陈此列。

  2. 聚诸行。 观其类域,索以 @action 装饰之法,乃构 cls.__actions__: dict[str, ActionMeta]。承袭之行悉纳;若荫蔽亲行而无 @action 则斥之。

  3. 乃登诸类于寰宇。寰宇资源簿,以类名系类,俾派生层于启动时得寻之,无环顾之需。

class ResourceMeta(type(Base)):
    def __new__(mcs, name, bases, namespace):
        # collect @action methods into cls.__actions__
        # parse cls.Meta into cls.__meta__
        # inject tenant_id column if tenant_scoped
        # register in global registry
        ...

入全景模式 出全屏模式

动行驱动数据库与并发之态

动之属,以明框架当如何处置其返值,及如何设其Restate之应。

善哉 数据库操作 重述并发
CREATE 插入并自动保存资源 独占
UPDATE 更新并自动保存资源 独占
DESTROY 按键删除 独占
ACTION 领域通过ActionContext控制所有写入 独占
READ 禁止写入 共享(并发)
STREAM 不写 共享(并发)

CREATEUPDATE自动保存所还Resource—框架调用repo.save(result)ACTION将全权交予域码;框架仅于DB写入后执行所集效应。READSTREAM 获取 Restate 共享处理器,使并发读取不互相阻塞。

行动体规则严苛:无 I/O,无原始 SQL,无外部调用。验证、变更、返回。框架处理持久化。

# Good
@action(kind=ActionKind.UPDATE)
def deactivate(self) -> "Widget":
    if self.status == "INACTIVE":
        raise ValueError("Already inactive")
    self.status = "INACTIVE"
    return self

# Bad — I/O in action body
@action(kind=ActionKind.UPDATE)
def deactivate(self) -> "Widget":
    db.execute(...)       # don't — framework handles persistence
    requests.post(...)    # don't — use ActionContext for side effects
    return self

进入全屏模式 退出全屏模式

效应是数据,非调用

领域代码永不导入 Restate。副作用通过声明实现ActionContext,一平实之Python对象,入之ACTION执事者:

@action(kind=ActionKind.ACTION)
def add_message(self, action_ctx: ActionContext, content: dict, participant_id: str, ...) -> Message:
    msg = Message(
        id=_cuid(),
        thread_id=self.id,
        content=content,
        participant_id=participant_id,
    )

    # If this is a human message, start an agent run
    if participant_type == "HUMAN":
        run_id = _cuid()
        action_ctx.send_workflow("AgentRun", key=run_id, arg={
            "run_id": run_id,
            "thread_id": self.id,
            "tenant_id": self.tenant_id,
        })

    # Fan out to all channels bound to this thread
    for channel_id in resolve_channels_for_thread(self.id, self.tenant_id):
        action_ctx.send_after(
            "ChannelDelivery", "deliver", key=channel_id,
            factory=lambda result, cid=channel_id: {
                "channel_id": cid,
                "message": {"position": result["position"], "content": content, ...},
            },
        )

    return msg

入全景模式 出全屏模式

ActionContext有三法焉。

  • send(service, handler, key, arg)一蹴而就至虚拟物之处理器
  • send_after(service, handler, key, factory)一火即弃,然 arg 乃建于 DB 写后之行动结果也
  • send_workflow(service, key, arg, handler)— 启动或触发重述工作流

send_after微妙者也。此讯position场域乃框架之位序计数所定,后ctx.run()— 构效之时,域码不能知也。send_after取一工廠函數,受序列之結果典後ctx.run()返归。衍层(derive layer)restate.py)呼factory(result)泛泛而言,其中无涉任何知识。ChannelDelivery,位之域,或其所以重也。

欲使其具体,今述派生层于每 ACTION 处理后所为:

# Inside _attach_handler, after ctx.run() returns the serialized result:
result = await ctx.run(action_name, _run)

# Update position counter in Restate state (for add_message-style actions)
if is_positional and next_pos is not None:
    ctx.set("position", next_pos)

# Execute effects collected by ActionContext during _run
if action_ctx:
    for effect in action_ctx.effects:
        if isinstance(effect, DeferredSendEffect):
            arg = effect.factory(result)   # factory receives the result here
            ctx.generic_send(effect.service, effect.handler, json.dumps(arg).encode(), key=effect.key)
        elif isinstance(effect, SendEffect):
            ctx.generic_send(effect.service, effect.handler, json.dumps(effect.arg).encode(), key=effect.key)
        elif isinstance(effect, WorkflowEffect):
            ctx.workflow_send(handler_fn, key=effect.key, arg=arg)

入全屏模式 出全屏模式

result此乃所返之典也ctx.run()— 其间亦含position者,框架于_run中预置之故也。工坊得窥其全。衍层泛调用factory(result),未识工坊所取之域。

者,效为实也。衍层施之。域码与基设相隔。

衍虚物:泛通无碍

derive_virtual_object(resource_cls)者,读之。cls.__actions__cls.__meta__相合,而生一Restate虛象,具正處理者並行、位次計數、不墜效應、執行之能。其無域知:

def derive_virtual_object(resource_cls: type[Resource]) -> restate.VirtualObject:
    obj = restate.VirtualObject(resource_cls.__name__)

    for action_name, action_meta in resource_cls.__actions__.items():
        _attach_handler(obj, resource_cls, action_name, action_meta)

    # If resource has add_message, inject queue management handlers
    if "add_message" in resource_cls.__actions__:
        _attach_queue_handlers(obj, resource_cls.__name__)

    return obj

入全屏模式 出全屏模式

每生處理者:

  1. 自請求中提取權限/租戶
  2. 若需,復位次計數(以add_message之式行动)
  3. ctx.run(action_name, _run)_run启租户会话,据键载资源,召行法,需则存之
  4. 更位于Restate之位
  5. _execute_effects

行所集诸效__JHSNS_SEG_8e68e3c3_124___run回调乃纯粹之函数,其内无重述之操作,亦无HTTP之调用。凡与Restate相涉者,皆发生于回调之外。

租户隔离乃结构之要义也

tenant_scoped = True触发三层之制。

第一层—柱体注入。 ResourceMeta注之tenant_idserver_default = current_setting('app.tenant_id', true)若应用之码遗忘设之,Postgres自会取会话变量而充之。

层二—会话变量。 tenant_session("tenant-abc")乃一情境管理之器,于任何查询之前,皆设SET LOCAL app.tenant_id = :tid于连接。

层三—RLS策略。Alembic迁移创CREATE POLICY tenant_isolation ON widgets USING (tenant_id = current_setting('app.tenant_id', true))

其果:阙如。WHERE tenant_id = ? 之條於應用程式,返零行,非悉行。錯誤皆閉。無故洩跨租戶資料之法,由忘記濾鏡。

with tenant_session("tenant-abc") as db:
    repo = SqlAlchemyRepository(db, Widget)
    widgets = repo.list()   # no WHERE clause — RLS filters automatically

進全屏模式 離全屏模式

皆寫為上插

SqlAlchemyRepository.save() 永遠發 INSERT ... ON CONFLICT DO UPDATE。非為風格之好——乃正確持久執行之必需也。

每值《重演》于崩殂后重演一记,则ctx.run()之回执复行。若但INSERT,则重演时生重键之谬。更置则一也。

于某些资,其冲突之的非主键。Message有天然之键。(thread_id, idempotency_key) — 同一脉络中,具同一名副不可易之钥之讯,实为同一讯也。经Restate之涤荡,作业流程重演,然讯息之ID新生,其副不可易之钥则仍旧。ON CONFLICT (id) DO UPDATE当得(新ID,无冲突)而遂,继而犯独占之制。所宜者,乃天然之钥,具DO NOTHING

class Message(Resource):
    class Meta:
        conflict_columns = ("thread_id", "idempotency_key")
        conflict_action  = "nothing"

入全屏模式 出全屏模式

ResourceMeta 此之解析,仓库自用之。


初试而废者何

自定义字段类型。 旧版有 fields.pyCuidFieldStringFieldDateTimeField,复有独立者derive/orm.py者,自彼字段定义中发SQLAlchemy之模型。其重行SQLAlchemy,甚拙。二文件,约三百行,所供类型安全,反不如土生土长之Mapped[]。遂删二者。Resource今直承DeclarativeBase

供内容哈希之恒等键。 欲自訊息內容之哈希以自動生成唯一性鑰,以除之於域API。患:二異發者可生同內。基於哈希之鑰將使異發者之訊息去重——誤義。復用呼者所供之鑰。

應用層租戶濾鏡。始於filter_by(tenant_id=...) 诸存库之法皆有之。移至 RLS。其异:应用之滤失则开(滤缺=悉行);RLS 滤失 SET LOCAL 则阖(滤缺=空会变量=零行)。一执之点,结构正然。

一单会话流程,每线程一。 尝使每线程为 Restate Workflow,存续会话于 Restate 之状态库。为 Restate SDK 缺陷所阻:WorkflowSharedContext.get/set 当处理输入为 Pydantic 模型时失灵——SDK 试图于账簿记录时序列化输入,遂以 not JSON serializable 失败。复为 Thread VirtualObject(为序消息)+ AgentRun Workflow(为执行智能体)。


教我以事之虫

SQLAlchemy之关系使记事膨胀 _serialize() 最初行遍全物之图,含关系之集合。一Thread 有二百消息,则每单一记事皆录其二百消息而尽add_message之召,致Pusher 413之误(负载过巨),使日记条目膨胀。其解:于_serialize()尽弃关系集合。处理器之返值,无须相关对象——非行动契约之部分也。

ON CONFLICT (id)于Restate清除后崩坏。既清Restate之状,工作流复演。若复演生新消息ID而同idempotency key,ON CONFLICT (id) DO UPDATE于新ID无冲突——插入遂行,复触UNIQUE(thread_id, idempotency_key)之限。其解:ON CONFLICT (thread_id, idempotency_key) DO NOTHING。若库已具此讯(实为真源),默然越之。

httpx.post于作业处理器中,易致僵锁。旧式码,书误文,乃呼之。httpx.post 自內於工作流處理器,至 Restate 入口。此致僵局:工作流行於 Restate 處理迴環,而同步呼叫回 Restate 入口則阻。_run_done 從未發動,線程佇列永阻。修:工作流處理器所有傳送皆經 ctx.generic_send — 火即忘,非阻,記錄。

Restate 清理後虛擬物件之死狀。既过,podman compose down -v则消,然Thread VirtualObject之active_run_id,犹存其旧态。_run_done永不发矣,盖流程已亡。无生机之检,则线程队列,每度清涤,皆阻绝无解。其解:_enqueue_run行前必唤ctx.workflow_call(status_fn, key=active_run_id),以审运行之真伪。若状非"running",澄明其境,即刻发之。


代理之层

此框架之要义,在于为领域驱动设计之器——凡乎 Python 之务,兼具恒久执行与关系数据库者,皆可资之。然于铁桥之特例,此框架实为代理执行之基。

代理之约,至简:

class BaseAgent(ABC):
    @abstractmethod
    async def run(self, ctx: AgentContext) -> None: ...

入全景模式 退出全屏模式

AgentContext包覆Restate之WorkflowContext,显领域级之素朴:持久步骤、史册攫取、讯息书写、人机交互之悬停。诸使无取自Restate或SQLAlchemy之事物。

class WeatherAgent(BaseAgent):
    async def run(self, ctx: AgentContext) -> None:
        history = await ctx.step("fetch_history", ctx.get_history)
        response = await ctx.step("llm_call_0", lambda: call_llm(history))
        ctx.write_message({"version": 1, "parts": [{"type": "text", "text": response}]})

agent_registry.register("weather", WeatherAgent)

进入全屏模式 退出全屏模式

ctx.step()已录之。步骤间崩坏,重演自中断处续之。人环介入,HITL命名之许诺(代理暂止,线程现消息,人应,许诺解,代理复行)。取消乃每步前窥之恒久许诺——新用户消息至,则活动运行于次步界取消之。

此框架悉理之。代理唯领域之理也。


指导之则

其持之者逾四十五事:

  1. Postgres乃真源之所在。 重述日志之志,非其状也。凡读皆归Postgres。
  2. 租户隔离,本乎结构。 RLS于库层行之。缺滤则返零行。
  3. 域无基建之引。之效,乃以ActionContext宣告之。基设施乃施之。
  4. 凡书皆插之。持久之执行,重演回调。幂等必结构,非可择。
  5. 之效,行于库书之后,持久。 PostgreSQL與Restate無共通之分布式交易管理器——非藉由二阶段提交或发箱模式,二者间之真正原子性不可得。此框架所为之事:效果于Restate處理器既返 ctx.run()後而发,此等发送亦由Restate記之。若於数据库写入与发送之间进程崩溃,Restate重演其处理器——ctx.run()复其缓存之果(无复书),并发火。DB之写乃真源;效达之保由Restate之重演。不一致之窗乃进程崩殂间。ctx.run()竟而次记《Restate》之文——此《Restate》本就设为终焉。

堆栈

Python,FastAPI,复述之 PostgreSQL (RLS), Alembic, Hypercorn (HTTP/2 必需于 Restate), Podman Compose.

框架层 (shared/framework/shared/derive/) 无领域之识。平台层 (platform/) 除经由框架外,无 Restate 之引。具体之代理与适配器 (services/) 亦然.

45 ADR 之数。docs/decisions.md 者欲深究,则可循此。