慣性聚合 高效追讀感興趣之博客、新聞、科技資訊
閱原文 以慣性聚合開啟

推薦訂閱源

博客园 - 司徒正美
V
V2EX
T
Tailwind CSS Blog
有赞技术团队
有赞技术团队
aimingoo的专栏
aimingoo的专栏
Apple Machine Learning Research
Apple Machine Learning Research
IT之家
IT之家
Blog — PlanetScale
Blog — PlanetScale
A
About on SuperTechFans
月光博客
月光博客
T
The Blog of Author Tim Ferriss
宝玉的分享
宝玉的分享
Martin Fowler
Martin Fowler
博客园 - 聂微东
The GitHub Blog
The GitHub Blog
V
Visual Studio Blog
WordPress大学
WordPress大学
酷 壳 – CoolShell
酷 壳 – CoolShell
Engineering at Meta
Engineering at Meta
GbyAI
GbyAI

博客园 - 荣锋亮

java nats request-many 支持的一些模式 java nats RequestMany context7 面向llm 以及ai代码编辑器的依赖包更新平台 NATS Agents Protocol nats 团队提出的agent 通信协议 metamcp mcp聚合&调度工具 llama-agents 的server 服务 mcp-proxy 桥接streamable http 以及stdio mcp 的工具 GhidraMCP Ghidra 的mcp服务 mcpcute 聚合mcp的工具 agent-tool-protocol 与mcp的集成 - 荣锋亮 llama-agents 执行流程图查看 llama-agents 的异常处理 通过dbos 解决llama-agents workflow 持久执行问题 llama-agents workflow 持久化运行的一些玩法 llama-agents 状态管理 llama-agents stream events 处理 zerofs v1.1.1 aarch64 支持16k-64k page llmaindex workflows-py 变更为了llama-agents agent-tool-protocol 代码优先的agent tool 协议 ZeroFS 1.1.0 发布 chicory 原生jvm webassembly runtime quickjs4j 包含的几个模块 quickjs4j java 中安全运行js 的沙箱 supergateway 将stdio server 暴露为sse服务的ai gateway nats-server 2.14.0 发布 nuitka python 模块转so 模块 - 荣锋亮 llama-agents 的resource对象 turndown html 转markdown npm包 @langchain/node-vfs 简单试用 @langchain/node-vfs node vfs web-agent firecrawl 开源的web 数据agent CubeSandbox 腾讯开源的轻量级面向ai agent的的沙箱工具 new-api 统一ai gateway 平台 pyinstrument python 调用连profiler 工具 jsonhero Trigger 团队开可以友好查看json的工具 aiproxy 高性能ai gateway just-bash 面向agent 的bash restate 构建可靠应用的平台 java 一些raft 实现框架 opendataloader-pdf 开源pdf自动处理工具 sbproxy单一文件的ai gateway 以及反向代理服务 hyperframes agent 友好的通过html 的视频处理框架 open-semantic开放语义层 agentregistry 开源中心化可信可治理的ai registry py-spy 一些新特性 zerofs 1.0.9 包含了webui nanobot 基于mcp 构建agent的框架 gh-aw-firewall github agent workflow 防火墙 dbos-inc agent-skills dbos 的agent skills robustmq ai 领域的通信设施
llama-agents 之 step 执行,其模式有数:
荣锋亮 · 2026-05-24 · via 博客园 - 荣锋亮

主述诸步之戏法,其要在于调度

之分途及

之循环。盖llama-agents者,本于事件,其核乃事件之辨析。

  • 循环之戏法,
class LoopingWorkflow(Workflow):
    @step
    async def prepare_input(self, ev: StartEvent) -> LoopEvent:
        num_loops = random.randint(0, 10)
        return LoopEvent(num_loops=num_loops)

    @step
    async def loop_step(self, ev: LoopEvent) -> LoopEvent | StopEvent:
        if ev.num_loops <= 0:
            return StopEvent(result="Done looping!")

        return LoopEvent(num_loops=ev.num_loops-1)
  • 分支之戏法,
class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

并行之戏法,

并行而施。然默认执行之序不定,若欲察其果,须藉事件之聚敛或待其理。

  • 并行,
class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(0, 5))

        return StopEvent(result=ev.query)
  • 待其果,
class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(
        self, ctx: Context, ev: StepThreeEvent
    ) -> StopEvent | None:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            return None

        # do something with all 3 results together
        print(result)
        return StopEvent(result="Done")
  • 异类之待,
class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent | None:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")

其说。

欲谙lama-agents诸步之施法,于善用此架者,实裨益良多。今此部官方文牍颇备,可潜心研习焉。

参详资料

https://developers.llamaindex.ai/python/llamaagents/workflows/branches_and_loops/

https://developers.llamaindex.ai/python/llamaagents/workflows/concurrent_execution/

https://developers.llamaindex.ai/python/llamaagents/workflows/unbound_functions/

https://developers.llamaindex.ai/python/llamaagents/workflows/durable_workflows/