












主述诸步之戏法,其要在于调度之循环。盖llama-agents者,本于事件,其核乃事件之辨析。
class LoopingWorkflow(Workflow):
    """Workflow that revisits a single step a random number of times, then stops."""

    @step
    async def prepare_input(self, ev: StartEvent) -> LoopEvent:
        """Seed the loop with a random iteration budget between 0 and 10 inclusive."""
        return LoopEvent(num_loops=random.randint(0, 10))

    @step
    async def loop_step(self, ev: LoopEvent) -> LoopEvent | StopEvent:
        """Count the budget down by one per pass; stop once it reaches zero or below."""
        remaining = ev.num_loops
        if remaining > 0:
            # Return the same event type this step accepts, with one fewer
            # iteration left, so the countdown continues.
            return LoopEvent(num_loops=remaining - 1)
        return StopEvent(result="Done looping!")
class BranchWorkflow(Workflow):
    """Workflow that randomly follows one of two two-step branches, A or B."""

    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        """Flip a coin and emit the first event of the chosen branch."""
        chose_branch_a = random.randint(0, 1) == 0
        if not chose_branch_a:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")
        print("Go to branch A")
        return BranchA1Event(payload="Branch A")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        """First stage of branch A: print the payload and forward it unchanged."""
        payload = ev.payload
        print(payload)
        return BranchA2Event(payload=payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        """First stage of branch B: print the payload and forward it unchanged."""
        payload = ev.payload
        print(payload)
        return BranchB2Event(payload=payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        """Final stage of branch A: print the payload and finish."""
        print(ev.payload)
        outcome = "Branch A complete."
        return StopEvent(result=outcome)

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        """Final stage of branch B: print the payload and finish."""
        print(ev.payload)
        outcome = "Branch B complete."
        return StopEvent(result=outcome)
诸步亦可并行而施。然默认执行之序不定,若欲察其果,须藉事件之聚敛或待其理。
class ParallelFlow(Workflow):
    """Workflow that sends three queries to a step declared with four workers."""

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
        """Send three StepTwoEvent queries through the context; returns nothing itself."""
        for label in ("Query 1", "Query 2", "Query 3"):
            ctx.send_event(StepTwoEvent(query=label))

    @step(num_workers=4)
    async def step_two(self, ev: StepTwoEvent) -> StopEvent:
        """Simulate a slow query with a random 0-5 second sleep, then stop with its text."""
        print("Running slow query ", ev.query)
        pause_seconds = random.randint(0, 5)
        await asyncio.sleep(pause_seconds)
        return StopEvent(result=ev.query)
class ConcurrentFlow(Workflow):
    """Workflow that fans out three queries and joins their results in one step."""

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None:
        """Send three StepTwoEvent queries through the context; returns nothing itself."""
        for label in ("Query 1", "Query 2", "Query 3"):
            ctx.send_event(StepTwoEvent(query=label))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        """Simulate a query with a random 1-5 second sleep and pass its text onward."""
        print("Running query ", ev.query)
        pause_seconds = random.randint(1, 5)
        await asyncio.sleep(pause_seconds)
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent | None:
        """Buffer incoming results; finish once three StepThreeEvents are collected."""
        expected_types = [StepThreeEvent] * 3
        gathered = ctx.collect_events(ev, expected_types)
        if gathered is not None:
            # All three results are in hand: handle them together.
            print(gathered)
            return StopEvent(result="Done")
        # Still waiting on more events.
        return None
class ConcurrentFlow(Workflow):
    """Fan out to three different step types, then join their three results.

    ``start`` sends one event of each kind (A, B, C). Each is handled by its
    own step, which emits a matching ``*CompleteEvent``. ``step_three``
    receives every completion event and finishes the workflow only once one
    event of each completion type has been collected.
    """

    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent | None:
        """Send one event per branch through the context; returns nothing itself."""
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        """Handle the A event and forward its query as the A result."""
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        """Handle the B event and forward its query as the B result."""
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        """Handle the C event and forward its query as the C result."""
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent | None:
        """Join the three branch results.

        Returns:
            ``None`` while ``ctx.collect_events`` is still waiting for one
            event of each completion type, otherwise a ``StopEvent`` with
            result ``"Done"``.
        """
        print("Received event ", ev.result)

        # wait until we receive 3 events (one of each completion type)
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            # The annotation includes None because this early return is the
            # normal path until all three events have arrived.
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")
欲谙llama-agents诸步之施法,于善用此架者,实裨益良多。今此部官方文牍颇备,可潜心研习焉。
https://developers.llamaindex.ai/python/llamaagents/workflows/branches_and_loops/
https://developers.llamaindex.ai/python/llamaagents/workflows/concurrent_execution/
https://developers.llamaindex.ai/python/llamaagents/workflows/unbound_functions/
https://developers.llamaindex.ai/python/llamaagents/workflows/durable_workflows/
此內容由慣性聚合(RSS閱讀器)自動聚合整理,僅供閱讀參考。 原文來自 — 版權歸原作者所有。