吾尝筑之鸣者使— 一多租户AI代理编排平台。途中,吾终成一事,觉其可独立成文:乃为Python之领域驱动设计框架,此框架取持久化、持久执行处理器及租户隔离诸利,皆源于单一资源类。
此乃受 Elixir(艾勒克斯) 启发而作也。Ash框架(Ash framework)此乃记其事、述其理、录其失之文也.
而后端之胶合,其弊何在?
- 凡后端之码,非皆业务之理。惟接线耳。尔撰一域之象,复撰:
- 一SQLAlchemy之模以存之
- 一库以行插入更新之术__JHSNS_SEG_8e68e3c3_11__一HTTP之应或消息队列之食以纳令
- 每询皆滤租客
- 重试之术,幂等之验,竞发之控
复如是于诸实体。商理之要——实所恃者——乃薄衣于厚壳之基构胶合间.
此框架所恃之见:若域模足表,万般皆可推演.
框架
资源
一Resource子类同时为 SQLAlchemy 模型、upsert 仓库及 Restate 虚拟对象。汝但书一类耳。
from ironbridge.shared.framework import Resource, ActionKind, action
from ironbridge.shared.framework.effects import ActionContext
class Widget(Resource):
class Meta:
tenant_scoped = True # inject tenant_id, enforce via Postgres RLS
restate_object = True # derive a Restate VirtualObject
__tablename__ = "widgets"
id : Mapped[str] = mapped_column(String, primary_key=True, default=_cuid)
name : Mapped[str] = mapped_column(String, nullable=False)
status : Mapped[str] = mapped_column(String, default="ACTIVE")
created_at : Mapped[datetime] = mapped_column(DateTime(timezone=True), default=_utcnow)
@action(kind=ActionKind.CREATE)
def create(self, name: str) -> "Widget":
self.name = name
return self
@action(kind=ActionKind.UPDATE)
def deactivate(self) -> "Widget":
if self.status == "INACTIVE":
raise ValueError("Already inactive")
self.status = "INACTIVE"
return self
@action(kind=ActionKind.READ)
def get(self) -> WidgetView:
return WidgetView(id=self.id, name=self.name, status=self.status)
由此,框架于启动时衍生之:
| 遗物 | 本源 |
|---|---|
| SQLAlchemy ORM模型 |
Mapped[] 列声明——标准 SQLAlchemy,无定制包装器 |
tenant_id 列 + Postgres RLS 政策 |
Meta.tenant_scoped = True |
| Upsert 仓库 | ORM 模型 |
| Restate 虚拟对象 + 每个动作一处理器 |
Meta.restate_object = True + @action
|
| 独占与共享处理器并发 | ActionKind |
| 数据库写入后效果执行 | ActionContext |
无码生成,无别立之模式文书。一统类,万法皆于引时得之。
资源元:元类也
ResourceMeta延伸 SQLAlchemy 之DeclarativeBase元类者,于类之定义时运行。其有三事焉:
注入租户列。若
Meta.tenant_scoped = True,则注入之。tenant_id(或无论何物)Meta.tenancy_key者,mapped_column之属也,与server_default = current_setting('app.tenant_id', true)相协。域码未尝自陈此列。聚诸行。 观其类域,索以
@action装饰之法,乃构cls.__actions__: dict[str, ActionMeta]。承袭之行悉纳;若荫蔽亲行而无@action则斥之。乃登诸类于寰宇。寰宇资源簿,以类名系类,俾派生层于启动时得寻之,无环顾之需。
class ResourceMeta(type(Base)):
def __new__(mcs, name, bases, namespace):
# collect @action methods into cls.__actions__
# parse cls.Meta into cls.__meta__
# inject tenant_id column if tenant_scoped
# register in global registry
...
动行驱动数据库与并发之态
动之属,以明框架当如何处置其返值,及如何设其Restate之应。
| 善哉 | 数据库操作 | 重述并发 |
|---|---|---|
CREATE |
插入并自动保存资源 | 独占 |
UPDATE |
更新并自动保存资源 | 独占 |
DESTROY |
按键删除 | 独占 |
ACTION |
领域通过ActionContext控制所有写入 | 独占 |
READ |
禁止写入 | 共享(并发) |
STREAM |
不写 | 共享(并发) |
CREATE与UPDATE自动保存所还Resource—框架调用repo.save(result)。ACTION将全权交予域码;框架仅于DB写入后执行所集效应。READ与STREAM 获取 Restate 共享处理器,使并发读取不互相阻塞。
行动体规则严苛:无 I/O,无原始 SQL,无外部调用。验证、变更、返回。框架处理持久化。
# Good
@action(kind=ActionKind.UPDATE)
def deactivate(self) -> "Widget":
if self.status == "INACTIVE":
raise ValueError("Already inactive")
self.status = "INACTIVE"
return self
# Bad — I/O in action body
@action(kind=ActionKind.UPDATE)
def deactivate(self) -> "Widget":
db.execute(...) # don't — framework handles persistence
requests.post(...) # don't — use ActionContext for side effects
return self
效应是数据,非调用
领域代码永不导入 Restate。副作用通过声明实现ActionContext,一平实之Python对象,入之ACTION执事者:
@action(kind=ActionKind.ACTION)
def add_message(self, action_ctx: ActionContext, content: dict, participant_id: str, ...) -> Message:
msg = Message(
id=_cuid(),
thread_id=self.id,
content=content,
participant_id=participant_id,
)
# If this is a human message, start an agent run
if participant_type == "HUMAN":
run_id = _cuid()
action_ctx.send_workflow("AgentRun", key=run_id, arg={
"run_id": run_id,
"thread_id": self.id,
"tenant_id": self.tenant_id,
})
# Fan out to all channels bound to this thread
for channel_id in resolve_channels_for_thread(self.id, self.tenant_id):
action_ctx.send_after(
"ChannelDelivery", "deliver", key=channel_id,
factory=lambda result, cid=channel_id: {
"channel_id": cid,
"message": {"position": result["position"], "content": content, ...},
},
)
return msg
ActionContext有三法焉。
-
send(service, handler, key, arg)一蹴而就至虚拟物之处理器 -
send_after(service, handler, key, factory)一火即弃,然 arg 乃建于 DB 写后之行动结果也 -
send_workflow(service, key, arg, handler)— 启动或触发重述工作流
send_after微妙者也。此讯position场域乃框架之位序计数所定,后ctx.run()— 构效之时,域码不能知也。send_after取一工廠函數,受序列之結果典後ctx.run()返归。衍层(derive layer)restate.py)呼factory(result)泛泛而言,其中无涉任何知识。ChannelDelivery,位之域,或其所以重也。
欲使其具体,今述派生层于每 ACTION 处理后所为:
# Inside _attach_handler, after ctx.run() returns the serialized result:
result = await ctx.run(action_name, _run)
# Update position counter in Restate state (for add_message-style actions)
if is_positional and next_pos is not None:
ctx.set("position", next_pos)
# Execute effects collected by ActionContext during _run
if action_ctx:
for effect in action_ctx.effects:
if isinstance(effect, DeferredSendEffect):
arg = effect.factory(result) # factory receives the result here
ctx.generic_send(effect.service, effect.handler, json.dumps(arg).encode(), key=effect.key)
elif isinstance(effect, SendEffect):
ctx.generic_send(effect.service, effect.handler, json.dumps(effect.arg).encode(), key=effect.key)
elif isinstance(effect, WorkflowEffect):
ctx.workflow_send(handler_fn, key=effect.key, arg=arg)
result此乃所返之典也ctx.run()— 其间亦含position者,框架于_run中预置之故也。工坊得窥其全。衍层泛调用factory(result),未识工坊所取之域。
者,效为实也。衍层施之。域码与基设相隔。
衍虚物:泛通无碍
derive_virtual_object(resource_cls)者,读之。cls.__actions__與cls.__meta__相合,而生一Restate虛象,具正處理者並行、位次計數、不墜效應、執行之能。其無域知:
def derive_virtual_object(resource_cls: type[Resource]) -> restate.VirtualObject:
obj = restate.VirtualObject(resource_cls.__name__)
for action_name, action_meta in resource_cls.__actions__.items():
_attach_handler(obj, resource_cls, action_name, action_meta)
# If resource has add_message, inject queue management handlers
if "add_message" in resource_cls.__actions__:
_attach_queue_handlers(obj, resource_cls.__name__)
return obj
每生處理者:
- 自請求中提取權限/租戶
- 若需,復位次計數(以
add_message之式行动) - 呼
ctx.run(action_name, _run),_run启租户会话,据键载资源,召行法,需则存之 - 更位于Restate之位
- 以
_execute_effects
行所集诸效__JHSNS_SEG_8e68e3c3_124___run回调乃纯粹之函数,其内无重述之操作,亦无HTTP之调用。凡与Restate相涉者,皆发生于回调之外。
租户隔离乃结构之要义也
tenant_scoped = True触发三层之制。
第一层—柱体注入。 ResourceMeta注之tenant_id与server_default = current_setting('app.tenant_id', true)若应用之码遗忘设之,Postgres自会取会话变量而充之。
层二—会话变量。 tenant_session("tenant-abc")乃一情境管理之器,于任何查询之前,皆设SET LOCAL app.tenant_id = :tid于连接。
层三—RLS策略。Alembic迁移创CREATE POLICY tenant_isolation ON widgets USING (tenant_id = current_setting('app.tenant_id', true))。
其果:阙如。WHERE tenant_id = ? 之條於應用程式,返零行,非悉行。錯誤皆閉。無故洩跨租戶資料之法,由忘記濾鏡。
with tenant_session("tenant-abc") as db:
repo = SqlAlchemyRepository(db, Widget)
widgets = repo.list() # no WHERE clause — RLS filters automatically
皆寫為上插
SqlAlchemyRepository.save() 永遠發 INSERT ... ON CONFLICT DO UPDATE。非為風格之好——乃正確持久執行之必需也。
每值《重演》于崩殂后重演一记,则ctx.run()之回执复行。若但INSERT,则重演时生重键之谬。更置则一也。
于某些资,其冲突之的非主键。Message有天然之键。(thread_id, idempotency_key) — 同一脉络中,具同一名副不可易之钥之讯,实为同一讯也。经Restate之涤荡,作业流程重演,然讯息之ID新生,其副不可易之钥则仍旧。ON CONFLICT (id) DO UPDATE当得(新ID,无冲突)而遂,继而犯独占之制。所宜者,乃天然之钥,具DO NOTHING:
class Message(Resource):
class Meta:
conflict_columns = ("thread_id", "idempotency_key")
conflict_action = "nothing"
ResourceMeta 此之解析,仓库自用之。
初试而废者何
自定义字段类型。 旧版有 fields.py 之 CuidField、StringField、DateTimeField,复有独立者derive/orm.py者,自彼字段定义中发SQLAlchemy之模型。其重行SQLAlchemy,甚拙。二文件,约三百行,所供类型安全,反不如土生土长之Mapped[]。遂删二者。Resource今直承DeclarativeBase。
供内容哈希之恒等键。 欲自訊息內容之哈希以自動生成唯一性鑰,以除之於域API。患:二異發者可生同內。基於哈希之鑰將使異發者之訊息去重——誤義。復用呼者所供之鑰。
應用層租戶濾鏡。始於filter_by(tenant_id=...) 诸存库之法皆有之。移至 RLS。其异:应用之滤失则开(滤缺=悉行);RLS 滤失 SET LOCAL 则阖(滤缺=空会变量=零行)。一执之点,结构正然。
一单会话流程,每线程一。 尝使每线程为 Restate Workflow,存续会话于 Restate 之状态库。为 Restate SDK 缺陷所阻:WorkflowSharedContext.get/set 当处理输入为 Pydantic 模型时失灵——SDK 试图于账簿记录时序列化输入,遂以 not JSON serializable 失败。复为 Thread VirtualObject(为序消息)+ AgentRun Workflow(为执行智能体)。
教我以事之虫
SQLAlchemy之关系使记事膨胀
_serialize() 最初行遍全物之图,含关系之集合。一Thread 有二百消息,则每单一记事皆录其二百消息而尽add_message之召,致Pusher 413之误(负载过巨),使日记条目膨胀。其解:于_serialize()尽弃关系集合。处理器之返值,无须相关对象——非行动契约之部分也。
ON CONFLICT (id)于Restate清除后崩坏。既清Restate之状,工作流复演。若复演生新消息ID而同idempotency key,ON CONFLICT (id) DO UPDATE于新ID无冲突——插入遂行,复触UNIQUE(thread_id, idempotency_key)之限。其解:ON CONFLICT (thread_id, idempotency_key) DO NOTHING。若库已具此讯(实为真源),默然越之。
httpx.post于作业处理器中,易致僵锁。旧式码,书误文,乃呼之。httpx.post 自內於工作流處理器,至 Restate 入口。此致僵局:工作流行於 Restate 處理迴環,而同步呼叫回 Restate 入口則阻。_run_done 從未發動,線程佇列永阻。修:工作流處理器所有傳送皆經 ctx.generic_send — 火即忘,非阻,記錄。
Restate 清理後虛擬物件之死狀。既过,podman compose down -v则消,然Thread VirtualObject之active_run_id,犹存其旧态。_run_done永不发矣,盖流程已亡。无生机之检,则线程队列,每度清涤,皆阻绝无解。其解:_enqueue_run行前必唤ctx.workflow_call(status_fn, key=active_run_id),以审运行之真伪。若状非"running",澄明其境,即刻发之。
代理之层
此框架之要义,在于为领域驱动设计之器——凡乎 Python 之务,兼具恒久执行与关系数据库者,皆可资之。然于铁桥之特例,此框架实为代理执行之基。
代理之约,至简:
class BaseAgent(ABC):
@abstractmethod
async def run(self, ctx: AgentContext) -> None: ...
AgentContext包覆Restate之WorkflowContext,显领域级之素朴:持久步骤、史册攫取、讯息书写、人机交互之悬停。诸使无取自Restate或SQLAlchemy之事物。
class WeatherAgent(BaseAgent):
async def run(self, ctx: AgentContext) -> None:
history = await ctx.step("fetch_history", ctx.get_history)
response = await ctx.step("llm_call_0", lambda: call_llm(history))
ctx.write_message({"version": 1, "parts": [{"type": "text", "text": response}]})
agent_registry.register("weather", WeatherAgent)
每ctx.step()已录之。步骤间崩坏,重演自中断处续之。人环介入,HITL命名之许诺(代理暂止,线程现消息,人应,许诺解,代理复行)。取消乃每步前窥之恒久许诺——新用户消息至,则活动运行于次步界取消之。
此框架悉理之。代理唯领域之理也。
指导之则
其持之者逾四十五事:
- Postgres乃真源之所在。 重述日志之志,非其状也。凡读皆归Postgres。
- 租户隔离,本乎结构。 RLS于库层行之。缺滤则返零行。
-
域无基建之引。之效,乃以
ActionContext宣告之。基设施乃施之。 - 凡书皆插之。持久之执行,重演回调。幂等必结构,非可择。
-
之效,行于库书之后,持久。 PostgreSQL與Restate無共通之分布式交易管理器——非藉由二阶段提交或发箱模式,二者间之真正原子性不可得。此框架所为之事:效果于Restate處理器既返
ctx.run()後而发,此等发送亦由Restate記之。若於数据库写入与发送之间进程崩溃,Restate重演其处理器——ctx.run()复其缓存之果(无复书),并发火。DB之写乃真源;效达之保由Restate之重演。不一致之窗乃进程崩殂间。ctx.run()竟而次记《Restate》之文——此《Restate》本就设为终焉。
堆栈
Python,FastAPI,复述之 PostgreSQL (RLS), Alembic, Hypercorn (HTTP/2 必需于 Restate), Podman Compose.
框架层 (shared/framework/ 与 shared/derive/) 无领域之识。平台层 (platform/) 除经由框架外,无 Restate 之引。具体之代理与适配器 (services/) 亦然.
45 ADR 之数。docs/decisions.md 者欲深究,则可循此。













