LangGraph 워크플로우 템플릿 (v38)
Python 개발자를 위한 LangGraph 기반 AI 에이전트 워크플로우 템플릿
LangChain과 LangGraph를 사용한 Python 기반 AI 에이전트 개발을 위한 실전 가이드입니다. 이 템플릿은 실제 개발자들이 겪는 문제를 해결하기 위해 설계되었습니다.
1. LangGraph 아키텍처 개요
LangGraph는 상태 기반 워크플로우 시스템으로, 다음과 같은 핵심 구성 요소로 구성됩니다:
핵심 구성 요소:
- Nodes: 워크플로우의 단계
- Edges: 노드 간의 연결
- State: 워크플로우의 상태 관리
- Checkpointing: 상태 저장 및 복원
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
context: str
# 그래프 생성
workflow = StateGraph(GraphState)
2. 템플릿 1: 간단한 RAG 에이전트 (검색 → 생성 → 검증)
대부분의 LLM 응용 프로그램은 RAG(검색 기반 생성) 패턴을 기반으로 합니다:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
class RAGAgent:
def __init__(self, llm):
self.llm = llm
self.retriever = self._setup_retriever()
self.prompt = PromptTemplate.from_template(
"주어진 컨텍스트를 사용하여 질문에 답하세요:\n\n{context}\n\n질문: {question}"
)
def retrieve(self, state):
question = state["messages"][-1].content
docs = self.retriever.invoke(question)
context = "\n\n".join([doc.page_content for doc in docs])
return {"context": context}
def generate(self, state):
chain = self.prompt | self.llm | StrOutputParser()
result = chain.invoke({
"question": state["messages"][-1].content,
"context": state["context"]
})
return {"messages": [result]}
def validate(self, state):
# 간단한 검증 로직
if len(state["messages"][-1].content) < 10:
return {"messages": ["질문에 대한 충분한 정보가 없습니다."]}
return {"messages": [state["messages"][-1].content]}
# 워크플로우 정의
def create_rag_workflow():
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", RAGAgent.retrieve)
workflow.add_node("generate", RAGAgent.generate)
workflow.add_node("validate", RAGAgent.validate)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
return workflow.compile()
3. 템플릿 2: 다중 도구 에이전트 (계획 → 실행 → 관찰 → 결정)
복잡한 작업을 처리하는 다중 도구 에이전트:
from langchain.tools import Tool
from langchain_core.messages import ToolMessage
import json
class MultiToolAgent:
def __init__(self):
# 예시 도구들
self.tools = [
Tool(
name="weather",
func=lambda location: f"{location}의 날씨는 맑습니다.",
description="날씨 정보 조회"
),
Tool(
name="calculator",
func=lambda expression: f"결과: {eval(expression)}",
description="수학 계산"
)
]
def plan(self, state):
# 작업 계획 생성
plan = {
"tasks": [
{"name": "weather", "params": {"location": "서울"}},
{"name": "calculator", "params": {"expression": "2+2"}}
]
}
return {"plan": plan}
def execute(self, state):
# 도구 실행
results = []
for task in state["plan"]["tasks"]:
tool = next(t for t in self.tools if t.name == task["name"])
result = tool.func(**task["params"])
results.append(result)
return {"execution_results": results}
def observe(self, state):
# 결과 관찰 및 분석
return {"analysis": "모든 작업이 완료되었습니다."}
def decide(self, state):
# 결정 로직
if len(state["execution_results"]) > 0:
return {"messages": ["작업이 성공적으로 완료되었습니다."]}
return {"messages": ["작업 실패."]}
4. 템플릿 3: 인간-중개 워크플로우 (일시정지 → 검토 → 계속)
사람의 개입이 필요한 중요한 결정을 위한 워크플로우:
class HumanInLoopAgent:
def __init__(self):
self.user_feedback = None
def pause_for_review(self, state):
# 사용자 검토를 위한 일시정지
return {"status": "pending_review"}
def review(self, state):
# 사용자 검토 처리
if self.user_feedback is not None:
return {"messages": [self.user_feedback]}
return {"messages": ["사용자 검토가 필요합니다."]}
def continue_workflow(self, state):
# 워크플로우 재개
return {"status": "processing"}
# 사용자 피드백 설정
def set_user_feedback(feedback):
agent = HumanInLoopAgent()
agent.user_feedback = feedback
return agent
5. 템플릿 5: 병렬 실행 에이전트 (분기 → 처리 → 집계)
대량 데이터를 병렬로 처리:
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgent:
def __init__(self, max_workers=4):
self.max_workers = max_workers
def fan_out(self, state):
# 입력 데이터 분기
data_chunks = [
{"chunk_id": i, "data": f"chunk_{i}"}
for i in range(4)
]
return {"chunks": data_chunks}
def process_chunk(self, chunk_data):
# 개별 청크 처리
time.sleep(0.1) # 시뮬레이션
return {"result": f"processed_{chunk_data['data']}"}
async def parallel_process(self, state):
# 병렬 처리
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [
executor.submit(self.process_chunk, chunk)
for chunk in state["chunks"]
]
results = [future.result() for future in futures]
return {"processed_results": results}
def aggregate(self, state):
# 결과 집계
aggregated = {
"total_count": len(state["processed_results"]),
"results": [r["result"] for r in state["processed_results"]]
}
return {"messages": [f"총 {aggregated['total_count']}개 처리 완료"]}
6. 상태 관리 패턴
효율적인 상태 관리:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint import BaseCheckpointSaver
class StateManager:
def __init__(self):
# 메모리 체크포인트 사용
self.checkpointer = MemorySaver()
def get_state(self, thread_id):
# 상태 조회
return self.checkpointer.get(thread_id)
def update_state(self, thread_id, new_state):
# 상태 업데이트
self.checkpointer.put(thread_id, new_state)
def rollback(self, thread_id, version):
# 이전 버전으로 롤백
pass
# 상태 저장 설정
def setup_workflow_with_checkpoint():
workflow = StateGraph(GraphState)
workflow.add_checkpoint(MemorySaver())
return workflow
7. 스트리밍 및 실시간 업데이트
실시간 처리 지원:
python
from langchain_core.callbacks import BaseCallbackHandler
class StreamingHandler(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
print("체인 시작")
def on_chain_end(self, outputs, **kwargs):
print("체인 종료")
def on_llm_new_token(self, token, **kwargs):
print(f"토큰: {token}", end="", flush=True)
class StreamingAgent:
def __init__(self):
self.streaming_handler = StreamingHandler()
def stream_response(self, query):
# 스트리밍 응답
llm = ChatOpenAI(callbacks=[self.streaming_handler])
response = llm.invoke(query)
return response
# 사용 예시
async def stream_example():
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)





















