
























基于 demo_basic 实战代码
前置要求:Python 3.10+、uv、API Key(MiniMax / 硅基流动)、Ollama 本地服务
官方文档:https://python.langchain.com/docs/
直接调用模型 API 只能解决"单次问答",但真实业务要的是"系统能力":提示词管理、记忆、检索(RAG)、工具调用、多步编排和可观测性。LangChain 的价值,是把这些应用层能力标准化、组件化。
LangChain 是一个 LLM 应用编排框架,核心是把不同能力拼成可维护的链路:
ChatOpenAI / ChatOllama 负责推理与生成。先从最小闭环开始:Prompt -> LLM -> Parser。
再往上叠加:Memory 解决上下文,Retriever 解决知识,Tool 和 Agent 解决外部能力,Callback 解决可观测性。
最后把这些组件组合成真正可维护的应用链路。
[用户输入]
↓
[Prompt / LCEL]
↓
[LLM 推理]
↓
[Output Parser]
↓
[结果输出]
旁路能力:
Memory ──► Prompt
Retriever ──► Prompt
Tool / Agent ◄──► LLM
Callback / Streaming ──► 全链路观测
点击下表里的 Demo 名称可直接跳转到对应章节;每个 Demo 标题右侧的 ↑ 可以返回这里。
| Demo | 主题 | 对应文件 |
|---|---|---|

用 LCEL 管道(prompt | llm | parser)完成主题文案生成,并演示单次调用、模板渲染和批量调用。

| 知识模块 | 内容 |
|---|---|
| 原理 | ChatPromptTemplate + llm + StrOutputParser |
| 调用方式 | .invoke() / .batch() |
# 文件:demo01_lcel_basics.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os
load_dotenv()

# Chat model: MiniMax served through its OpenAI-compatible endpoint (api.minimaxi.com).
llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
    request_timeout=60,
    max_retries=2,
)

# Prompt template with `profession` pre-bound, so callers only supply `topic`.
prompt = ChatPromptTemplate.from_template(
    "你是一位{profession}专家。请用3句话介绍{topic},最后加一个彩蛋笑话。"
).partial(profession="技术写作")

# LCEL pipeline: prompt -> model -> plain string.
chain = prompt | llm | StrOutputParser()

# 1) One request.
result = chain.invoke({"topic": "LangChain 框架"})
print("=== 单次调用 ===")
print(result)

# 2) Render the prompt locally (no model call) to inspect the final text.
rendered = prompt.format(topic="Python 装饰器")
print("=== 渲染后的 Prompt ===")
print(rendered)

# 3) Several inputs in one batch call; results come back in input order.
results = chain.batch(
    [
        {"topic": "量子计算"},
        {"topic": "区块链"},
        {"topic": "神经网络"},
    ]
)
print("=== 批量调用 ===")
for r in results:
    print(r)

| 知识模块 | 内容 |
|---|---|
# 文件:demo02_prompt_template.py
from langchain_core.prompts import (
ChatPromptTemplate,
PromptTemplate,
MessagesPlaceholder,
HumanMessagePromptTemplate,
SystemMessagePromptTemplate
)
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os
load_dotenv()

# Chat model shared by this demo (MiniMax via its OpenAI-compatible API).
llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
)

# Lightweight alternative to a "pipeline" prompt: render the intro template first,
# then splice the result into an outer plain-string template.
introduction = PromptTemplate.from_template("你是 {character},一个乐于助人的 AI 助手。")
main_template = "{introduction}\n\n用户问:{question}\n\n你的回答:"
def create_pipeline_prompt(character, question):
    """Render the two-stage prompt: the intro for `character`, embedded in the outer template."""
    return main_template.format(
        introduction=introduction.format(character=character),
        question=question,
    )
final = create_pipeline_prompt("LangChain 助手", "什么是 RAG?")
print("PipelinePrompt 结果:", final)

response = llm.invoke(final)
# `response` is an AIMessage. The keys in response_metadata depend on the provider,
# so read them with .get(); indexing would raise KeyError when a key is absent.
answer = response.content
metadata = response.response_metadata or {}
token_usage = metadata.get("token_usage")
model_name = metadata.get("model_name")
finish_reason = metadata.get("finish_reason")
print("AI 回复:", answer)
print("Token用量:", token_usage)
print("模型名称:", model_name)
print("结束原因:", finish_reason)

get_buffer_string 的用法

| 知识模块 | 内容 |
|---|---|
# 文件:demo03_memory.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import (
ChatPromptTemplate,
MessagesPlaceholder,
HumanMessagePromptTemplate,
)
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, get_buffer_string
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
import os
load_dotenv()

# Chat model used by SummaryMemory below (MiniMax via its OpenAI-compatible API).
llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
)
class WindowedMemory:
    """Sliding-window chat memory.

    Stores at most ``2 * k`` messages and exposes the most recent ``k`` of them
    through :meth:`get_messages`. A non-positive ``k`` keeps and exposes nothing.
    """

    def __init__(self, k: int = 6):
        self.k = k
        self.messages = []

    def _trim(self) -> None:
        """Drop the oldest messages beyond the ``2 * k`` storage cap."""
        cap = max(self.k, 0) * 2
        if len(self.messages) > cap:
            # Explicit empty case: a slice like messages[-0:] would keep everything.
            self.messages = self.messages[-cap:] if cap else []

    def add_user_message(self, text: str):
        """Append a user turn, then enforce the storage cap."""
        self.messages.append(HumanMessage(content=text))
        self._trim()

    def add_ai_message(self, text: str):
        """Append an AI turn, then enforce the storage cap."""
        self.messages.append(AIMessage(content=text))
        self._trim()

    def get_messages(self) -> list:
        """Return the most recent ``k`` messages (an empty list when ``k <= 0``)."""
        if self.k <= 0:
            return []
        return self.messages[-self.k:]
class SummaryMemory:
    """Chat memory that compresses history into an LLM-written running summary.

    Messages accumulate in ``current_messages``. :meth:`summarize` folds them, together
    with any earlier summary, into ``summary`` and clears the buffer.
    """

    def __init__(self, llm, max_history: int = 20):
        self.llm = llm
        self.max_history = max_history
        self.summary = ""
        self.current_messages = []

    def add_user_message(self, text: str):
        """Buffer a user turn."""
        self.current_messages.append(HumanMessage(content=text))

    def add_ai_message(self, text: str):
        """Buffer an AI turn."""
        self.current_messages.append(AIMessage(content=text))

    def get_context(self) -> str:
        """Render the summary (if any) followed by the last ``max_history`` buffered messages."""
        # Guard max_history <= 0: the slice [-0:] would return the whole buffer.
        if self.max_history > 0:
            recent_messages = self.current_messages[-self.max_history:]
        else:
            recent_messages = []
        recent = get_buffer_string(recent_messages)
        if self.summary:
            return f"【历史摘要】{self.summary}\n\n【最近对话】\n{recent}"
        return recent

    def should_summarize(self) -> bool:
        """Return True once the buffer holds at least ``2 * max_history`` messages."""
        return len(self.current_messages) >= self.max_history * 2

    def summarize(self):
        """Fold the buffered messages (and the previous summary) into a new summary.

        This is a no-op when the buffer is empty, so no model call is wasted.
        """
        if not self.current_messages:
            return
        transcript = get_buffer_string(self.current_messages)
        if self.summary:
            # Carry the earlier summary forward; replacing it outright would lose older context.
            transcript = f"【历史摘要】{self.summary}\n\n【最近对话】\n{transcript}"
        summary_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="请把以下对话压缩成一段简短摘要,保留关键信息:"),
            HumanMessagePromptTemplate.from_template("{messages}"),
        ])
        self.summary = (
            summary_prompt | self.llm | StrOutputParser()
        ).invoke({"messages": transcript})
        self.current_messages = []
# Helper that strips <think> reasoning blocks from model output.
def extract_plain_text(summary_with_thinking: str) -> str:
    """Remove every ``<think>...</think>`` block, then trim surrounding whitespace.

    An unterminated ``<think>`` block (e.g. truncated output) is removed up to the end
    of the text.
    """
    import re
    # The lazy `.*?` needs a terminator: without `</think>` it matches the empty string,
    # so only the opening tag would be removed and the reasoning text would remain.
    text = re.sub(r"<think>.*?(?:</think>|\Z)", "", summary_with_thinking, flags=re.DOTALL)
    return text.strip()
# ---- Examples ----

# WindowedMemory: four turns stored, only the latest k=2 exposed.
print("\n===== WindowedMemory 示例 =====")
w_memory = WindowedMemory(k=2)
w_memory.add_user_message("你好!")
w_memory.add_ai_message("你好,有什么可以帮你的?")
w_memory.add_user_message("我想了解 LangChain")
w_memory.add_ai_message("LangChain 是一个构建 LLM 应用的框架。")
print("只保留最近 2 条:", w_memory.get_messages())

# SummaryMemory: six buffered turns with max_history=3 reaches the summarize threshold.
print("\n===== SummaryMemory 示例 =====")
s_memory = SummaryMemory(llm, max_history=3)
s_memory.add_user_message("我想计划下个月去日本东京旅游")
s_memory.add_ai_message("东京旅游很棒!必去景点包括:浅草寺、东京塔...")
s_memory.add_user_message("签证怎么办理?")
s_memory.add_ai_message("日本旅游签证需要通过旅行社代办...")
s_memory.add_user_message("机票和酒店有推荐吗?")
s_memory.add_ai_message("机票建议提前2个月订,东京往返含税约2500-4500元...")
print(f"消息数: {len(s_memory.current_messages)}, 需摘要: {s_memory.should_summarize()}")

# Compress, then drop any <think> block before printing.
s_memory.summarize()
plain_summary = extract_plain_text(s_memory.summary)
print(f"压缩后摘要: {plain_summary}")

<think> 标签

| 知识模块 | 内容 |
|---|---|
# 文件:demo04_output_parser.py
### 代码整体功能
- 通过清洗函数 + 输出解析器,把模型文本稳定转换为 JSON、Pydantic 对象和列表结构。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain_core.messages import SystemMessage
from langchain_core.output_parsers import (
JsonOutputParser,
PydanticOutputParser,
CommaSeparatedListOutputParser,
)
from langchain_core.runnables import RunnableLambda
from pydantic import BaseModel, Field, field_validator
import re
from dotenv import load_dotenv
import os
load_dotenv()


def build_llm() -> ChatOpenAI:
    """Return a MiniMax chat model with temperature 0, for more stable structured output."""
    settings = dict(
        model="MiniMax-M2.7",
        base_url="https://api.minimaxi.com/v1",
        api_key=os.getenv("MINIMAX_API_KEY"),
        temperature=0.0,
        max_tokens=1000,
        timeout=60,
        max_retries=2,
    )
    return ChatOpenAI(**settings)
# Key step: strip the <think>...</think> reasoning block MiniMax may put before the answer.
# An unterminated block (e.g. truncated output) is removed up to the end of the text.
_THINK_BLOCK_PATTERN = re.compile(r"<think>.*?(?:</think>|\Z)", flags=re.DOTALL)


def clean_json_output(input_str: str) -> str:
    """Return the model's visible text with any ``<think>`` block removed and whitespace trimmed.

    Accepts a plain string or any object exposing ``.content`` (such as the message
    produced by the LLM step of a chain).
    """
    if hasattr(input_str, "content"):
        input_str = input_str.content
    # The old pattern `<think>.*?` had no terminator, so the lazy match removed only the
    # opening tag and left the reasoning text in front of the JSON.
    return _THINK_BLOCK_PATTERN.sub("", str(input_str)).strip()


def clean_list_output(input_str: str) -> str:
    """Like clean_json_output, but normalise Chinese separators to ASCII commas.

    Both the full-width comma (U+FF0C) and the enumeration comma (U+3001) become ",",
    which is what CommaSeparatedListOutputParser splits on.
    """
    text = clean_json_output(input_str)
    # Written as an escape so the full-width character cannot be silently normalised to ASCII.
    return text.replace("\uff0c", ",").replace("、", ",")
def run_json_demo(llm: ChatOpenAI) -> None:
    """Extract a person's details from text as a plain dict via JsonOutputParser."""
    parser = JsonOutputParser()
    messages = [
        SystemMessage(content="你是一个数据提取助手。只返回合法 JSON,不要解释。"),
        HumanMessagePromptTemplate.from_template(
            "从以下文本中提取信息,以 JSON 格式返回:\n{text}\n\n格式要求:\n{format}"
        ),
    ]
    # clean_json_output strips the <think> block before the JSON parser sees the text.
    chain = (
        ChatPromptTemplate.from_messages(messages)
        | llm
        | RunnableLambda(clean_json_output)
        | parser
    )
    payload = {
        "text": "布鲁斯,37岁,来自深圳,是一名技术合伙人,擅长 Python 和 Java。",
        "format": parser.get_format_instructions(),
    }
    print("JSON 结果:", chain.invoke(payload))
# Pydantic model for the extracted person record.
# (No class docstring on purpose: it would become the schema description and change
# the parser's format instructions.)
class PersonInfo(BaseModel):
    name: str = Field(description="人物姓名")
    age: int = Field(description="人物年龄(必须是整数)")
    city: str = Field(description="所在城市")
    skills: list[str] = Field(default_factory=list, description="掌握的技能列表")

    @field_validator("age")
    @classmethod
    def age_must_be_positive(cls, v: int) -> int:
        # Accept only ages in the range 1..150; anything else is treated as a bad extraction.
        if not 0 < v <= 150:
            raise ValueError(f"年龄 {v} 不合理!")
        return v
def run_pydantic_demo(llm: ChatOpenAI) -> None:
    """Parse model output into a validated PersonInfo via PydanticOutputParser."""
    parser = PydanticOutputParser(pydantic_object=PersonInfo)
    base_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="你是一个数据提取助手。只返回符合格式要求的 JSON 本身。"),
        HumanMessagePromptTemplate.from_template("从以下文本中提取信息:\n{text}\n\n{format}"),
    ])
    # Bind the schema instructions once so callers only pass `text`.
    prompt = base_prompt.partial(format=parser.get_format_instructions())
    chain = prompt | llm | RunnableLambda(clean_json_output) | parser
    person: PersonInfo = chain.invoke(
        {"text": "布鲁斯,37岁,深圳技术合伙人,擅长Node Python、Java、Go。"}
    )
    print(f"姓名:{person.name},年龄:{person.age},城市:{person.city},技能:{person.skills}")
def run_list_demo(llm: ChatOpenAI) -> None:
    """Ask for N advantages of a subject and parse the reply into a Python list."""
    parser = CommaSeparatedListOutputParser()
    base_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="你是一个只输出逗号分隔列表的助手。使用英文逗号。"),
        HumanMessagePromptTemplate.from_template("列出 {subject} 的 {n} 个优点。\n\n{format}"),
    ])
    prompt = base_prompt.partial(format=parser.get_format_instructions())
    # clean_list_output drops <think> output and normalises Chinese separators first.
    chain = prompt | llm | RunnableLambda(clean_list_output) | parser
    items = chain.invoke({"subject": "Python", "n": 5})
    print("列表结果:", items)
def main() -> None:
    """Run the list-parser demo; uncomment the other calls to try the JSON/Pydantic demos."""
    model = build_llm()
    # run_json_demo(model)
    # run_pydantic_demo(model)
    run_list_demo(model)


if __name__ == "__main__":
    main()

| 知识模块 | 内容 |
|---|---|
# 文件:demo05_rag_ingest.py
from pathlib import Path
import os
import requests
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader, PyPDFLoader, WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings
load_dotenv()


def build_embeddings() -> OllamaEmbeddings:
    """Create embeddings backed by a local Ollama server.

    The model and server URL come from OLLAMA_EMBED_MODEL / OLLAMA_BASE_URL, with local defaults.

    Raises:
        RuntimeError: if the Ollama server is unreachable or its health endpoint
            answers with an HTTP error status.
    """
    model = os.getenv("OLLAMA_EMBED_MODEL", "qwen3-embedding:0.6b")
    base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
    # Fail fast with a clear message instead of an opaque error on the first embed call.
    health_url = f"{base_url.rstrip('/')}/api/tags"
    try:
        resp = requests.get(health_url, timeout=3)
        # A wrong host/port that still answers (e.g. 404) is not a usable Ollama server.
        resp.raise_for_status()
    except requests.RequestException as exc:
        raise RuntimeError(f"Ollama 服务不可用({health_url}),请先启动 ollama") from exc
    return OllamaEmbeddings(model=model, base_url=base_url)
def load_documents() -> list[Document]:
    """Collect source documents: essay.txt, optional paper.pdf, and one web page.

    A missing essay.txt is replaced by a one-line inline sample. A missing PDF or a
    failed web fetch is reported on stdout and skipped.
    """
    documents: list[Document] = []

    # 1) Plain text, with an inline fallback.
    essay = Path("essay.txt")
    if essay.exists():
        documents.extend(TextLoader(str(essay), encoding="utf-8").load())
    else:
        documents.append(Document(
            page_content="LangChain 是一个用于构建 LLM 应用的框架。",
            metadata={"source": "inline:essay.txt"},
        ))
        print("essay.txt 不存在,已使用内置示例文本代替。")

    # 2) Optional PDF.
    paper = Path("paper.pdf")
    if paper.exists():
        documents.extend(PyPDFLoader(str(paper)).load())
    else:
        print("paper.pdf 不存在,已跳过 PDF 读取。")

    # 3) Web page; any failure just skips it.
    url = "https://python.langchain.com/docs/introduction"
    try:
        documents.extend(WebBaseLoader(url).load())
    except Exception as exc:
        print(f"网页读取失败,已跳过:{exc}")

    return documents
def split_documents(documents: list[Document]) -> list[Document]:
    """Split documents into 500-char chunks with 50-char overlap, recording each chunk's start offset."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        length_function=len,
        add_start_index=True,
    )
    pieces = splitter.split_documents(documents)
    print(f"切块数: {len(pieces)}")
    return pieces
def build_vectorstore(chunks: list[Document], embeddings: OllamaEmbeddings) -> Chroma:
    """Embed ``chunks`` and persist them to ./chroma_db.

    Each chunk gets a deterministic id derived from its position, source and content.
    Re-running ingestion on the same inputs therefore writes the same ids rather than
    appending a fresh copy of every chunk under new random ids.
    NOTE(review): this relies on langchain_chroma writing with upsert semantics — confirm
    for the installed version.
    """
    import hashlib

    ids = [
        hashlib.sha256(
            f"{i}|{chunk.metadata.get('source', '')}|{chunk.page_content}".encode("utf-8")
        ).hexdigest()
        for i, chunk in enumerate(chunks)
    ]
    return Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory="./chroma_db",
        ids=ids,
    )
def run_queries(vectorstore: Chroma) -> None:
    """Smoke-test retrieval three ways: plain similarity, similarity with scores, and a Retriever."""
    question = "LangChain 的核心概念是什么?"

    # 1) Top-3 most similar chunks.
    for rank, doc in enumerate(vectorstore.similarity_search(query=question, k=3), 1):
        print(f"[{rank}] {doc.page_content[:150]}")

    # 2) Same query with raw scores.
    #    NOTE(review): for Chroma this is presumably a distance (lower = closer) — confirm.
    for doc, score in vectorstore.similarity_search_with_score(query=question, k=3):
        print(f" [分数:{score:.4f}] {doc.page_content[:100]}")

    # 3) Retriever interface, as consumed by RAG chains.
    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
    hits = retriever.invoke("LangChain 是什么?")
    print(f"Retriever 返回 {len(hits)} 个文档块")
def main() -> None:
    """Run the ingestion pipeline: embeddings check, load, split, index, then sample queries."""
    # Build embeddings first so an unavailable Ollama server fails before any loading work.
    embeddings = build_embeddings()
    documents = load_documents()
    for doc in documents:
        print(doc.metadata)
    chunks = split_documents(documents)
    for i, chunk in enumerate(chunks, 1):
        print(f"Chunk {i}: 长度={len(chunk.page_content)}, 元数据={chunk.metadata}")
    vectorstore = build_vectorstore(chunks, embeddings)
    run_queries(vectorstore)


if __name__ == "__main__":
    main()

| 知识模块 | 内容 |
|---|---|
# 文件:demo06_rag_chain.py
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain_core.messages import SystemMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.messages import HumanMessage, AIMessage, get_buffer_string
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os
load_dotenv()

# Chat model: MiniMax via its OpenAI-compatible API.
llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
    timeout=60,
    max_retries=2,
)

# Embeddings: local Ollama. Read the same env vars, with the same defaults, as the
# ingestion script (demo05). Queries must be embedded with the same model that built
# ./chroma_db; hard-coded values would silently diverge when those env vars are set.
embeddings = OllamaEmbeddings(
    model=os.getenv("OLLAMA_EMBED_MODEL", "qwen3-embedding:0.6b"),
    base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
)
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
def format_docs(docs):
    """Join documents into one context string, labelling each as [来源N] (1-based), blank-line separated."""
    parts = []
    for n, doc in enumerate(docs, start=1):
        parts.append(f"[来源{n}] {doc.page_content}")
    return "\n\n".join(parts)
# Single-turn RAG chain. The input question is passed through unchanged as `question`,
# while the retriever's hits are flattened into `context` by format_docs.
rag_chain = (
    {
        "question": RunnablePassthrough(),
        "context": retriever | format_docs,
    }
    | ChatPromptTemplate.from_messages([
        SystemMessage(content=(
            "你是知识渊博的助手,基于文档片段回答。"
            "如果文档中没有答案,说「没有找到相关信息」,不要编造。"
        )),
        HumanMessagePromptTemplate.from_template("文档内容:\n{context}\n\n问题:{question}"),
    ])
    | llm
    | StrOutputParser()
)

result = rag_chain.invoke("LangChain 是什么?有哪些核心概念?")
# Print only a preview of the answer.
print("RAG 结果:", result[:200])
import re

# Multi-turn RAG: first condense history + follow-up into one standalone question,
# then retrieve and answer with that question.
chat_history = [
    HumanMessage(content="LangChain 支持哪些模型?"),
    AIMessage(content="LangChain 支持 OpenAI、Anthropic、Google Gemini、HuggingFace 等主流 LLM。"),
]
condense_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="把对话历史和最新问题合并成一个独立的问题。"),
    HumanMessagePromptTemplate.from_template("历史:{chat_history}\n\n最新问题:{question}"),
])
combined_query = (
    condense_prompt | llm | StrOutputParser()
).invoke({
    "chat_history": get_buffer_string(chat_history),
    "question": "那 embedding 模型呢?",
})
# MiniMax output may start with a <think>...</think> block (the other demos strip it too).
# Remove it so only the rewritten question is embedded for retrieval and sent as the question.
combined_query = re.sub(
    r"<think>.*?(?:</think>|\Z)", "", combined_query, flags=re.DOTALL
).strip()
print("合并检索词:", combined_query)

docs = retriever.invoke(combined_query)
final_chain = (
    ChatPromptTemplate.from_messages([
        SystemMessage(content="基于文档回答问题。"),
        HumanMessagePromptTemplate.from_template("文档:\n{context}\n\n问题:{question}"),
    ])
    | llm
    | StrOutputParser()
)
answer = final_chain.invoke({"context": format_docs(docs), "question": combined_query})
print("最终答案:", answer)

| 知识模块 | 内容 |
|---|---|
# 文件:demo07_tools_agent.py
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from dotenv import load_dotenv
import os
load_dotenv()

# Chat model that drives the agent (MiniMax via its OpenAI-compatible API).
llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    max_retries=2,
    timeout=60,
    max_tokens=1000,
    temperature=0.7,
)
# NOTE: each tool's docstring is its description shown to the model — keep them as is.
@tool
def multiply(a: int, b: int) -> int:
    """计算两个整数相乘"""
    product = a * b
    return product


@tool
def get_weather(city: str) -> str:
    """查询城市天气"""
    # Static mock data standing in for a real weather API.
    known = {"深圳": "晴天28C", "北京": "多云22C", "上海": "小雨18C"}
    if city in known:
        return known[city]
    return f"暂无{city}数据"


@tool
def search_web(query: str) -> str:
    """在互联网上搜索信息"""
    # Mock: no network call, just a canned string echoing the query.
    return f"关于「{query}」的搜索结果(模拟):来自 Wikipedia、知乎..."
def main() -> None:
    """Build a tool-calling agent, send one multi-tool request, and print the final reply."""
    instructions = (
        "你是一个工具调用助手。"
        "遇到计算就用 multiply,遇到天气就用 get_weather,"
        "遇到搜索就用 search_web。"
    )
    agent = create_agent(
        model=llm,
        tools=[multiply, get_weather, search_web],
        system_prompt=instructions,
    )
    user_message = {
        "role": "user",
        "content": "深圳天气怎么样?然后把37和42相乘。搜索下今日知乎热点",
    }
    result = agent.invoke({"messages": [user_message]})
    # The last message in the returned list holds the agent's final reply.
    print("最终输出:", result["messages"][-1].content)


if __name__ == "__main__":
    main()

| 知识模块 | 内容 |
|---|---|
# 文件:demo08_callbacks.py
import logging
import os
from typing import Any, Dict, List
from dotenv import load_dotenv
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.output_parsers import StrOutputParser
from langchain_core.outputs import LLMResult
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
load_dotenv()

# INFO-level logging so the callback handler's messages are visible; module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def build_llm(*, streaming: bool = False) -> ChatOpenAI:
    """Create the MiniMax chat model; ``streaming`` is passed straight through to ChatOpenAI."""
    options = {
        "model": "MiniMax-M2.7",
        "base_url": "https://api.minimaxi.com/v1",
        "api_key": os.getenv("MINIMAX_API_KEY"),
        "temperature": 0.7,
        "max_tokens": 1000,
        "streaming": streaming,
        "timeout": 60,
        "max_retries": 2,
    }
    return ChatOpenAI(**options)
class MyCallbackHandler(BaseCallbackHandler):
    """Logging callback: records chain/LLM lifecycle events and echoes streamed tokens."""

    def on_chain_start(self, serialized: Dict, inputs: Dict, **kwargs) -> None:
        # `serialized` may not be a dict, so guard before calling .get().
        name = serialized.get("name") if isinstance(serialized, dict) else str(serialized)
        logger.info(f"[Chain 开始] name={name}")

    def on_chain_end(self, outputs: Dict, **kwargs) -> None:
        logger.info(f"[Chain 结束] {str(outputs)[:50]}")

    def on_chain_error(self, error: Exception, **kwargs) -> None:
        logger.error(f"[Chain 错误] {error}")

    def on_llm_start(self, serialized: Dict, prompts: List[str], **kwargs) -> None:
        logger.info(f"[LLM 开始] {str(prompts)[:80]}...")

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        # Guard an empty generations list instead of raising IndexError inside a callback.
        text: Any = ""
        if response.generations and response.generations[0]:
            gen = response.generations[0][0]
            text = getattr(gen, "text", "") or getattr(getattr(gen, "message", None), "content", "")
        # Message content may be None or a list of content blocks rather than a str.
        if not isinstance(text, str):
            text = str(text) if text else ""
        logger.info(f"[LLM 结束] {text[:50]}...")
        usage = (response.llm_output or {}).get("token_usage")
        # token_usage may be present but not a dict (e.g. None); only read it when it is one.
        if isinstance(usage, dict):
            logger.info(f"[Token] total={usage.get('total_tokens', 'N/A')}")

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(token, end="", flush=True)
class StreamingCallbackHandler(BaseCallbackHandler):
    """Minimal handler that writes each streamed token to stdout as soon as it arrives."""

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # No newline, and flush so tokens appear immediately instead of when the buffer fills.
        print(token, end="", flush=True)
def main() -> None:
    """Stream a short joke through an LCEL chain, echoing tokens via a callback handler."""
    prompt = ChatPromptTemplate.from_template("用三句话讲一个关于 {topic} 的笑话")
    streaming_chain = prompt | build_llm(streaming=True) | StrOutputParser()
    handler = StreamingCallbackHandler()

    print("\n流式输出:")
    # The handler prints tokens live; the chunks are also collected for the final summary line.
    pieces = list(
        streaming_chain.stream(
            {"topic": "Python 编程"},
            config={"callbacks": [handler]},
        )
    )
    print("\n最终流式结果:", "".join(pieces))


if __name__ == "__main__":
    main()

组合 RunnablePassthrough / Parallel / Branch / Lambda,实现透传、并行、分流和后处理的复合链路。

| 知识模块 | 内容 |
|---|---|
# 文件:demo09_runnable_advanced.py
from dotenv import load_dotenv
from langchain_core.messages import SystemMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain_core.runnables import (
RunnableBranch,
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
)
from langchain_openai import ChatOpenAI
import os
load_dotenv()


def build_llm() -> ChatOpenAI:
    """Create the MiniMax chat model shared by the Runnable demos."""
    return ChatOpenAI(
        model="MiniMax-M2.7",
        base_url="https://api.minimaxi.com/v1",
        api_key=os.getenv("MINIMAX_API_KEY"),
        max_retries=2,
        timeout=60,
        max_tokens=1000,
        temperature=0.7,
    )
def run_passthrough_demo() -> None:
    """Show that RunnablePassthrough returns its input unchanged."""
    echoed = RunnablePassthrough().invoke({"topic": "LangChain"})
    print("透传结果:", echoed)
def run_parallel_demo(llm: ChatOpenAI) -> None:
    """Run an LLM branch and a pure-Python branch on the same input with RunnableParallel."""
    answer_branch = (
        ChatPromptTemplate.from_template("{topic} 是什么?用一句话回答。")
        | llm
        | StrOutputParser()
    )

    def count_chars(inputs):
        topic = inputs["topic"]
        return f"「{topic}」字符数:{len(topic)}"

    parallel_chain = RunnableParallel({
        "openai_answer": answer_branch,
        "word_count": RunnableLambda(count_chars),
    })
    print("并行结果:", parallel_chain.invoke({"topic": "LangChain"}))
def is_code(x: dict) -> bool:
    """Heuristic: the input text contains a `def ` or `class ` keyword."""
    text = x["input"]
    return any(keyword in text for keyword in ("def ", "class "))


def is_math(x: dict) -> bool:
    """Heuristic: the input text contains an arithmetic operator (+ - * /)."""
    text = x["input"]
    return any(op in text for op in "+-*/")
def run_branch_demo() -> None:
    """Route input with RunnableBranch: code first, then math, otherwise a default label."""

    def labelled(tag):
        # Build a runnable that prefixes the input text with "[tag] ".
        return RunnableLambda(lambda x: f"[{tag}] {x['input']}")

    branch_chain = RunnableBranch(
        (RunnableLambda(is_code), labelled("代码")),
        (RunnableLambda(is_math), labelled("数学")),
        labelled("普通"),
    )
    for sample in ("def hello(): pass", "1 + 2 = 3", "今天天气好"):
        print(branch_chain.invoke({"input": sample}))
def add_suffix(text: str) -> str:
    """Post-process step: append an LCEL marker line to the model output."""
    return "".join((text, "\n\n[由 LCEL 处理]"))
def run_postprocess_demo(llm: ChatOpenAI) -> None:
    """Append a marker to the model's answer via a RunnableLambda post-processing step."""
    prompt = ChatPromptTemplate.from_template("介绍一下 {topic},不少于50字")
    chain = prompt | llm | StrOutputParser() | RunnableLambda(add_suffix)
    print("后处理结果:", chain.invoke({"topic": "Python 编程语言"}))
def run_cot_demo(llm: ChatOpenAI) -> None:
    """Chain-of-thought prompting: the system message asks for four labelled reasoning steps."""
    instructions = (
        "你是逻辑推理助手。按步骤回答:"
        "步骤1理解问题 步骤2列关键信息 步骤3逐步推理 步骤4给出答案。"
        "用【步骤1】【步骤2】【步骤3】【步骤4】格式回答。"
    )
    cot_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=instructions),
        HumanMessagePromptTemplate.from_template("问题:{question}"),
    ])
    question = "如果所有的猫都喜欢鱼,A是猫,B喜欢鱼,那么A和B一定都是猫吗?"
    answer = (cot_prompt | llm | StrOutputParser()).invoke({"question": question})
    print("CoT 结果:", answer)
def main() -> None:
    """Run every Runnable demo in order, sharing one model instance."""
    llm = build_llm()
    steps = (
        run_passthrough_demo,
        lambda: run_parallel_demo(llm),
        run_branch_demo,
        lambda: run_postprocess_demo(llm),
        lambda: run_cot_demo(llm),
    )
    for step in steps:
        step()


if __name__ == "__main__":
    main()

utils_llm_service.py:封装 LangChain 主链(build_chain())和调用能力(invoke/raw/batch/stream)。demo10_api_service.py:把链路能力暴露为 /chat、/chat/raw、/batch、/chat/stream、/prompt、/health。demo10_chat_ui.html:调用接口并展示普通/流式结果;utils_http_hooks.py:统一日志和异常处理。demo10_chat_ui.html -> demo10_api_service.py -> utils_llm_service.py -> MiniMax。
invoke/raw/batch/stream 四种调用分层;answer/think 拆分

| 知识模块 | 内容 |
|---|---|
# ========== Demo10: 把 LangChain 变成 API 服务 ==========
# 文件:demo10_api_service.py
#
# 这个示例演示:
# 1. 用 FastAPI 把 LangChain 链路封装成 HTTP API;
# 2. 提供普通调用接口(/chat)和流式输出接口(/chat/stream);
# 3. 使用 Uvicorn 启动服务,便于本地开发和后续生产部署改造;
# 4. 把“HTTP Hook”和“LLM 调用”分别拆到独立模块,降低耦合。
#
# 启动方式:
# uvicorn demo10_api_service:app --host 0.0.0.0 --port 8000 --reload
from contextlib import asynccontextmanager
import json
import logging
import os
from pathlib import Path
from typing import Iterator
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, Field
from utils_http_hooks import register_http_hooks
from utils_llm_service import UtilsLLMService
load_dotenv()

# Application-wide logger, shared by the routes and the HTTP hooks.
logger = logging.getLogger("demo10.api")
# Only configure logging when this named logger has no handlers of its own.
# NOTE: basicConfig configures the root logger.
if not logger.handlers:
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        level=logging.INFO,
    )
# Request/response schemas for the HTTP API. The class docstrings are kept verbatim
# because they become schema descriptions.


class ChatRequest(BaseModel):
    """聊天请求体。"""
    # Body for /chat, /chat/raw and /chat/stream; an empty question fails validation (min_length=1).
    question: str = Field(..., min_length=1, description="用户问题")


class ChatResponse(BaseModel):
    """聊天响应体。"""
    # Cleaned answer text, plus optional extracted reasoning (None when there is none).
    answer: str
    thinking: str | None = None


class RawChatResponse(BaseModel):
    """原始聊天响应体。"""
    # Unprocessed model output returned by /chat/raw.
    raw_answer: str


class BatchRequest(BaseModel):
    """批量提问请求体。"""
    # At least one question is required (min_length=1 on the list).
    questions: list[str] = Field(..., min_length=1, description="问题列表")


class BatchResponse(BaseModel):
    """批量提问响应体。"""
    # Answers as returned by the service's batch call.
    answers: list[str]


class PromptRequest(BaseModel):
    """系统提示词请求体。"""
    # New system prompt for POST /prompt; must be non-empty.
    system_prompt: str = Field(..., min_length=1, description="系统提示词")


class PromptResponse(BaseModel):
    """系统提示词响应体。"""
    # The system prompt currently stored by the service.
    system_prompt: str
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan hook.

    - Before ``yield``: create and initialise UtilsLLMService and store it on ``app.state``.
      An initialisation failure is logged but does not stop the app; /health reports it
      and the other routes answer 503 via get_service().
    - After ``yield``: release resources. This runs in ``finally`` so cleanup also happens
      when the app exits with an error.
    """
    service = UtilsLLMService()
    service.initialize()
    app.state.llm_service = service
    if service.ready:
        logger.info("llm init success")
    else:
        logger.error("llm init failed: %s", service.init_error)
    try:
        yield
    finally:
        service.shutdown()
        logger.info("app shutdown complete")
app = FastAPI(
    title="Demo10 LangChain API",
    description="使用 FastAPI + Uvicorn 暴露 LangChain 链路能力",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS: the defaults cover a local static server on :5500 (e.g. VS Code Live Server) and
# the API's own origin on :8000. CORS_ALLOW_ORIGINS (comma-separated) replaces the
# defaults whenever it yields at least one non-blank origin.
default_cors_origins = [
    "http://127.0.0.1:5500",
    "http://localhost:5500",
    "http://127.0.0.1:8000",
    "http://localhost:8000",
]
env_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "")
allow_origins = [origin.strip() for origin in env_cors_origins.split(",") if origin.strip()]
if not allow_origins:
    allow_origins = default_cors_origins
# NOTE(review): credentials stay enabled even if CORS_ALLOW_ORIGINS is "*" — confirm intent.
app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Request middleware and global exception handlers.
register_http_hooks(app, logger)

# The chat UI page lives next to this module.
CHAT_UI_FILE = Path(__file__).with_name("demo10_chat_ui.html")
def get_service() -> UtilsLLMService:
    """Return the initialised LLM service, or raise HTTP 503 if it is missing or not ready."""
    service: UtilsLLMService | None = getattr(app.state, "llm_service", None)
    if service is not None and service.ready:
        return service
    if service is None:
        detail = "service unavailable: llm service missing"
    else:
        detail = f"service unavailable: {service.init_error or 'chain is not initialized'}"
    raise HTTPException(status_code=503, detail=detail)
@app.get("/health")
def health() -> dict:
    """健康检查接口,便于网关/监控系统探活。"""
    # Readiness is reported in the body; this handler never raises.
    service: UtilsLLMService | None = getattr(app.state, "llm_service", None)
    if service is not None and service.ready:
        return {"status": "ok", "llm_ready": True}
    reason = "llm service missing" if service is None else service.init_error
    return {"status": "degraded", "llm_ready": False, "reason": reason}


@app.get("/chat-ui")
def chat_ui() -> FileResponse:
    """返回一个简单网页,用于调用 /chat 和 /chat/stream。"""
    # Return an explicit 404 when the HTML file is absent.
    if CHAT_UI_FILE.exists():
        return FileResponse(CHAT_UI_FILE)
    raise HTTPException(status_code=404, detail="chat ui file not found")
@app.get("/prompt", response_model=PromptResponse)
def get_prompt() -> PromptResponse:
    """读取当前系统提示词。"""
    current = get_service().system_prompt
    return PromptResponse(system_prompt=current)


@app.post("/prompt", response_model=PromptResponse)
def set_prompt(req: PromptRequest) -> PromptResponse:
    """更新系统提示词,并重建链路。"""
    service = get_service()
    service.set_prompt(req.system_prompt)
    # Echo the prompt the service actually stored, which may be normalised.
    return PromptResponse(system_prompt=service.system_prompt)
@app.post("/chat", response_model=ChatResponse)
def chat(req: ChatRequest) -> ChatResponse:
    """普通调用:返回完整答案。"""
    answer, thinking = get_service().invoke_with_meta(req.question)
    # An empty reasoning string is reported as null.
    return ChatResponse(answer=answer, thinking=thinking if thinking else None)


@app.post("/chat/raw", response_model=RawChatResponse)
def chat_raw(req: ChatRequest) -> RawChatResponse:
    """返回模型原始输出,不做清洗。"""
    return RawChatResponse(raw_answer=get_service().invoke_raw(req.question))


@app.post("/batch", response_model=BatchResponse)
def batch_chat(req: BatchRequest) -> BatchResponse:
    """批量提问:一次提交多个问题,返回清洗后的答案列表。"""
    return BatchResponse(answers=get_service().batch(req.questions))
def _sse_event(event: str, payload: dict) -> str:
"""把字典数据编码成标准 SSE 事件格式。"""
return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
def sse_stream(question: str) -> Iterator[str]:
    """
    Wrap the service's streamed output as Server-Sent Events.

    Event types:
    - status: progress notice sent before generation starts
    - think: a reasoning chunk plus a short trailing preview (only when the model emits
      think/reasoning/analysis content)
    - answer: a chunk of the final answer text
    - done: normal completion
    - error: failure. Once streaming has started, an HTTP error status can no longer be
      sent, so the error travels as an event.
    """
    preview_chars = 120
    try:
        service = get_service()
        yield _sse_event("status", {"message": "请求已接收,模型开始处理..."})
        think_preview = ""
        for kind, chunk in service.stream_with_thinking(question):
            if kind == "think":
                # Keep only the trailing window the preview needs, so memory stays bounded
                # on long reasoning (the result equals slicing the full text).
                think_preview = (think_preview + chunk)[-preview_chars:]
                preview = think_preview.strip()
                if preview:
                    yield _sse_event("think", {"text": chunk, "preview": preview})
            else:
                yield _sse_event("answer", {"text": chunk})
        yield _sse_event("done", {"message": "生成完成"})
    except Exception as exc:
        # Errors raised mid-stream occur after the route has returned its response, so
        # they presumably never reach the global exception handlers; log them here.
        logger.exception("sse stream failed")
        yield _sse_event("error", {"message": str(exc)})
@app.post("/chat/stream")
def chat_stream(req: ChatRequest) -> StreamingResponse:
    """流式调用:边生成边返回,适合前端实时显示打字效果。"""
    # Check availability first, so an unavailable service gets a normal 503 instead of an opened stream.
    get_service()
    sse_headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}
    return StreamingResponse(
        sse_stream(req.question),
        media_type="text/event-stream",
        headers=sse_headers,
    )
if __name__ == "__main__":
    # Alternative to the `uvicorn demo10_api_service:app ...` command line shown above.
    import uvicorn

    uvicorn.run(
        "demo10_api_service:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
# ========== Utils: HTTP Hook 封装 ==========
# 文件:utils_http_hooks.py
#
# 这个文件只负责 FastAPI 的“请求级/异常级” Hook:
# 1. HTTP middleware(request_id、耗时日志);
# 2. 全局异常处理(HTTPException、ValidationError、兜底 Exception)。
import logging
import time
import uuid
from fastapi import FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
def register_http_hooks(app: FastAPI, logger: logging.Logger) -> None:
    """Register the request-context middleware and the global exception handlers on ``app``.

    Middleware: assigns a request id (reusing an incoming ``X-Request-ID``), times the
    request, logs one line per request, and sets ``X-Request-ID`` / ``X-Process-Time-Ms``
    on the response.

    Handlers: HTTPException, RequestValidationError and a catch-all Exception. Each answers
    with a JSON body shaped like ``{"error": {"type", "message", "request_id", ...}}``.
    """

    def _elapsed_ms(request: Request) -> float:
        """Milliseconds since the middleware stamped ``request.state.start_time``."""
        return (time.perf_counter() - request.state.start_time) * 1000

    @app.middleware("http")
    async def request_context_middleware(request: Request, call_next):
        # Prefer the gateway's request id when present; otherwise generate one for log correlation.
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        request.state.request_id = request_id
        request.state.start_time = time.perf_counter()
        try:
            response = await call_next(request)
        except Exception:
            # Log method/path/timing only, then re-raise to the global handler. The traceback
            # is logged by unhandled_exception_handler below; logging it here as well duplicated it.
            logger.error(
                "[UNHANDLED] %s %s | request_id=%s | cost_ms=%.2f",
                request.method,
                request.url.path,
                request_id,
                _elapsed_ms(request),
            )
            raise
        cost_ms = _elapsed_ms(request)
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time-Ms"] = f"{cost_ms:.2f}"
        logger.info(
            "[OK] %s %s -> %s | request_id=%s | cost_ms=%.2f",
            request.method,
            request.url.path,
            response.status_code,
            request_id,
            cost_ms,
        )
        return response

    def _request_id(request: Request) -> str:
        """Read the request id from request.state, or "N/A" if the middleware did not set it."""
        return getattr(request.state, "request_id", "N/A")

    @app.exception_handler(HTTPException)
    async def http_exception_handler(request: Request, exc: HTTPException):
        """Handle HTTPExceptions raised deliberately by routes (e.g. 400/404/503)."""
        request_id = _request_id(request)
        logger.warning(
            "[HTTPException] %s %s -> %s | request_id=%s | detail=%s",
            request.method,
            request.url.path,
            exc.status_code,
            request_id,
            exc.detail,
        )
        return JSONResponse(
            status_code=exc.status_code,
            content={
                "error": {
                    "type": "http_exception",
                    "message": str(exc.detail),
                    "request_id": request_id,
                }
            },
        )

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(request: Request, exc: RequestValidationError):
        """Handle request validation failures raised automatically by FastAPI/Pydantic."""
        request_id = _request_id(request)
        logger.warning(
            "[ValidationError] %s %s | request_id=%s | errors=%s",
            request.method,
            request.url.path,
            request_id,
            exc.errors(),
        )
        return JSONResponse(
            status_code=422,
            content={
                "error": {
                    "type": "validation_error",
                    "message": "request validation failed",
                    "details": exc.errors(),
                    "request_id": request_id,
                }
            },
        )

    @app.exception_handler(Exception)
    async def unhandled_exception_handler(request: Request, exc: Exception):
        """Catch-all: log the traceback server-side and return a generic 500 without internals."""
        request_id = _request_id(request)
        logger.exception(
            "[UnhandledException] %s %s | request_id=%s",
            request.method,
            request.url.path,
            request_id,
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "type": "internal_error",
                    "message": "internal server error",
                    "request_id": request_id,
                }
            },
        )
# ========== Utils: LLM Service 封装 ==========
# 文件:utils_llm_service.py
#
# 这个文件只负责“模型和链路调用”相关逻辑:
# 1. 创建 LLM 实例;
# 2. 构建 LCEL 链;
# 3. 提供 initialize / invoke / stream / shutdown 方法;
# 4. 不依赖 FastAPI,便于单测和复用。
import os
import re
from typing import Iterator
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# Tags that usually wrap "thinking" / intermediate reasoning rather than the final answer shown to users.
HIDDEN_TAG_NAMES = ("think", "reasoning", "analysis")
# Regex alternation derived from HIDDEN_TAG_NAMES, so the tag list is defined in one place.
_HIDDEN_TAG_ALTERNATION = "|".join(map(re.escape, HIDDEN_TAG_NAMES))
# A whole hidden section: opening tag (attributes allowed) through the matching closing tag.
HIDDEN_SECTION_PATTERN = re.compile(
    rf"<\s*({_HIDDEN_TAG_ALTERNATION})\b[^>]*>.*?<\s*/\s*\1\s*>",
    flags=re.IGNORECASE | re.DOTALL,
)
# The same match, with group 2 capturing the inner content.
HIDDEN_CONTENT_PATTERN = re.compile(
    rf"<\s*({_HIDDEN_TAG_ALTERNATION})\b[^>]*>(.*?)<\s*/\s*\1\s*>",
    flags=re.IGNORECASE | re.DOTALL,
)
DEFAULT_SYSTEM_PROMPT = (
    "你是一个清晰、耐心的技术助手。回答要准确、结构清楚、尽量简洁。"
    "只输出最终答案,不要输出<think>、<reasoning>、<analysis>等中间思考标签。"
)
def build_llm() -> ChatOpenAI:
    """Create the chat model client for MiniMax via its OpenAI-compatible API.

    Raises:
        RuntimeError: if ``MINIMAX_API_KEY`` is not set, so a missing key is
            reported when the model is built rather than on the first request.
    """
    key = os.getenv("MINIMAX_API_KEY")
    if not key:
        raise RuntimeError("MINIMAX_API_KEY is not set")

    settings = {
        "model": "MiniMax-M2.7",
        # OpenAI-protocol-compatible MiniMax endpoint.
        "base_url": "https://api.minimaxi.com/v1",
        "api_key": key,
        # Sampling randomness: higher is more varied, lower is more stable.
        "temperature": 0.7,
        # Upper bound on reply length, in tokens.
        "max_tokens": 1000,
        # Per-request timeout in seconds, to avoid hanging indefinitely.
        "timeout": 60,
        # Automatic retries for transient network failures.
        "max_retries": 2,
    }
    return ChatOpenAI(**settings)
def build_chain(system_prompt: str = DEFAULT_SYSTEM_PROMPT):
    """Build the basic LCEL pipeline: Prompt -> LLM -> StrOutputParser.

    ``system_prompt`` is treated as literal text. ``ChatPromptTemplate`` uses
    the f-string template format by default, which would read any ``{...}`` in
    a (user-editable) system prompt as a template variable and make ``invoke``
    fail for lack of that input. The braces are therefore doubled, which the
    template renders back as single literal braces.

    The only template variable of the resulting chain is ``question``.
    """
    literal_system_prompt = system_prompt.replace("{", "{{").replace("}", "}}")
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", literal_system_prompt),
            ("human", "{question}"),
        ]
    )
    return prompt | build_llm() | StrOutputParser()
class UtilsLLMService:
    """Own the LCEL chain lifecycle and post-process model output.

    The chain is built by ``initialize`` and used by the invoke/batch/stream
    entry points. Hidden "thinking" sections wrapped in ``<think>``,
    ``<reasoning>`` or ``<analysis>`` tags are separated from the visible
    answer before text is handed to callers.
    """

    # Number of trailing characters held back while streaming, so a tag that is
    # split across chunks is not emitted before it can be recognised.
    _STREAM_HOLD_BACK = 64
    # Opening hidden tag (attributes allowed), case-insensitive.
    _OPEN_TAG_PATTERN = re.compile(
        r"<\s*(think|reasoning|analysis)\b[^>]*>",
        flags=re.IGNORECASE,
    )
    # Closing hidden tag of any of the three names, case-insensitive.
    _ANY_CLOSE_TAG_PATTERN = re.compile(
        r"</\s*(think|reasoning|analysis)\s*>",
        flags=re.IGNORECASE,
    )
    # Opening or closing hidden tag; used to scrub leftover tags from fragments.
    _ANY_TAG_PATTERN = re.compile(
        r"</?\s*(think|reasoning|analysis)\b[^>]*>",
        flags=re.IGNORECASE,
    )

    def __init__(self) -> None:
        # Stays None until initialize() succeeds.
        self.chain = None
        # Message of the last initialization failure, reported by _require_chain().
        self.init_error: str | None = None
        self.system_prompt: str = DEFAULT_SYSTEM_PROMPT

    @property
    def ready(self) -> bool:
        """Return True when a usable chain has been built."""
        return self.chain is not None

    def initialize(self) -> None:
        """Build the chain at startup, recording (not raising) any failure."""
        try:
            self.chain = build_chain(self.system_prompt)
            self.init_error = None
        except Exception as exc:
            # Deliberately best-effort: the failure is kept in init_error and
            # surfaced by _require_chain() when a caller needs the chain.
            self.chain = None
            self.init_error = str(exc)

    def shutdown(self) -> None:
        """Drop the chain reference during shutdown."""
        self.chain = None

    def set_system_prompt(self, system_prompt: str) -> None:
        """Update the system prompt and rebuild the chain if one is live.

        Blank (or None) input falls back to ``DEFAULT_SYSTEM_PROMPT``. The new
        chain is built *before* the prompt is stored, so if ``build_chain``
        raises, ``system_prompt`` and ``chain`` keep their previous, matching
        values instead of reporting a prompt the live chain does not use.
        """
        normalized = str(system_prompt or "").strip() or DEFAULT_SYSTEM_PROMPT
        if self.ready:
            self.chain = build_chain(normalized)
        self.system_prompt = normalized

    def set_prompt(self, system_prompt: str) -> None:
        """Alias of ``set_system_prompt``."""
        self.set_system_prompt(system_prompt)

    def _require_chain(self):
        """Return the chain, or raise RuntimeError explaining why it is unavailable."""
        if self.chain is None:
            detail = self.init_error or "chain is not initialized"
            raise RuntimeError(f"service unavailable: {detail}")
        return self.chain

    def _clean_text_output(self, raw_output) -> str:
        """Return only the displayable answer from a complete model output.

        Steps: coerce to ``str``; remove complete hidden sections; drop
        everything after an opening hidden tag that is never closed; remove
        orphan closing tags; strip surrounding whitespace.
        """
        text = str(raw_output or "")
        # Repeat until stable so several hidden sections are all removed.
        previous = None
        while previous != text:
            previous = text
            text = HIDDEN_SECTION_PATTERN.sub("", text)
        # Unclosed opening tag (e.g. the output was cut off): treat the rest as hidden.
        text = re.sub(
            r"<\s*(think|reasoning|analysis)\b[^>]*>.*$",
            "",
            text,
            flags=re.IGNORECASE | re.DOTALL,
        )
        # Orphan closing tags.
        text = self._ANY_CLOSE_TAG_PATTERN.sub("", text)
        return text.strip()

    def _clean_answer_fragment(self, fragment: str) -> str:
        """Lightly clean one streamed answer fragment.

        Leftover hidden tags are removed, but whitespace and newlines are kept
        (no strip) so that concatenated fragments do not glue words together.
        """
        return self._ANY_TAG_PATTERN.sub("", str(fragment or ""))

    def _clean_stream_chunks(self, chunks: Iterator[str]) -> Iterator[str]:
        """Incrementally drop hidden sections from a stream and yield visible text.

        Hidden content is discarded even when its tags are split across chunks;
        up to ``_STREAM_HOLD_BACK`` trailing characters are buffered for that.
        Fragments are cleaned with ``_clean_answer_fragment`` rather than the
        stripping ``_clean_text_output``: stripping each fragment would remove
        the spaces/newlines at fragment boundaries and merge words when the
        caller joins the fragments.
        """
        hold_back = self._STREAM_HOLD_BACK
        in_hidden = False
        buffer = ""
        for raw_chunk in chunks:
            piece = str(raw_chunk or "")
            if not piece:
                continue
            buffer += piece
            while True:
                if not in_hidden:
                    match_open = self._OPEN_TAG_PATTERN.search(buffer)
                    if not match_open:
                        # Keep a tail back in case it holds the start of a tag.
                        if len(buffer) <= hold_back:
                            break
                        emit_text = buffer[:-hold_back]
                        buffer = buffer[-hold_back:]
                        cleaned = self._clean_answer_fragment(emit_text)
                        if cleaned:
                            yield cleaned
                        break
                    # Emit what precedes the opening tag, then enter the hidden section.
                    cleaned_visible = self._clean_answer_fragment(buffer[: match_open.start()])
                    if cleaned_visible:
                        yield cleaned_visible
                    buffer = buffer[match_open.end():]
                    in_hidden = True
                else:
                    match_close = self._ANY_CLOSE_TAG_PATTERN.search(buffer)
                    if not match_close:
                        # Discard hidden text but keep a tail to catch a split closing tag.
                        if len(buffer) > hold_back:
                            buffer = buffer[-hold_back:]
                        break
                    buffer = buffer[match_close.end():]
                    in_hidden = False
        # A stream that ends inside a hidden section has nothing visible left.
        if not in_hidden and buffer:
            cleaned_tail = self._clean_answer_fragment(buffer)
            if cleaned_tail:
                yield cleaned_tail

    def _extract_hidden_text(self, raw_output) -> str:
        """Return the text inside complete hidden sections, joined by newlines.

        Returns an empty string when the output contains no complete section.
        """
        text = str(raw_output or "")
        segments: list[str] = []
        for match in HIDDEN_CONTENT_PATTERN.finditer(text):
            content = match.group(2).strip()
            if content:
                segments.append(content)
        return "\n".join(segments)

    def _split_stream_visible_hidden(self, chunks: Iterator[str]) -> Iterator[tuple[str, str]]:
        """Split streamed chunks into ``("answer", text)`` and ``("think", text)`` events.

        This lets the caller render the answer while showing a "thinking"
        indicator. A hidden section is closed only by a closing tag with the
        same name as its opening tag (case-insensitive). Up to
        ``_STREAM_HOLD_BACK`` trailing characters are buffered so tags split
        across chunks are still recognised; leftover text at the end of the
        stream is emitted according to the state it was in.
        """
        hold_back = self._STREAM_HOLD_BACK
        in_hidden = False
        # Compiled once per hidden section rather than on every loop pass.
        close_pattern = None
        buffer = ""
        for raw_chunk in chunks:
            piece = str(raw_chunk or "")
            if not piece:
                continue
            buffer += piece
            while True:
                if not in_hidden:
                    open_match = self._OPEN_TAG_PATTERN.search(buffer)
                    if not open_match:
                        # Hold back a tail in case it is the start of a tag.
                        if len(buffer) <= hold_back:
                            break
                        emit_text = buffer[:-hold_back]
                        buffer = buffer[-hold_back:]
                        if emit_text:
                            yield ("answer", emit_text)
                        break
                    visible = buffer[: open_match.start()]
                    if visible:
                        yield ("answer", visible)
                    close_pattern = re.compile(
                        rf"</\s*{re.escape(open_match.group(1))}\s*>",
                        flags=re.IGNORECASE,
                    )
                    buffer = buffer[open_match.end():]
                    in_hidden = True
                else:
                    close_match = close_pattern.search(buffer)
                    if not close_match:
                        # Hold back a tail in case it is the start of the closing tag.
                        if len(buffer) <= hold_back:
                            break
                        think_text = buffer[:-hold_back]
                        buffer = buffer[-hold_back:]
                        if think_text:
                            yield ("think", think_text)
                        break
                    think_text = buffer[: close_match.start()]
                    if think_text:
                        yield ("think", think_text)
                    buffer = buffer[close_match.end():]
                    in_hidden = False
                    close_pattern = None
        if buffer:
            yield ("think" if in_hidden else "answer", buffer)

    def invoke_with_meta(self, question: str) -> tuple[str, str]:
        """Invoke the chain once and return ``(answer, thinking)``.

        ``answer`` is the cleaned final answer; ``thinking`` is the text found in
        complete hidden sections (possibly an empty string).
        """
        chain = self._require_chain()
        raw_text = chain.invoke({"question": question})
        answer = self._clean_text_output(raw_text)
        thinking = self._extract_hidden_text(raw_text)
        return answer, thinking

    def invoke_raw(self, question: str) -> str:
        """Invoke the chain once and return the model output without cleaning."""
        chain = self._require_chain()
        return str(chain.invoke({"question": question}) or "")

    def invoke(self, question: str) -> str:
        """Invoke the chain once and return only the cleaned answer."""
        answer, _ = self.invoke_with_meta(question)
        return answer

    def batch(self, questions: list[str]) -> list[str]:
        """Run several questions through the chain and return cleaned answers in order."""
        chain = self._require_chain()
        inputs = [{"question": question} for question in questions]
        raw_outputs = chain.batch(inputs)
        return [self._clean_text_output(output) for output in raw_outputs]

    def stream_with_thinking(self, question: str) -> Iterator[tuple[str, str]]:
        """Stream typed events for one question.

        - ``("answer", text)``: visible text, safe to append to the reply;
        - ``("think", text)``: hidden reasoning, usable as a progress hint.
        Empty answer fragments and whitespace-only thinking are skipped.
        """
        chain = self._require_chain()
        raw_chunks = chain.stream({"question": question})
        for kind, text in self._split_stream_visible_hidden(raw_chunks):
            if kind == "answer":
                cleaned = self._clean_answer_fragment(text)
                if cleaned:
                    yield ("answer", cleaned)
            elif text.strip():
                yield ("think", text)

    def stream(self, question: str) -> Iterator[str]:
        """Stream only the visible answer text for one question."""
        for kind, text in self.stream_with_thinking(question):
            if kind == "answer":
                yield text
<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Demo10 Chat UI</title>
  <style>
    /* Shared design tokens. */
    :root {
      --bg: #f4f7fb;
      --panel: #ffffff;
      --text: #162131;
      --muted: #5e6b7b;
      --primary: #0c66e4;
      --primary-2: #2f80ed;
      --border: #d9e2ee;
      --ok: #18794e;
      --bad: #c62828;
    }

    * {
      box-sizing: border-box;
    }

    body {
      margin: 0;
      background: linear-gradient(145deg, #edf3ff 0%, #f9fbff 55%, #f4f7fb 100%);
      color: var(--text);
      font-family: "Avenir Next", "PingFang SC", "Microsoft YaHei", sans-serif;
      min-height: 100vh;
      display: flex;
      justify-content: center;
      padding: 16px;
    }

    /* Main card holding every section of the page. */
    .app {
      width: min(980px, 100%);
      background: var(--panel);
      border: 1px solid var(--border);
      border-radius: 14px;
      box-shadow: 0 12px 40px rgba(17, 45, 78, 0.08);
      display: grid;
      grid-template-rows: auto auto 1fr auto;
      gap: 10px;
      padding: 12px;
    }

    .title {
      display: flex;
      justify-content: space-between;
      align-items: center;
      gap: 12px;
    }

    h1 {
      margin: 0;
      font-size: 20px;
      letter-spacing: 0.2px;
    }

    .status {
      color: var(--muted);
      font-size: 13px;
    }

    /* Mode radios | API base input | two action buttons. */
    .toolbar {
      display: grid;
      grid-template-columns: 1fr 280px auto auto;
      gap: 12px;
      align-items: center;
    }

    .prompt-box {
      display: grid;
      gap: 8px;
    }

    .prompt-actions {
      display: flex;
      gap: 8px;
      flex-wrap: wrap;
    }

    .prompt-label {
      font-size: 12px;
      color: var(--muted);
      font-weight: 600;
    }

    .api-base {
      width: 100%;
      border: 1px solid var(--border);
      border-radius: 10px;
      padding: 8px 10px;
      font-size: 13px;
      outline: none;
      background: #fff;
    }

    .api-base:focus {
      border-color: #91b9f2;
      box-shadow: 0 0 0 3px rgba(12, 102, 228, 0.12);
    }

    .mode-group {
      display: flex;
      gap: 16px;
      align-items: center;
      color: var(--muted);
      font-size: 14px;
    }

    .mode-group label {
      display: inline-flex;
      gap: 6px;
      align-items: center;
      cursor: pointer;
    }

    button {
      border: none;
      border-radius: 10px;
      padding: 9px 14px;
      font-size: 14px;
      cursor: pointer;
      color: #fff;
      background: linear-gradient(120deg, var(--primary) 0%, var(--primary-2) 100%);
    }

    button:disabled {
      opacity: 0.6;
      cursor: not-allowed;
    }

    .btn-ghost {
      color: var(--text);
      background: #eef4ff;
      border: 1px solid #cddcf7;
    }

    /* Scrollable message history. */
    .panel {
      border: 1px solid var(--border);
      border-radius: 10px;
      background: #fcfdff;
      padding: 12px;
      height: clamp(220px, 28vh, 320px);
      overflow: auto;
    }

    .msg {
      margin: 0 0 14px;
      white-space: pre-wrap;
      line-height: 1.55;
      font-size: 14px;
    }

    .user {
      color: #0f4ca6;
    }

    .assistant {
      color: #1f2b3a;
    }

    /* Streamed "thinking" text, rendered small and muted above the answer. */
    .assistant-think {
      color: #7a8493;
      font-size: 12px;
      line-height: 1.25;
      white-space: pre-wrap;
      margin-bottom: 6px;
    }

    .assistant-answer {
      color: #1f2b3a;
      font-size: 14px;
      line-height: 1.55;
      white-space: pre-wrap;
    }

    .meta {
      color: var(--muted);
      font-size: 12px;
      margin-bottom: 4px;
    }

    .composer {
      display: grid;
      grid-template-columns: 1fr auto;
      gap: 10px;
      align-items: end;
    }

    textarea {
      resize: vertical;
      min-height: 72px;
      max-height: 150px;
      width: 100%;
      border: 1px solid var(--border);
      border-radius: 10px;
      padding: 10px;
      font-size: 14px;
      outline: none;
    }

    /* Both page textareas share a tighter max height than the generic rule. */
    #systemPrompt,
    #question {
      min-height: 72px;
      max-height: 120px;
    }

    textarea:focus {
      border-color: #91b9f2;
      box-shadow: 0 0 0 3px rgba(12, 102, 228, 0.12);
    }

    .tips {
      font-size: 12px;
      color: var(--muted);
      margin-top: 8px;
    }

    .ok {
      color: var(--ok);
    }

    .bad {
      color: var(--bad);
    }

    @media (max-width: 900px) {
      body {
        padding: 12px;
      }

      .app {
        padding: 10px;
        gap: 8px;
      }

      .toolbar {
        grid-template-columns: 1fr;
      }

      .mode-group {
        flex-wrap: wrap;
        gap: 10px 14px;
      }

      .panel {
        height: clamp(180px, 36vh, 280px);
      }
    }

    @media (max-width: 640px) {
      h1 {
        font-size: 17px;
      }

      .title {
        flex-direction: column;
        align-items: flex-start;
      }

      .composer {
        grid-template-columns: 1fr;
      }

      #sendBtn {
        width: 100%;
      }

      /* The ID rules above outrank a bare `textarea` selector, so the two
         textareas must be named here for the smaller mobile height to apply. */
      textarea,
      #systemPrompt,
      #question {
        min-height: 68px;
      }
    }
  </style>
</head>
<body>
  <div class="app">
    <div class="title">
      <h1>Demo10 Chat Playground</h1>
    </div>
    <div class="prompt-box">
      <!-- A real <label for> ties the caption to the textarea (screen readers, click-to-focus). -->
      <label class="prompt-label" for="systemPrompt">系统提示词(点击“保存提示词”后会同步到服务端)</label>
      <textarea id="systemPrompt" placeholder="在这里输入系统提示词,修改后保存即可生效"></textarea>
      <div class="prompt-actions">
        <button id="loadPromptBtn" class="btn-ghost">刷新提示词</button>
        <button id="savePromptBtn">保存提示词</button>
      </div>
    </div>
    <div class="toolbar">
      <div class="mode-group">
        <strong>模式:</strong>
        <label><input type="radio" name="mode" value="chat" checked /> 普通 chat</label>
        <label><input type="radio" name="mode" value="stream" /> 流式 stream</label>
        <label><input type="radio" name="mode" value="raw" /> 原始 raw</label>
        <label><input type="radio" name="mode" value="batch" /> 批量 batch</label>
      </div>
      <!-- No visible label here, so give assistive tech an accessible name. -->
      <input id="apiBase" class="api-base" type="text" aria-label="API Base URL" />
      <button id="healthBtn" class="btn-ghost">检查服务</button>
      <button id="clearBtn" class="btn-ghost">清空对话</button>
    </div>
    <!-- role=status + aria-live so status text updates are announced politely. -->
    <div id="status" class="status" role="status" aria-live="polite">就绪</div>
    <div id="messages" class="panel"></div>
    <div>
      <div class="composer">
        <textarea id="question" aria-label="问题" placeholder="输入你的问题,例如:请用三句话介绍 LangChain;batch 模式下每行一个问题"></textarea>
        <button id="sendBtn">发送</button>
      </div>
      <div class="tips">
        普通模式调用 <code>/chat</code>;流式模式调用 <code>/chat/stream</code>;原始模式调用 <code>/chat/raw</code>;批量模式调用 <code>/batch</code>。
      </div>
    </div>
  </div>
  <script>
// Handles to the page elements this script reads from or writes to.
const messagesEl = document.querySelector("#messages");
const questionEl = document.querySelector("#question");
const sendBtn = document.querySelector("#sendBtn");
const healthBtn = document.querySelector("#healthBtn");
const clearBtn = document.querySelector("#clearBtn");
const loadPromptBtn = document.querySelector("#loadPromptBtn");
const savePromptBtn = document.querySelector("#savePromptBtn");
const statusEl = document.querySelector("#status");
const apiBaseEl = document.querySelector("#apiBase");
const systemPromptEl = document.querySelector("#systemPrompt");
// localStorage keys for the persisted API base URL and system prompt.
const API_BASE_STORAGE_KEY = "demo10_api_base";
const SYSTEM_PROMPT_STORAGE_KEY = "demo10_system_prompt";
function defaultApiBase() {
  const LOCAL_API = "http://127.0.0.1:8000";
  const { origin, port } = window.location;
  // Opened from file:// (origin missing or the literal "null"): use the local API.
  const openedFromFile = !origin || origin === "null";
  // Served by Live Server (port 5500): the FastAPI backend lives on 8000 instead.
  const servedByLiveServer = port === "5500";
  return openedFromFile || servedByLiveServer ? LOCAL_API : origin;
}
// Restore the API base used last time. A cached value pointing at port 5500
// (Live Server itself, not the API) is redirected to FastAPI on port 8000.
const storedApiBase = localStorage.getItem(API_BASE_STORAGE_KEY) || "";
let fixedStoredBase = storedApiBase;
if (/:5500\b/.test(storedApiBase)) {
  fixedStoredBase = "http://127.0.0.1:8000";
}
apiBaseEl.value = fixedStoredBase || defaultApiBase();
apiBaseEl.placeholder = "API Base URL,例如 http://127.0.0.1:8000";

// Fallback system prompt when nothing has been stored locally yet.
const defaultSystemPrompt =
  "你是一个清晰、耐心的技术助手。回答要准确、结构清楚、尽量简洁。只输出最终答案,不要输出<think>、<reasoning>、<analysis>等中间思考标签。";
systemPromptEl.value = localStorage.getItem(SYSTEM_PROMPT_STORAGE_KEY) || defaultSystemPrompt;

// Persist the trimmed value of each field whenever the user commits a change.
[
  [apiBaseEl, API_BASE_STORAGE_KEY],
  [systemPromptEl, SYSTEM_PROMPT_STORAGE_KEY],
].forEach(([field, storageKey]) => {
  field.addEventListener("change", () => {
    localStorage.setItem(storageKey, field.value.trim());
  });
});
function buildUrl(path) {
  // Trailing slashes on the base are dropped so base + "/chat" never yields "//chat".
  let base = (apiBaseEl.value || "").trim();
  base = base.replace(/\/+$/, "");
  return base + path;
}
async function parseJsonOrThrow(resp) {
  const url = resp.url || "unknown-url";
  const contentType = resp.headers.get("content-type") || "";
  const rawText = await resp.text();

  // An empty body gets a clear message instead of "Unexpected end of JSON input".
  if (!rawText.trim()) {
    throw new Error(`接口返回空响应体,status=${resp.status},url=${url}`);
  }

  // Only the first 180 characters of the body are quoted in error messages.
  const snippet = rawText.slice(0, 180);
  if (!contentType.includes("application/json")) {
    throw new Error(
      `接口未返回 JSON,status=${resp.status},url=${url},content-type=${contentType},body=${snippet}`
    );
  }
  try {
    return JSON.parse(rawText);
  } catch {
    throw new Error(`JSON 解析失败,status=${resp.status},url=${url},body=${snippet}`);
  }
}
function getMode() {
  // Value of the checked "mode" radio; "chat" when none is checked.
  const checked = document.querySelector('input[name="mode"]:checked');
  return (checked && checked.value) || "chat";
}
function appendMessage(role, text) {
  // Each message is a wrapper holding a speaker caption and a text body.
  const meta = document.createElement("div");
  meta.className = "meta";
  meta.textContent = role === "user" ? "你" : "助手";

  const body = document.createElement("div");
  body.className = `msg ${role}`;
  body.textContent = text;

  const wrap = document.createElement("div");
  wrap.append(meta, body);
  messagesEl.appendChild(wrap);
  messagesEl.scrollTop = messagesEl.scrollHeight;
  // Callers fill the returned body node in later (e.g. with the assistant reply).
  return body;
}
function setStatus(text, type = "normal") {
  statusEl.textContent = text;
  // toggle(name, force) adds the class only for the matching type, removes it otherwise.
  statusEl.classList.toggle("ok", type === "ok");
  statusEl.classList.toggle("bad", type === "bad");
}
async function checkHealth() {
  setStatus("检查中...");
  try {
    const resp = await fetch(buildUrl("/health"));
    const data = await parseJsonOrThrow(resp);
    const healthy = resp.ok && data.status === "ok";
    if (!healthy) {
      setStatus(`服务异常: ${JSON.stringify(data)}`, "bad");
      return;
    }
    setStatus("服务正常: llm_ready=true", "ok");
  } catch (err) {
    setStatus(`健康检查失败: ${err}`, "bad");
  }
}
async function loadPrompt() {
  setStatus("读取提示词中...");
  try {
    const resp = await fetch(buildUrl("/prompt"));
    const data = await parseJsonOrThrow(resp);
    const valid = resp.ok && typeof data.system_prompt === "string";
    if (!valid) {
      setStatus(`提示词读取异常: ${JSON.stringify(data)}`, "bad");
      return;
    }
    // The server copy wins: show it and mirror it into localStorage.
    const serverPrompt = data.system_prompt;
    systemPromptEl.value = serverPrompt;
    localStorage.setItem(SYSTEM_PROMPT_STORAGE_KEY, serverPrompt);
    setStatus("提示词已加载", "ok");
  } catch (err) {
    setStatus(`提示词读取失败: ${err}`, "bad");
  }
}
async function savePrompt() {
  const systemPrompt = systemPromptEl.value.trim();
  if (systemPrompt === "") {
    setStatus("系统提示词不能为空", "bad");
    return;
  }
  const request = {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ system_prompt: systemPrompt }),
  };
  // Disable the button while the request is in flight to avoid double submits.
  savePromptBtn.disabled = true;
  setStatus("保存提示词中...");
  try {
    const resp = await fetch(buildUrl("/prompt"), request);
    const data = await parseJsonOrThrow(resp);
    if (!resp.ok) throw new Error(JSON.stringify(data));
    // Prefer the prompt echoed by the server, falling back to what was sent.
    localStorage.setItem(SYSTEM_PROMPT_STORAGE_KEY, data.system_prompt || systemPrompt);
    setStatus("提示词已保存", "ok");
  } catch (err) {
    setStatus(`保存提示词失败: ${err}`, "bad");
  } finally {
    savePromptBtn.disabled = false;
  }
}
async function sendChat(question) {
  const init = {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question }),
  };
  const resp = await fetch(buildUrl("/chat"), init);
  const data = await parseJsonOrThrow(resp);
  if (!resp.ok) throw new Error(JSON.stringify(data));
  // Resolve with the whole payload; callers read `answer` from it.
  if (typeof data.answer === "string") return data;
  throw new Error(`接口返回缺少 answer 字段: ${JSON.stringify(data).slice(0, 200)}`);
}
async function sendRaw(question) {
  const init = {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question }),
  };
  const resp = await fetch(buildUrl("/chat/raw"), init);
  const data = await parseJsonOrThrow(resp);
  if (!resp.ok) throw new Error(JSON.stringify(data));
  // Resolve with just the uncleaned model text.
  if (typeof data.raw_answer === "string") return data.raw_answer;
  throw new Error(`接口返回缺少 raw_answer 字段: ${JSON.stringify(data).slice(0, 200)}`);
}
async function sendBatch(questionText) {
  // One question per non-blank line, surrounding whitespace trimmed.
  const questions = [];
  for (const line of questionText.split("\n")) {
    const trimmed = line.trim();
    if (trimmed) questions.push(trimmed);
  }
  if (questions.length === 0) {
    throw new Error("batch 模式下请至少输入一行问题");
  }

  const init = {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ questions }),
  };
  const resp = await fetch(buildUrl("/batch"), init);
  const data = await parseJsonOrThrow(resp);
  if (!resp.ok) throw new Error(JSON.stringify(data));
  if (Array.isArray(data.answers)) return data.answers;
  throw new Error(`接口返回缺少 answers 字段: ${JSON.stringify(data).slice(0, 200)}`);
}
/**
 * POST the question to /chat/stream and render the SSE response incrementally.
 *
 * Event types: status / think / answer / done / error. Legacy data-only
 * events ("[DONE]", "[ERROR]...", or plain answer text) are also accepted.
 * Per the SSE spec, CRLF line endings are accepted and exactly one optional
 * space after "data:" is removed (the rest of the value is kept verbatim).
 * An event still buffered when the connection closes is processed, and the
 * reader is cancelled when we stop early so the connection is released.
 *
 * @param {string} question
 * @param {HTMLElement} answerNode  receives visible answer text
 * @param {HTMLElement|null} thinkNode  receives "thinking" text when provided
 */
async function sendStream(question, answerNode, thinkNode) {
  const resp = await fetch(buildUrl("/chat/stream"), {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question }),
  });
  if (!resp.ok || !resp.body) {
    const t = await resp.text();
    throw new Error(t || `stream failed, status=${resp.status}`);
  }

  const reader = resp.body.getReader();
  const decoder = new TextDecoder("utf-8");
  let buffer = "";
  let hasAnswerStarted = false;

  // Append answer text; the first piece replaces the "connecting..." placeholder.
  const appendAnswer = (text) => {
    if (!hasAnswerStarted) {
      answerNode.textContent = "";
      hasAnswerStarted = true;
    }
    answerNode.textContent += text;
    messagesEl.scrollTop = messagesEl.scrollHeight;
  };

  // Handle one SSE event block. Returns true when the stream is finished.
  const handleEvent = (evt) => {
    let eventType = "message";
    const dataLines = [];
    for (const line of evt.split("\n")) {
      if (line.startsWith("event:")) {
        eventType = line.slice(6).trim();
      } else if (line.startsWith("data:")) {
        const value = line.slice(5);
        // SSE spec: strip a single leading space only, so plain-text
        // chunks keep their own leading whitespace.
        dataLines.push(value.startsWith(" ") ? value.slice(1) : value);
      }
    }
    const rawData = dataLines.join("\n");
    let payload = null;
    try {
      payload = rawData ? JSON.parse(rawData) : null;
    } catch (_) {
      payload = null;
    }

    if (eventType === "status") {
      setStatus(payload?.message || "模型处理中...");
      return false;
    }
    if (eventType === "think") {
      const preview = payload?.preview || payload?.text || "";
      const thinkText = payload?.text || "";
      if (thinkNode && thinkText) {
        thinkNode.textContent += thinkText;
        messagesEl.scrollTop = messagesEl.scrollHeight;
      }
      if (preview) {
        setStatus(`模型思考中: ${preview.slice(-36)}`);
      }
      return false;
    }
    if (eventType === "answer") {
      const text = payload?.text ?? rawData;
      if (text) appendAnswer(text);
      return false;
    }
    if (eventType === "done") {
      return true;
    }
    if (eventType === "error") {
      throw new Error(payload?.message || rawData || "stream error");
    }

    // Backward compatibility: legacy events have only `data:` lines.
    if (rawData === "[DONE]") return true;
    if (rawData.startsWith("[ERROR]")) {
      throw new Error(rawData);
    }
    if (rawData) appendAnswer(rawData);
    return false;
  };

  try {
    while (true) {
      const { done, value } = await reader.read();
      // On the final read, flush any bytes the decoder is still holding.
      buffer += done ? decoder.decode() : decoder.decode(value, { stream: true });
      // Normalize CRLF to LF. A CR left at the end of the buffer stays put and
      // is joined with its LF on the next read.
      buffer = buffer.replace(/\r\n/g, "\n");
      const events = buffer.split("\n\n");
      // Keep the trailing partial event for the next read, unless the stream
      // has ended, in which case every remaining piece is processed.
      buffer = done ? "" : events.pop() || "";
      for (const evt of events) {
        if (handleEvent(evt)) return;
      }
      if (done) return;
    }
  } finally {
    // Release the connection when we stop early (done/error). Best-effort:
    // cancelling an already-finished or errored stream is harmless.
    reader.cancel().catch(() => {});
  }
}
/**
 * Rotate per-mode progress hints in the status bar once per second.
 * Returns a function that stops the rotation.
 */
function startWaitingHints(mode) {
  const hintsByMode = new Map([
    ["stream", ["连接流式通道中", "模型正在思考", "正在生成答案片段", "正在整理输出格式"]],
    ["raw", ["请求已发送", "模型正在直接返回原始内容", "正在接收原始输出", "正在整理原始结果"]],
    ["batch", ["批量请求已发送", "模型正在逐条处理", "正在汇总批量答案", "正在整理批量结果"]],
  ]);
  const hints =
    hintsByMode.get(mode) ?? ["请求已发送", "模型正在思考", "正在生成完整答案", "正在清洗输出内容"];

  let elapsed = 0;
  setStatus(`${hints[0]}...`);
  const timer = setInterval(() => {
    elapsed += 1;
    setStatus(`${hints[elapsed % hints.length]}... ${elapsed}s`);
  }, 1000);
  return () => clearInterval(timer);
}
async function onSend() {
  const question = questionEl.value.trim();
  if (!question) {
    setStatus("请输入问题", "bad");
    return;
  }
  sendBtn.disabled = true;
  appendMessage("user", question);
  questionEl.value = "";

  const assistantBody = appendMessage("assistant", "");
  const mode = getMode();
  // By default the whole assistant bubble is the answer target; stream mode
  // adds separate child nodes for thinking text and answer text.
  let answerNode = assistantBody;
  let thinkNode = null;
  switch (mode) {
    case "chat":
      answerNode.textContent = "正在思考中,请稍候...";
      break;
    case "stream":
      thinkNode = document.createElement("div");
      thinkNode.className = "assistant-think";
      answerNode = document.createElement("div");
      answerNode.className = "assistant-answer";
      answerNode.textContent = "正在建立流式连接...";
      assistantBody.append(thinkNode, answerNode);
      break;
    case "raw":
      answerNode.textContent = "正在请求原始输出...";
      break;
    default:
      answerNode.textContent = "正在批量处理...";
  }

  const stopHints = startWaitingHints(mode);
  try {
    switch (mode) {
      case "chat": {
        // /chat returns in one shot, so only the final answer can be shown.
        const result = await sendChat(question);
        answerNode.textContent = result.answer;
        break;
      }
      case "stream":
        await sendStream(question, answerNode, thinkNode);
        break;
      case "raw":
        answerNode.textContent = await sendRaw(question);
        break;
      default: {
        const answers = await sendBatch(question);
        answerNode.textContent = answers
          .map((answer, i) => `${i + 1}. ${answer}`)
          .join("\n\n");
      }
    }
    setStatus("完成", "ok");
  } catch (err) {
    answerNode.textContent = `请求失败: ${err}`;
    setStatus("请求失败", "bad");
  } finally {
    stopHints();
    sendBtn.disabled = false;
  }
}
// Wire the UI controls to their handlers.
sendBtn.addEventListener("click", onSend);
healthBtn.addEventListener("click", checkHealth);
loadPromptBtn.addEventListener("click", loadPrompt);
savePromptBtn.addEventListener("click", savePrompt);
clearBtn.addEventListener("click", () => {
  messagesEl.innerHTML = "";
  setStatus("已清空");
});
// Sending is button-only: no keydown handler is registered, so Enter keeps its
// default newline behavior in the question box.

// Initial probes on page load.
checkHealth();
loadPrompt();
  </script>
</body>
</html>



https://github.com/kunyashaw/langChainBaisc
# MiniMax API(硅基流动)
MINIMAX_API_KEY=sk-your-key
# Ollama 本地服务
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_EMBED_MODEL=qwen3-embedding:0.6b
| 用途 | 模型 | 配置 |
|---|---|---|
uv add langchain langchain-openai langchain-community langchain-chroma langchain-ollama langchain-text-splitters python-dotenv pydantic
下一步:进入 demos_basic 目录,逐个运行 demo 文件验证环境。
Demo 01-09 均可直接运行。
| 概念 | 一句话说明 | 对应 Demo |
|---|---|---|
| API | 作用 | 对应 Demo |
|---|---|---|
| 核心代码 | 作用 | 使用场景 |
|---|---|---|
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。