























1
I’m having an issue with LangGraph event streaming. My tool uses get_stream_writer to send events to the frontend. Sometimes, the event stream works and the client receives all events. But other times, the stream stalls (client stops getting events) even though the backend finishes fine.
For context, the async worker called inside the tool can run anywhere from 1 to 10 minutes in real scenarios. During this time, I emit status events via get_stream_writer before and after the worker runs.
Are there any known pitfalls, timeouts, or best practices I should follow to ensure reliable streaming with get_stream_writer inside async tools—especially when awaiting a long-running worker? Is there something in my flow or the LangGraph event stream mechanism I should be aware of for long-running tasks?
Here’s a minimal code skeleton of what I’m doing:
import asyncio
from langchain_core.tools import tool
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
# --- TOOL: Async tool that streams progress updates to the client ---
@tool
async def generate_report_tool(report_id: str, user_id: str, sections: list[str], **kwargs) -> str:
writer = get_stream_writer() # Used to send events to the frontend
# Send an initial status event
writer({"status": f"Starting report generation for: {sections}"})
# Simulate a long-running async worker (1-10 minutes).
# In production, the worker should emit progress events every 5-10 seconds using the writer.
worker = ReportingWorker(
options=ReportOptions(report_id=report_id, user_id=user_id, selected_sections=sections),
broadcast=writer # Pass the writer for emitting events!
)
report_data = await worker.run()
# Send a completion status event
writer({"status": "completed", "output": report_data})
# Return result as JSON string for downstream nodes
return json.dumps(report_data)
# --- NODE: Calls the report generation tool and returns its result ---
async def tools_node(state, config=None):
tool_args = {
"report_id": "sample_report",
"user_id": "user42",
"sections": ["introduction", "analysis", "conclusion"],
}
result = await generate_report_tool.ainvoke(tool_args)
return {"messages": [result]}
# --- GRAPH: Minimal StateGraph setup ---
def build_graph():
graph = StateGraph(State)
graph.add_node("tools", tools_node)
graph.add_edge(START, "tools")
return graph.compile()
P.S: It most of the time happens in the production server in the Langraph Platform Cloud.
Thank you!
wfh 2
Hello! Apologies for the delayed response - I don’t monitor the OSS section as closely.
If anyone experiences this in the future, please speak up! We’ve made a lot of improvements in the past two months to the server’s redis connection management that has fixed similar issues. It’s likely that those same improvements would also fix this.
If you anyone else runs into this, please write into our support email and we will try to get things squared away.
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。