惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

酷 壳 – CoolShell
酷 壳 – CoolShell
H
Hacker News: Front Page
P
Palo Alto Networks Blog
T
ThreatConnect
Apple Machine Learning Research
Apple Machine Learning Research
博客园_首页
T
True Tiger Recordings
P
Privacy & Cybersecurity Law Blog
B
Blog
IT之家
IT之家
Last Week in AI
Last Week in AI
F
Full Disclosure
Hacker News: Ask HN
Hacker News: Ask HN
C
Comments on: Blog
Microsoft Azure Blog
Microsoft Azure Blog
C
Cybersecurity and Infrastructure Security Agency CISA
Microsoft Security Blog
Microsoft Security Blog
博客园 - 【当耐特】
N
News and Events Feed by Topic
NISL@THU
NISL@THU
腾讯CDC
雷峰网
雷峰网
Security Latest
Security Latest
李成银的技术随笔
M
Microsoft Research Blog - Microsoft Research
L
LangChain Blog
L
Lohrmann on Cybersecurity
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Check Point Blog
Y
Y Combinator Blog
Recent Announcements
Recent Announcements
博客园 - Franky
N
News | PayPal Newsroom
V
V2EX
A
About on SuperTechFans
The Register - Security
The Register - Security
月光博客
月光博客
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Google Online Security Blog
Google Online Security Blog
MyScale Blog
MyScale Blog
Cisco Talos Blog
Cisco Talos Blog
Vercel News
Vercel News
WordPress大学
WordPress大学
C
Cyber Attacks, Cyber Crime and Cyber Security
The Hacker News
The Hacker News
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
爱范儿
爱范儿
A
Arctic Wolf
L
LINUX DO - 最新话题
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More

博客园 - kunyashaw

基于langgraph的智能问答工作流 langgraph 基础使用(条件/循环/嵌套子图) vue3Crush以及对比vue2 渗透与常见服务配置 LangChain教程-4、构建简易智能 PPT 生成器 LangChain教程-3、Langchain进阶 LangChain教程-1、python基础 openclaw skill--一键生成项目宣讲介绍网页及长截图 openclaw新手skill推荐: openclaw-newbie-faq 用opencode和minimax给娃搭了一个raz学习站点 clawdbot(新名字:moltbot、OpenClaw)折腾过程 angualr基础 node基础 vue基础 某业务技术架构 漏洞治理 堡垒机方案 linux常见软件的环境搭建 linux运维基础
LangChain教程-2、Langchain基础
kunyashaw · 2026-03-26 · via 博客园 - kunyashaw

httpsgithub

代码已上传到: https://github.com/kunyashaw/langChainBaisc

langchain_9_demos_summary

LangChain 1.x 实战教程 · 10 个 Demo

基于 demo_basic 实战代码
前置要求:Python 3.10+、uv、API Key(MiniMax 硅基流动)、Ollama 本地服务
官方文档:https://python.langchain.com/docs/

开场:LangChain 的 3W1H

1) Why:为什么需要 LangChain?

直接调用模型 API 只能解决"单次问答",但真实业务要的是"系统能力":提示词管理、记忆、检索(RAG)、工具调用、多步编排和可观测性。LangChain 的价值,是把这些应用层能力标准化、组件化。

2) What:LangChain 是什么?

LangChain 是一个 LLM 应用编排框架,核心是把不同能力拼成可维护的链路:

  • 模型层:ChatOpenAI / ChatOllama 负责推理与生成
  • 编排层:LCEL 和 Runnable 负责把步骤连起来
  • 数据层:Loader / Splitter / Vector Store / Retriever 负责处理知识
  • 能力层:Tool / Agent / Memory / Parser 负责扩展应用能力

3) Who:谁适合学?

  • 想把模型能力接进业务流程的开发者
  • 想把提示词脚本做成可维护系统的人
  • 想做 RAG、Agent、工具调用、可观测性的人

4) How:怎么把它落地?

先从最小闭环开始:Prompt -> LLM -> Parser
再往上叠加:Memory 解决上下文,Retriever 解决知识,ToolAgent 解决外部能力,Callback 解决可观测性。
最后把这些组件组合成真正可维护的应用链路。

补充:这篇文章怎么读

  • 基于 demo_basic 真实运行代码
  • 使用 MiniMax API(硅基流动)+ Ollama 本地 Embedding
  • 每章节对应一个可运行的 demo 文件

LangChain 架构图

[用户输入]
   ↓
[Prompt / LCEL]
   ↓
[LLM 推理]
   ↓
[Output Parser]
   ↓
[结果输出]

旁路能力:
Memory ──► Prompt
Retriever ──► Prompt
Tool / Agent ◄──► LLM
Callback / Streaming ──► 全链路观测

10 个 Demo 一览

点击下表里的 Demo 名称可直接跳转到对应章节;每个 Demo 标题右侧的 可以返回这里。

D01LCEL 管道语法 demo01_lcel_basics.py D02Prompt Template demo02_prompt_template.py D03Memory 记忆 demo03_memory.py D04Output Parser demo04_output_parser.py D05RAG 核心链路 demo05_rag_ingest.py D06RAG Chain demo06_rag_chain.py D07Tool + Agent demo07_tools_agent.py D08Callback 可观测性 demo08_callbacks.py D09LCEL 进阶 demo09_runnable_advanced.py D10FastAPI API 服务 demo10_api_service.py
Demo 主题 对应文件

Demo 01 · LCEL 语法入门 ↑ 返回目录

代码整体功能

  • 用最小 LCEL 闭环(prompt | llm | parser)完成主题文案生成,并演示单次调用、模板渲染和批量调用。

学习目标

  • ✅ 掌握 LCEL 三件套:ChatPromptTemplate + llm + StrOutputParser
  • ✅ 理解管道符 \| 原理
  • ✅ 掌握 .invoke() / .batch() 调用方式

重点知识强化

组件说明    ChatPromptTemplate:定义提示词模板,把变量位预留出来;ChatOpenAI:负责把 prompt 送去模型并拿回结果;StrOutputParser:把模型输出转成纯字符串;from_template():创建模板;.partial():预填固定变量;|:串联提示词、模型和解析器。
学习目标组件: ChatPromptTemplate、ChatOpenAI、StrOutputParser、|.invoke().batch()代码对应   ChatPromptTemplate.from_template(...) 负责定义模板;.partial(profession="技术写作") 负责预填固定变量;prompt | llm | StrOutputParser() 负责串起最小闭环;.invoke() 做单次调用,.batch() 做批量调用。 对比理解   format() 只是渲染模板,不会调用模型;.invoke() 才会真正执行整条链;.batch() 适合批量生成;| 表示上游输出自动流入下游输入。 常见坑    常见坑是变量名不一致、批量文本过长、调试时直接看模型输出而不先看渲染结果。建议把模板、模型参数和输出解析拆开配置,便于复用、排查和 A/B 测试。 真实场景    你要做一个最小问答助手,输入一个主题就输出固定风格的介绍。这个 Demo 讲的就是把“提示词 -> 模型 -> 输出”的最小闭环先跑通。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo01_lcel_basics.py

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

# 使用 MiniMax 模型(硅基流动)
llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
    request_timeout=60,
    max_retries=2,
)

# Prompt 模板
prompt = ChatPromptTemplate.from_template(
    "你是一位{profession}专家。请用3句话介绍{topic},最后加一个彩蛋笑话。"
).partial(profession="技术写作")

# LCEL 管道
chain = prompt | llm | StrOutputParser()

# 单次调用
result = chain.invoke({"topic": "LangChain 框架"})
print("=== 单次调用 ===")
print(result)

# 渲染后的 Prompt
rendered = prompt.format(topic="Python 装饰器")
print("=== 渲染后的 Prompt ===")
print(rendered)

# 批量调用
results = chain.batch([
    {"topic": "量子计算"},
    {"topic": "区块链"},
    {"topic": "神经网络"},
])
print("=== 批量调用 ===")
for r in results:
    print(r)

demo01_what_when_how_wide

Demo 02 · Prompt Template ↑ 返回目录

代码整体功能

  • 演示纯文本模板、消息模板、历史消息占位和模板组合,构建更可维护的提示词组织方式。

学习目标

  • ✅ 掌握 PromptTemplate(纯文本)和 ChatPromptTemplate(消息型)
  • ✅ 理解 MessagesPlaceholder 的作用
  • ✅ 掌握 PipelinePromptTemplate 嵌套模板

重点知识强化

组件说明    PromptTemplate:纯文本模板,适合单段变量替换;ChatPromptTemplate:消息模板,适合多角色对话;MessagesPlaceholder:把历史消息插到指定位置;HumanMessagePromptTemplate / SystemMessagePromptTemplate:分别定义用户消息和系统消息;PipelinePromptTemplate:把子模板串起来。
学习目标组件: PromptTemplate、ChatPromptTemplate、MessagesPlaceholder、HumanMessagePromptTemplate、SystemMessagePromptTemplate、PipelinePromptTemplate。 代码对应   PromptTemplate.from_template(...) 适合纯文本替换;ChatPromptTemplate.from_messages([...]) 适合多角色对话;MessagesPlaceholder(variable_name="chat_history") 负责插入历史消息;PipelinePromptTemplate(...) 演示模板套模板。 对比理解    纯文本模板适合简单变量替换,消息模板更适合对话场景;MessagesPlaceholder 是消息级插入,不是字符串拼接;模板嵌套适合复杂提示词拆分。 常见坑    常见坑是历史消息传成字符串、嵌套变量漏填、模板层级越写越复杂。建议系统提示词尽量固定,历史消息只保留必要内容,复杂场景再做模板组合。 真实场景    客服、陪伴类、知识问答类应用,都需要把系统人设、历史对话和当前问题拆开管理。这个 Demo 讲的就是提示词模板化。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo02_prompt_template.py

from langchain_core.prompts import (
    ChatPromptTemplate,
    PromptTemplate,
    MessagesPlaceholder,
    HumanMessagePromptTemplate,
    SystemMessagePromptTemplate
)
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
)

# PipelinePrompt 替代方案(更简洁)
introduction = PromptTemplate.from_template("你是 {character},一个乐于助人的 AI 助手。")
main_template = "{introduction}\n\n用户问:{question}\n\n你的回答:"

def create_pipeline_prompt(character, question):
    intro = introduction.format(character=character)
    return main_template.format(introduction=intro, question=question)

final = create_pipeline_prompt("LangChain 助手", "什么是 RAG?")
print("PipelinePrompt 结果:", final)
response = llm.invoke(final)

# response 是 AIMessage 对象
answer = response.content
token_usage = response.response_metadata["token_usage"]
model_name = response.response_metadata["model_name"]
finish_reason = response.response_metadata["finish_reason"]

print("AI 回复:", answer)
print("Token用量:", token_usage)
print("模型名称:", model_name)
print("结束原因:", finish_reason)

demo02_what_when_how_wide

Demo 03 · Memory 记忆 ↑ 返回目录

代码整体功能

  • 实现窗口记忆与摘要记忆两种策略,把多轮上下文压缩后继续接入 LangChain 链路。

学习目标

  • ✅ 掌握窗口记忆 WindowedMemory
  • ✅ 掌握摘要记忆 SummaryMemory
  • ✅ 理解 get_buffer_string 的用法

重点知识强化

组件说明    WindowedMemory:只保留最近几轮对话;SummaryMemory:把长对话压缩成摘要;get_buffer_string():把消息列表转成可读文本,方便继续喂给模型;MessagesPlaceholder:把历史消息接入提示词;ChatPromptTemplate / StrOutputParser:负责接回推理链。
学习目标组件: WindowedMemory、SummaryMemory、get_buffer_string代码对应   WindowedMemory(k=2) 保留最近窗口;SummaryMemory(llm, max_history=3) 负责自动摘要;get_buffer_string(...) 把消息列表转成可读上下文;summary_chain 让摘要后的上下文继续参与回答。 对比理解    窗口记忆保留原文但会丢掉更早内容,摘要记忆保信息密度但会损失细节。窗口适合短对话,摘要适合长对话,真实项目里通常要结合使用。 常见坑    常见坑是只保存在内存里、不做持久化;窗口太短会丢上下文,太长又会浪费 token;摘要会失真。建议长对话先摘要,再保留最近几轮原文,并持久化到 Redis 或数据库。 真实场景    用户希望助手“记得我”,但历史又不能无限增长。这个 Demo 讲的是如何把记忆做成一个可控的工程组件。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo03_memory.py

from langchain_openai import ChatOpenAI
from langchain_core.prompts import (
    ChatPromptTemplate,
    MessagesPlaceholder,
    HumanMessagePromptTemplate,
)
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, get_buffer_string
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
import os

load_dotenv()

llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
)

class WindowedMemory:
    """只保留最近 k 条消息的窗口记忆"""
    def __init__(self, k: int = 6):
        self.k = k
        self.messages = []

    def add_user_message(self, text: str):
        self.messages.append(HumanMessage(content=text))
        if len(self.messages) > self.k * 2:
            self.messages = self.messages[-self.k * 2:]

    def add_ai_message(self, text: str):
        self.messages.append(AIMessage(content=text))
        if len(self.messages) > self.k * 2:
            self.messages = self.messages[-self.k * 2:]

    def get_messages(self) -> list:
        return self.messages[-self.k:]

class SummaryMemory:
    """AI 摘要记忆:对话太长时自动压缩"""
    def __init__(self, llm, max_history: int = 20):
        self.llm = llm
        self.max_history = max_history
        self.summary = ""
        self.current_messages = []

    def add_user_message(self, text: str):
        self.current_messages.append(HumanMessage(content=text))

    def add_ai_message(self, text: str):
        self.current_messages.append(AIMessage(content=text))

    def get_context(self) -> str:
        recent = get_buffer_string(self.current_messages[-self.max_history:])
        if self.summary:
            return f"【历史摘要】{self.summary}\n\n【最近对话】\n{recent}"
        return recent

    def should_summarize(self) -> bool:
        return len(self.current_messages) >= self.max_history * 2

    def summarize(self):
        summary_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="请把以下对话压缩成一段简短摘要,保留关键信息:"),
            HumanMessagePromptTemplate.from_template("{messages}"),
        ])
        self.summary = (
            summary_prompt | self.llm | StrOutputParser()
        ).invoke({"messages": get_buffer_string(self.current_messages)})
        self.current_messages = []

# 去掉 <think> 标签的辅助函数
def extract_plain_text(summary_with_thinking: str) -> str:
    import re
    text = re.sub(r'<think>.*?', '', summary_with_thinking, flags=re.DOTALL)
    return text.strip()

# 示例
print("\n===== WindowedMemory 示例 =====")
w_memory = WindowedMemory(k=2)
w_memory.add_user_message("你好!")
w_memory.add_ai_message("你好,有什么可以帮你的?")
w_memory.add_user_message("我想了解 LangChain")
w_memory.add_ai_message("LangChain 是一个构建 LLM 应用的框架。")
print("只保留最近 2 条:", w_memory.get_messages())

print("\n===== SummaryMemory 示例 =====")
s_memory = SummaryMemory(llm, max_history=3)
s_memory.add_user_message("我想计划下个月去日本东京旅游")
s_memory.add_ai_message("东京旅游很棒!必去景点包括:浅草寺、东京塔...")
s_memory.add_user_message("签证怎么办理?")
s_memory.add_ai_message("日本旅游签证需要通过旅行社代办...")
s_memory.add_user_message("机票和酒店有推荐吗?")
s_memory.add_ai_message("机票建议提前2个月订,东京往返含税约2500-4500元...")

print(f"消息数: {len(s_memory.current_messages)}, 需摘要: {s_memory.should_summarize()}")

s_memory.summarize()
plain_summary = extract_plain_text(s_memory.summary)
print(f"压缩后摘要: {plain_summary}")

demo03_what_when_how_wide

Demo 04 · Output Parser ↑ 返回目录

学习目标

  • ✅ 掌握 JsonOutputParser
  • ✅ 掌握 PydanticOutputParser + BaseModel
  • ✅ 学会处理 MiniMax 输出的 <think> 标签

重点知识强化

组件说明    JsonOutputParser:解析 JSON 字典;PydanticOutputParser:把输出校验成模型对象;CommaSeparatedListOutputParser:解析逗号列表;RunnableLambda:在 parser 前做清洗;clean_json_output() / clean_list_output():去掉 <think> 和中文标点噪声;PersonInfo / field_validator:定义字段和校验规则。
学习目标组件: JsonOutputParser、PydanticOutputParser、BaseModel、clean_json_output、RunnableLambda、CommaSeparatedListOutputParser。 代码对应   build_llm() 把温度调到 0.0 降低漂移;chain_json = ... | RunnableLambda(clean_json_output) | json_parser 先清噪音再解析 JSON;PersonInfo 负责 schema 和字段校验;list_chain 负责把输出归一成逗号列表。 对比理解    JSON 解析更灵活,Pydantic 解析更严格,列表解析更轻量;parser 负责“解析”,不负责“纠错”。结构越关键,校验就越要严格。 常见坑    常见坑是模型看起来像 JSON 但其实不是合法 JSON,中文标点会影响列表解析,只靠提示词也不够稳。建议关键字段做 schema 验证、重试和人工兜底。 真实场景    当你需要从简历、工单、合同或订单备注里抽字段时,最怕模型胡说八道。这个 Demo 讲的就是结构化输出。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo04_output_parser.py

### 代码整体功能
- 通过清洗函数 + 输出解析器,把模型文本稳定转换为 JSON、Pydantic 对象和列表结构。

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain_core.messages import SystemMessage
from langchain_core.output_parsers import (
    JsonOutputParser,
    PydanticOutputParser,
    CommaSeparatedListOutputParser,
)
from langchain_core.runnables import RunnableLambda
from pydantic import BaseModel, Field, field_validator
import re
from dotenv import load_dotenv
import os

load_dotenv()

def build_llm() -> ChatOpenAI:
    return ChatOpenAI(
        model="MiniMax-M2.7",
        base_url="https://api.minimaxi.com/v1",
        api_key=os.getenv("MINIMAX_API_KEY"),
        temperature=0.0,
        max_tokens=1000,
        timeout=60,
        max_retries=2,
    )

# 关键:处理 MiniMax 输出的 <think> 标签
def clean_json_output(input_str: str) -> str:
    if hasattr(input_str, "content"):
        input_str = input_str.content
    # 去掉 <think>...
    pattern = r"<think>.*?"
    text = re.sub(pattern, "", str(input_str), flags=re.DOTALL)
    return text.strip()

def clean_list_output(input_str: str) -> str:
    text = clean_json_output(input_str)
    return text.replace(",", ",").replace("、", ",")

def run_json_demo(llm: ChatOpenAI) -> None:
    json_parser = JsonOutputParser()
    json_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="你是一个数据提取助手。只返回合法 JSON,不要解释。"),
        HumanMessagePromptTemplate.from_template(
            "从以下文本中提取信息,以 JSON 格式返回:\n{text}\n\n格式要求:\n{format}"
        ),
    ])
    chain_json = json_prompt | llm | RunnableLambda(clean_json_output) | json_parser

    result = chain_json.invoke({
        "text": "布鲁斯,37岁,来自深圳,是一名技术合伙人,擅长 Python 和 Java。",
        "format": json_parser.get_format_instructions(),
    })
    print("JSON 结果:", result)

# Pydantic 模型
class PersonInfo(BaseModel):
    name: str = Field(description="人物姓名")
    age: int = Field(description="人物年龄(必须是整数)")
    city: str = Field(description="所在城市")
    skills: list[str] = Field(default_factory=list, description="掌握的技能列表")

    @field_validator("age")
    @classmethod
    def age_must_be_positive(cls, v: int) -> int:
        if v <= 0 or v > 150:
            raise ValueError(f"年龄 {v} 不合理!")
        return v

def run_pydantic_demo(llm: ChatOpenAI) -> None:
    pydantic_parser = PydanticOutputParser(pydantic_object=PersonInfo)
    pydantic_prompt = (
        ChatPromptTemplate.from_messages([
            SystemMessage(content="你是一个数据提取助手。只返回符合格式要求的 JSON 本身。"),
            HumanMessagePromptTemplate.from_template("从以下文本中提取信息:\n{text}\n\n{format}"),
        ])
        .partial(format=pydantic_parser.get_format_instructions())
    )
    chain_pydantic = pydantic_prompt | llm | RunnableLambda(clean_json_output) | pydantic_parser

    person: PersonInfo = chain_pydantic.invoke({
        "text": "布鲁斯,37岁,深圳技术合伙人,擅长Node Python、Java、Go。"
    })
    print(f"姓名:{person.name},年龄:{person.age},城市:{person.city},技能:{person.skills}")

def run_list_demo(llm: ChatOpenAI) -> None:
    list_parser = CommaSeparatedListOutputParser()
    list_prompt = (
        ChatPromptTemplate.from_messages([
            SystemMessage(content="你是一个只输出逗号分隔列表的助手。使用英文逗号。"),
            HumanMessagePromptTemplate.from_template("列出 {subject} 的 {n} 个优点。\n\n{format}"),
        ])
        .partial(format=list_parser.get_format_instructions())
    )
    list_chain = list_prompt | llm | RunnableLambda(clean_list_output) | list_parser
    result = list_chain.invoke({"subject": "Python", "n": 5})
    print("列表结果:", result)

def main() -> None:
    llm = build_llm()
    # run_json_demo(llm)
    # run_pydantic_demo(llm)
    run_list_demo(llm)

if __name__ == "__main__":
    main()

demo04_what_when_how_wide

Demo 05 · RAG 核心链路 ↑ 返回目录

代码整体功能

  • 完成文档加载、切块、向量化入库和检索验证,搭建 RAG 的数据准备链路。

学习目标

  • ✅ 掌握 TextLoader / PyPDFLoader / WebBaseLoader
  • ✅ 掌握 RecursiveCharacterTextSplitter
  • ✅ 掌握 Chroma 向量库 + Ollama Embedding

重点知识强化

组件说明    TextLoader:读本地文本;PyPDFLoader:读 PDF;WebBaseLoader:读网页;RecursiveCharacterTextSplitter:把长文切块;Chroma:存向量并检索;OllamaEmbeddings:把文本转成向量;requests:做 Ollama 健康检查;Document:统一承载内容和 metadata。
学习目标组件: TextLoader、PyPDFLoader、WebBaseLoader、RecursiveCharacterTextSplitter、Chroma、OllamaEmbeddings。 代码对应   build_embeddings() 先做 Ollama 健康检查;load_documents() 把文本、PDF、网页统一成 Documentsplit_documents() 切块;build_vectorstore() 写入 Chroma;run_queries() 验证检索效果。 对比理解    文档加载和文档切块是两件事;similarity_search() 只看结果,similarity_search_with_score() 还能看分数;不同来源最终都要统一成 Document 列表。 常见坑    常见坑是文档来源不统一、切块过大或过小、网页加载受网络影响。建议入库前做清洗、去重、metadata 标注,文档更新后做重建或增量同步。 真实场景    企业知识库、FAQ 搜索、产品文档问答,都需要先把文本处理成向量库。这个 Demo 讲的是“入库”阶段。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo05_rag_ingest.py

from pathlib import Path
import os
import requests
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader, PyPDFLoader, WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings

load_dotenv()

def build_embeddings() -> OllamaEmbeddings:
    """使用本地 Ollama embedding 模型"""
    model = os.getenv("OLLAMA_EMBED_MODEL", "qwen3-embedding:0.6b")
    base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
    
    # 检查 Ollama 服务
    health_url = f"{base_url.rstrip('/')}/api/tags"
    try:
        requests.get(health_url, timeout=3)
    except requests.RequestException as exc:
        raise RuntimeError(f"Ollama 服务不可用,请先启动 ollama") from exc

    return OllamaEmbeddings(model=model, base_url=base_url)

def load_documents() -> list[Document]:
    """读取本地文本 / PDF / 网页"""
    # 文本
    text_path = Path("essay.txt")
    if text_path.exists():
        text_docs = TextLoader(str(text_path), encoding="utf-8").load()
    else:
        text_docs = [Document(
            page_content="LangChain 是一个用于构建 LLM 应用的框架。",
            metadata={"source": "inline:essay.txt"},
        )]
        print("essay.txt 不存在,已使用内置示例文本代替。")

    # PDF
    pdf_path = Path("paper.pdf")
    if pdf_path.exists():
        pdf_docs = PyPDFLoader(str(pdf_path)).load()
    else:
        pdf_docs = []
        print("paper.pdf 不存在,已跳过 PDF 读取。")

    # 网页
    web_url = "https://python.langchain.com/docs/introduction"
    try:
        web_docs = WebBaseLoader(web_url).load()
    except Exception as exc:
        web_docs = []
        print(f"网页读取失败,已跳过:{exc}")

    return text_docs + pdf_docs + web_docs

def split_documents(documents: list[Document]) -> list[Document]:
    """切块"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)
    print(f"切块数: {len(chunks)}")
    return chunks

def build_vectorstore(chunks: list[Document], embeddings: OllamaEmbeddings) -> Chroma:
    """构建向量库"""
    return Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory="./chroma_db",
    )

def run_queries(vectorstore: Chroma) -> None:
    """检索测试"""
    # 相似度检索
    query = "LangChain 的核心概念是什么?"
    docs = vectorstore.similarity_search(query=query, k=3)
    for i, doc in enumerate(docs, 1):
        print(f"[{i}] {doc.page_content[:150]}")

    # 带分数的检索
    docs_with_scores = vectorstore.similarity_search_with_score(query=query, k=3)
    for doc, score in docs_with_scores:
        print(f"  [分数:{score:.4f}] {doc.page_content[:100]}")

    # Retriever
    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
    retrieved_docs = retriever.invoke("LangChain 是什么?")
    print(f"Retriever 返回 {len(retrieved_docs)} 个文档块")

def main() -> None:
    embeddings = build_embeddings()
    documents = load_documents()
    for doc in documents:
        print(doc.metadata)
    
    chunks = split_documents(documents)
    for i, chunk in enumerate(chunks, 1):
        print(f"Chunk {i}: 长度={chunk.page_content.__len__()}, 元数据={chunk.metadata}")
    
    vectorstore = build_vectorstore(chunks, embeddings)
    run_queries(vectorstore)

if __name__ == "__main__":
    main()

demo05_what_when_how_wide

Demo 06 · RAG Chain ↑ 返回目录

代码整体功能

  • 把 Retriever、Prompt 和 LLM 组装成 RAG 问答链,并加入多轮问题改写能力。

学习目标

  • ✅ 掌握 RAG Chain 的 LCEL 组装
  • ✅ 理解 RunnablePassthrough 透传
  • ✅ 掌握多轮 RAG(condense query)

重点知识强化

组件说明    RunnablePassthrough:原样透传问题;retriever:负责检索;format_docs():把检索结果拼成字符串上下文;ChatPromptTemplate:组织回答提示词;StrOutputParser:输出字符串;get_buffer_string():把历史对话转成文本;ChatOpenAI / OllamaEmbeddings / Chroma:分别负责生成、向量化和向量库。
学习目标组件: RunnablePassthrough、retriever、format_docs、ChatPromptTemplate、StrOutputParser、get_buffer_string代码对应   rag_chain = {"question": RunnablePassthrough(), "context": retriever | format_docs} | prompt | llm | parser 负责检索+回答;condense_prompt 负责把多轮问题改写成独立查询;final_chain 负责把检索到的文档内容转成最终答案。 对比理解    主流程负责检索和回答,改写流程负责把“它”“这个功能”这类多轮问题转成独立查询;在这条链里 context 是字符串,不是消息列表。 常见坑    常见坑是检索结果没有来源、问题改写偏题、上下文过长导致超窗。建议加来源标签,先 rewrite 再 retrieval,回答里加“未找到”兜底。 真实场景    用户常常会说“这个功能”“它”“上一条那个问题”,而不是完整句子。这个 Demo 讲的是知识检索和多轮上下文怎么接起来。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo06_rag_chain.py

from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain_core.messages import SystemMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.messages import HumanMessage, AIMessage, get_buffer_string
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

# 使用 MiniMax 作为 LLM
llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
    timeout=60,
    max_retries=2,
)

# 使用 Ollama 作为 Embedding
embeddings = OllamaEmbeddings(
    model="qwen3-embedding:0.6b",
    base_url="http://localhost:11434",
)

vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})

def format_docs(docs):
    return "\n\n".join(f"[来源{i+1}] {d.page_content}" for i, d in enumerate(docs))

# RAG Chain
rag_chain = (
    {"question": RunnablePassthrough(), "context": retriever | format_docs}
    | ChatPromptTemplate.from_messages([
        SystemMessage(content=(
            "你是知识渊博的助手,基于文档片段回答。"
            "如果文档中没有答案,说「没有找到相关信息」,不要编造。"
        )),
        HumanMessagePromptTemplate.from_template("文档内容:\n{context}\n\n问题:{question}"),
    ])
    | llm
    | StrOutputParser()
)

result = rag_chain.invoke("LangChain 是什么?有哪些核心概念?")
print("RAG 结果:", result[:200])

# 多轮 RAG(合并历史问题)
chat_history = [
    HumanMessage(content="LangChain 支持哪些模型?"),
    AIMessage(content="LangChain 支持 OpenAI、Anthropic、Google Gemini、HuggingFace 等主流 LLM。"),
]

condense_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="把对话历史和最新问题合并成一个独立的问题。"),
    HumanMessagePromptTemplate.from_template("历史:{chat_history}\n\n最新问题:{question}"),
])

combined_query = (
    condense_prompt | llm | StrOutputParser()
).invoke({
    "chat_history": get_buffer_string(chat_history),
    "question": "那 embedding 模型呢?",
})
print("合并检索词:", combined_query)

docs = retriever.invoke(combined_query)
final_chain = (
    ChatPromptTemplate.from_messages([
        SystemMessage(content="基于文档回答问题。"),
        HumanMessagePromptTemplate.from_template("文档:\n{context}\n\n问题:{question}"),
    ])
    | llm
    | StrOutputParser()
)
answer = final_chain.invoke({"context": format_docs(docs), "question": combined_query})
print("最终答案:", answer)

demo06_what_when_how_wide

Demo 07 · Tool + Agent ↑ 返回目录

代码整体功能

  • 把普通函数声明为工具,并让 Agent 根据自然语言自动选择并调用对应工具。

学习目标

  • ✅ 掌握 @tool 装饰器定义工具
  • ✅ 掌握 create_agent 的用法

重点知识强化

组件说明   @tool:把普通函数注册成工具;create_agent:创建会自己选工具的 Agent;ChatOpenAI:驱动推理决策;multiply() / get_weather() / search_web():分别处理计算、天气和搜索。
学习目标组件:@toolcreate_agent、ChatOpenAI。 代码对应   @tool 负责把普通函数声明成工具;create_agent(model=llm, tools=[...], system_prompt=...) 负责创建 Agent;agent.invoke({"messages": [...]}) 负责接收自然语言并让模型自行决定调用哪个工具。 对比理解    工具是能力边界,Agent 是决策层;流程固定时直接写链更稳,动态选工具时再上 Agent。工具越多,描述越要清晰。 常见坑    常见坑是工具描述太抽象、返回值太长、入参缺少校验。建议最小权限开放工具,严格校验入参,返回结果尽量短且结构化。 真实场景    用户希望助手既会算数、又会查天气、还会搜索。这个 Demo 讲的就是让模型调用工具。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo07_tools_agent.py

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from dotenv import load_dotenv
import os

load_dotenv()

llm = ChatOpenAI(
    model="MiniMax-M2.7",
    base_url="https://api.minimaxi.com/v1",
    api_key=os.getenv("MINIMAX_API_KEY"),
    temperature=0.7,
    max_tokens=1000,
    timeout=60,
    max_retries=2,
)

@tool
def multiply(a: int, b: int) -> int:
    """计算两个整数相乘"""
    return a * b

@tool
def get_weather(city: str) -> str:
    """查询城市天气"""
    weather_db = {"深圳": "晴天28C", "北京": "多云22C", "上海": "小雨18C"}
    return weather_db.get(city, f"暂无{city}数据")

@tool
def search_web(query: str) -> str:
    """在互联网上搜索信息"""
    return f"关于「{query}」的搜索结果(模拟):来自 Wikipedia、知乎..."

def main() -> None:
    # 创建 Agent
    agent = create_agent(
        model=llm,
        tools=[multiply, get_weather, search_web],
        system_prompt=(
            "你是一个工具调用助手。"
            "遇到计算就用 multiply,遇到天气就用 get_weather,"
            "遇到搜索就用 search_web。"
        ),
    )

    result = agent.invoke({
        "messages": [{
            "role": "user",
            "content": "深圳天气怎么样?然后把37和42相乘。搜索下今日知乎热点",
        }]
    })
    print("最终输出:", result["messages"][-1].content)

if __name__ == "__main__":
    main()

demo07_what_when_how_wide

Demo 08 · Callback 可观测性 ↑ 返回目录

代码整体功能

  • 用回调监听链路与模型事件,配合流式输出实现运行过程观测与调试。

学习目标

  • ✅ 掌握 BaseCallbackHandler
  • ✅ 理解 on_chain_start/end/error 和 on_llm_start/end/new_token
  • ✅ 掌握 streaming 打字机效果

重点知识强化

组件说明    BaseCallbackHandler:定义回调接口;build_llm(streaming=True):开启流式输出;callbacks:运行时注入回调;streaming:让 token 边生成边打印;on_chain_start/end/erroron_llm_start/end/new_token:分别监听链路、模型和 token。
学习目标组件: BaseCallbackHandler、on_chain_starton_chain_endon_chain_erroron_llm_starton_llm_endon_llm_new_token、streaming。 代码对应   on_chain_* 监听链路开始、结束和错误;on_llm_* 监听模型开始、结束和 token;streaming_chain.stream(..., config={"callbacks": [...]}) 用于逐 token 打印和观测。 对比理解    普通调用看最终结果,流式调用看 token 生成;回调是观测手段,不是业务逻辑。链路越复杂,回调越重要。 常见坑    常见坑是不打开 streaming 就看不到 token 回调、回调里做重活拖慢响应、部分后端不返回完整 token 统计。建议回调只做轻量日志和指标,并结合 request id / session id 做追踪。 真实场景    线上服务里,最重要的是知道一次调用到底发生了什么。这个 Demo 讲的就是可观测性。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo08_callbacks.py

import logging
import os
from typing import Any, Dict, List

from dotenv import load_dotenv
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.output_parsers import StrOutputParser
from langchain_core.outputs import LLMResult
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def build_llm(*, streaming: bool = False) -> ChatOpenAI:
    return ChatOpenAI(
        model="MiniMax-M2.7",
        base_url="https://api.minimaxi.com/v1",
        api_key=os.getenv("MINIMAX_API_KEY"),
        temperature=0.7,
        max_tokens=1000,
        streaming=streaming,
        timeout=60,
        max_retries=2,
    )

class MyCallbackHandler(BaseCallbackHandler):
    def on_chain_start(self, serialized: Dict, inputs: Dict, **kwargs) -> None:
        name = serialized.get("name") if isinstance(serialized, dict) else str(serialized)
        logger.info(f"[Chain 开始] name={name}")

    def on_chain_end(self, outputs: Dict, **kwargs) -> None:
        logger.info(f"[Chain 结束] {str(outputs)[:50]}")

    def on_chain_error(self, error: Exception, **kwargs) -> None:
        logger.error(f"[Chain 错误] {error}")

    def on_llm_start(self, serialized: Dict, prompts: List[str], **kwargs) -> None:
        logger.info(f"[LLM 开始] {str(prompts)[:80]}...")

    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        gen = response.generations[0][0]
        text = getattr(gen, "text", "") or getattr(getattr(gen, "message", None), "content", "")
        logger.info(f"[LLM 结束] {text[:50]}...")
        if response.llm_output and "token_usage" in response.llm_output:
            usage = response.llm_output["token_usage"]
            logger.info(f"[Token] total={usage.get('total_tokens', 'N/A')}")

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(token, end="", flush=True)

class StreamingCallbackHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(token, end="", flush=True)

def main() -> None:
    prompt = ChatPromptTemplate.from_template("用三句话讲一个关于 {topic} 的笑话")

    # 流式调用
    streaming_llm = build_llm(streaming=True)
    streaming_chain = prompt | streaming_llm | StrOutputParser()
    streaming_handler = StreamingCallbackHandler()
    print("\n流式输出:")
    streaming_chunks = []
    for chunk in streaming_chain.stream(
        {"topic": "Python 编程"},
        config={"callbacks": [streaming_handler]},
    ):
        streaming_chunks.append(chunk)
    streaming_result = "".join(streaming_chunks)
    print("\n最终流式结果:", streaming_result)

if __name__ == "__main__":
    main()

demo08_what_when_how_wide

Demo 09 · LCEL 进阶 ↑ 返回目录

代码整体功能

  • 组合 RunnablePassthrough / Parallel / Branch / Lambda,实现透传、并行、分流和后处理的复合链路。

学习目标

  • ✅ 掌握 RunnableParallel:并行执行
  • ✅ 掌握 RunnableBranch:条件分支
  • ✅ 掌握 RunnableLambda:自定义处理

重点知识强化

组件说明    RunnablePassthrough:保留原始输入;RunnableParallel:并行执行多个子链;RunnableBranch:按条件分流;RunnableLambda:插入自定义处理函数;ChatPromptTemplate / StrOutputParser / SystemMessage / HumanMessagePromptTemplate / ChatOpenAI:分别负责提示词、解析、系统约束、用户输入和模型调用。
学习目标组件: RunnablePassthrough、RunnableParallel、RunnableBranch、RunnableLambda。 代码对应   parallel_chain 负责并行输出;branch_chain 负责条件分流;add_suffix() 负责后处理;cot_chain 负责用提示词引导步骤化推理。 对比理解    Parallel 是同时做多件事,Branch 是按条件走不同路;Passthrough 保留原始输入,Lambda 插入自定义逻辑。组合得越好,链路越灵活。 常见坑    常见坑是分支条件太粗、并行输出结构不统一、推理步骤暴露过多。建议并行输出统一 schema,分支规则先简单后复杂,状态和流程太复杂时再考虑迁移到 LangGraph。 真实场景    一个请求往往要同时产出多个结果,或者根据不同输入走不同分支。这个 Demo 讲的就是 LCEL 的组合、分流和后处理。
知识模块      内容
🔴点击展开代码🔴

完整代码

# 文件:demo09_runnable_advanced.py

from dotenv import load_dotenv
from langchain_core.messages import SystemMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain_core.runnables import (
    RunnableBranch,
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)
from langchain_openai import ChatOpenAI
import os

load_dotenv()

def build_llm() -> ChatOpenAI:
    return ChatOpenAI(
        model="MiniMax-M2.7",
        base_url="https://api.minimaxi.com/v1",
        api_key=os.getenv("MINIMAX_API_KEY"),
        temperature=0.7,
        max_tokens=1000,
        timeout=60,
        max_retries=2,
    )

def run_passthrough_demo() -> None:
    """RunnablePassthrough:原样透传输入"""
    passthrough_demo = RunnablePassthrough()
    print("透传结果:", passthrough_demo.invoke({"topic": "LangChain"}))

def run_parallel_demo(llm: ChatOpenAI) -> None:
    """RunnableParallel:并行执行多任务"""
    parallel_chain = RunnableParallel({
        "openai_answer": (
            ChatPromptTemplate.from_template("{topic} 是什么?用一句话回答。")
            | llm
            | StrOutputParser()
        ),
        "word_count": RunnableLambda(
            lambda x: f"「{x['topic']}」字符数:{len(x['topic'])}"
        ),
    })
    result = parallel_chain.invoke({"topic": "LangChain"})
    print("并行结果:", result)

def is_code(x: dict) -> bool:
    return "def " in x["input"] or "class " in x["input"]

def is_math(x: dict) -> bool:
    return any(k in x["input"] for k in ["+", "-", "*", "/"])

def run_branch_demo() -> None:
    """RunnableBranch:条件分支"""
    branch_chain = RunnableBranch(
        (RunnableLambda(is_code), RunnableLambda(lambda x: f"[代码] {x['input']}")),
        (RunnableLambda(is_math), RunnableLambda(lambda x: f"[数学] {x['input']}")),
        RunnableLambda(lambda x: f"[普通] {x['input']}"),
    )

    print(branch_chain.invoke({"input": "def hello(): pass"}))
    print(branch_chain.invoke({"input": "1 + 2 = 3"}))
    print(branch_chain.invoke({"input": "今天天气好"}))

def add_suffix(text: str) -> str:
    """后处理:给输出追加后缀"""
    return text + "\n\n[由 LCEL 处理]"

def run_postprocess_demo(llm: ChatOpenAI) -> None:
    """RunnableLambda 后处理"""
    chain = (
        ChatPromptTemplate.from_template("介绍一下 {topic},不少于50字")
        | llm
        | StrOutputParser()
        | RunnableLambda(add_suffix)
    )
    result = chain.invoke({"topic": "Python 编程语言"})
    print("后处理结果:", result)

def run_cot_demo(llm: ChatOpenAI) -> None:
    """思维链 CoT"""
    cot_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=(
            "你是逻辑推理助手。按步骤回答:"
            "步骤1理解问题 步骤2列关键信息 步骤3逐步推理 步骤4给出答案。"
            "用【步骤1】【步骤2】【步骤3】【步骤4】格式回答。"
        )),
        HumanMessagePromptTemplate.from_template("问题:{question}"),
    ])

    cot_chain = cot_prompt | llm | StrOutputParser()
    result = cot_chain.invoke({
        "question": "如果所有的猫都喜欢鱼,A是猫,B喜欢鱼,那么A和B一定都是猫吗?"
    })
    print("CoT 结果:", result)

def main() -> None:
    llm = build_llm()
    run_passthrough_demo()
    run_parallel_demo(llm)
    run_branch_demo()
    run_postprocess_demo(llm)
    run_cot_demo(llm)

if __name__ == "__main__":
    main()

demo09_what_when_how_wide

Demo 10 · FastAPI API 服务与 SSE 流式聊天 ↑ 返回目录

代码整体功能

  • utils_llm_service.py:封装 LangChain 主链(build_chain())和调用能力(invoke/raw/batch/stream)。
  • demo10_api_service.py:把链路能力暴露为 /chat/chat/raw/batch/chat/stream/prompt/health
  • demo10_chat_ui.html:调用接口并展示普通/流式结果;utils_http_hooks.py:统一日志和异常处理。
  • 整体调用链为:demo10_chat_ui.html -> demo10_api_service.py -> utils_llm_service.py -> MiniMax
    image

学习目标

  • ✅ 学会把 LCEL 链路封装成可复用 Service
  • ✅ 学会 invoke/raw/batch/stream 四种调用分层
  • ✅ 学会思考标签清洗与 answer/think 拆分
  • ✅ 学会用 SSE 把流式结果推给前端
  • ✅ 理解 Uvicorn:FastAPI 的 ASGI 运行服务器

重点知识强化

组件说明   utils_llm_service.py:LangChain 链路与调用封装。
demo10_api_service.py:HTTP 路由与协议输出封装。
utils_http_hooks.py:请求日志与异常处理封装。
demo10_chat_ui.html:接口调试与交互展示页面。 代码对应   utils_llm_service.pybuild_chain() 组主链,invoke_with_meta/raw/batch/stream_with_thinking 提供核心调用。demo10_api_service.pychat/chat_raw/batch_chat/chat_stream 映射四类请求,sse_stream() 输出 SSE 事件。 对比理解    分层好处:易维护、易测试、易替换。utils_llm_service.py 管链路逻辑,demo10_api_service.py 管协议路由,demo10_chat_ui.html 管交互,utils_http_hooks.py 管日志异常。前端请求选型:chat 拿最终结果,stream 实时展示,raw 联调排错,batch 批量任务。 常见坑    CORS(allow_credentials=True* 的组合限制、预检 OPTIONS
SSE 协议格式(text/event-stream、事件块空行分隔)
StreamingResponse 生成器取消与中断点
GZipMiddleware 对流式响应的缓冲延迟
Uvicorn/代理配置(keep-alive、并发上限 503root_path真实场景   /chat:适合客服问答、后台内容生成,对应 utils_llm_service.pyinvoke_with_meta()(底层 chain.invoke(...))。
/chat/stream:适合网页助手、IDE 实时交互,对应 stream_with_thinking()(底层 chain.stream(...))。
/chat/raw:适合 Prompt/解析器联调,对应 invoke_raw()
/batch:适合离线批量改写、摘要、分类,对应 batch()(底层 chain.batch(...))。
/prompt:适合运营在线调参,对应 set_prompt() 重建链路。 Uvicorn 介绍    Uvicorn 是 Python 生态主流 ASGI 服务器,负责把 FastAPI 应用真正跑起来(监听端口、处理并发连接、承接 HTTP 生命周期)。常用方式是直接启动 uvicorn demo10_api_service:app --host 0.0.0.0 --port 8000,生产常配 --workers 或由 Gunicorn 托管 UvicornWorker。行业里它是 FastAPI/Starlette 部署的事实标准之一,文档、社区和云平台支持都很成熟。
知识模块      内容
🔴点击展开代码🔴

完整代码 demo10_api_service.py

# ========== Demo10: 把 LangChain 变成 API 服务 ==========
# 文件:demo10_api_service.py
#
# 这个示例演示:
# 1. 用 FastAPI 把 LangChain 链路封装成 HTTP API;
# 2. 提供普通调用接口(/chat)和流式输出接口(/chat/stream);
# 3. 使用 Uvicorn 启动服务,便于本地开发和后续生产部署改造;
# 4. 把“HTTP Hook”和“LLM 调用”分别拆到独立模块,降低耦合。
#
# 启动方式:
# uvicorn demo10_api_service:app --host 0.0.0.0 --port 8000 --reload

from contextlib import asynccontextmanager
import json
import logging
import os
from pathlib import Path
from typing import Iterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, Field

from utils_http_hooks import register_http_hooks
from utils_llm_service import UtilsLLMService

load_dotenv()

# 应用全局日志,供路由和 hooks 复用。
logger = logging.getLogger("demo10.api")
if not logger.handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )


class ChatRequest(BaseModel):
    """聊天请求体。"""

    question: str = Field(..., min_length=1, description="用户问题")


class ChatResponse(BaseModel):
    """聊天响应体。"""

    answer: str
    thinking: str | None = None


class RawChatResponse(BaseModel):
    """原始聊天响应体。"""

    raw_answer: str


class BatchRequest(BaseModel):
    """批量提问请求体。"""

    questions: list[str] = Field(..., min_length=1, description="问题列表")


class BatchResponse(BaseModel):
    """批量提问响应体。"""

    answers: list[str]


class PromptRequest(BaseModel):
    """系统提示词请求体。"""

    system_prompt: str = Field(..., min_length=1, description="系统提示词")


class PromptResponse(BaseModel):
    """系统提示词响应体。"""

    system_prompt: str


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期 Hook:
    - `yield` 前:初始化 UtilsLLMService;
    - `yield` 后:做资源清理。
    """
    service = UtilsLLMService()
    service.initialize()
    app.state.llm_service = service

    if service.ready:
        logger.info("llm init success")
    else:
        logger.error("llm init failed: %s", service.init_error)

    yield

    service.shutdown()
    logger.info("app shutdown complete")


app = FastAPI(
    title="Demo10 LangChain API",
    description="使用 FastAPI + Uvicorn 暴露 LangChain 链路能力",
    version="1.0.0",
    lifespan=lifespan,
)

# 允许前端跨域访问(例如 VSCode Live Server:5500)。
# 也支持通过环境变量 CORS_ALLOW_ORIGINS 覆盖,多个 origin 用英文逗号分隔。
default_cors_origins = [
    "http://127.0.0.1:5500",
    "http://localhost:5500",
    "http://127.0.0.1:8000",
    "http://localhost:8000",
]
env_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "")
allow_origins = [x.strip() for x in env_cors_origins.split(",") if x.strip()] or default_cors_origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册请求中间件和全局异常处理器。
register_http_hooks(app, logger)

# UI 页面文件路径(和当前 py 文件放在同一目录)。
CHAT_UI_FILE = Path(__file__).with_name("demo10_chat_ui.html")


def get_service() -> UtilsLLMService:
    """读取已初始化的 LLM Service,不可用时抛出 503。"""
    service: UtilsLLMService | None = getattr(app.state, "llm_service", None)
    if service is None:
        raise HTTPException(status_code=503, detail="service unavailable: llm service missing")
    if not service.ready:
        detail = service.init_error or "chain is not initialized"
        raise HTTPException(status_code=503, detail=f"service unavailable: {detail}")
    return service


@app.get("/health")
def health() -> dict:
    """健康检查接口,便于网关/监控系统探活。"""
    service: UtilsLLMService | None = getattr(app.state, "llm_service", None)
    if service is None:
        return {"status": "degraded", "llm_ready": False, "reason": "llm service missing"}
    if not service.ready:
        return {"status": "degraded", "llm_ready": False, "reason": service.init_error}
    return {"status": "ok", "llm_ready": True}


@app.get("/chat-ui")
def chat_ui() -> FileResponse:
    """返回一个简单网页,用于调用 /chat 和 /chat/stream。"""
    if not CHAT_UI_FILE.exists():
        raise HTTPException(status_code=404, detail="chat ui file not found")
    return FileResponse(CHAT_UI_FILE)


@app.get("/prompt", response_model=PromptResponse)
def get_prompt() -> PromptResponse:
    """读取当前系统提示词。"""
    service = get_service()
    return PromptResponse(system_prompt=service.system_prompt)


@app.post("/prompt", response_model=PromptResponse)
def set_prompt(req: PromptRequest) -> PromptResponse:
    """更新系统提示词,并重建链路。"""
    service = get_service()
    service.set_prompt(req.system_prompt)
    return PromptResponse(system_prompt=service.system_prompt)


@app.post("/chat", response_model=ChatResponse)
def chat(req: ChatRequest) -> ChatResponse:
    """普通调用:返回完整答案。"""
    service = get_service()
    answer, thinking = service.invoke_with_meta(req.question)
    return ChatResponse(answer=answer, thinking=thinking or None)


@app.post("/chat/raw", response_model=RawChatResponse)
def chat_raw(req: ChatRequest) -> RawChatResponse:
    """返回模型原始输出,不做清洗。"""
    service = get_service()
    raw_answer = service.invoke_raw(req.question)
    return RawChatResponse(raw_answer=raw_answer)


@app.post("/batch", response_model=BatchResponse)
def batch_chat(req: BatchRequest) -> BatchResponse:
    """批量提问:一次提交多个问题,返回清洗后的答案列表。"""
    service = get_service()
    answers = service.batch(req.questions)
    return BatchResponse(answers=answers)


def _sse_event(event: str, payload: dict) -> str:
    """把字典数据编码成标准 SSE 事件格式。"""
    return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"


def sse_stream(question: str) -> Iterator[str]:
    """
    把 LangChain 的流式输出包装成 SSE:
    - status: 阶段提示
    - think: 思考片段(如果模型返回了 think/reasoning/analysis 标签)
    - answer: 最终答案文本块
    - done: 正常结束
    - error: 失败结束
    """
    try:
        service = get_service()
        yield _sse_event("status", {"message": "请求已接收,模型开始处理..."})
        think_preview = ""

        for kind, chunk in service.stream_with_thinking(question):
            if kind == "think":
                think_preview += chunk
                # 仅发送最近一小段思考片段,避免状态消息过长。
                preview = think_preview[-120:].strip()
                if preview:
                    yield _sse_event("think", {"text": chunk, "preview": preview})
            else:
                yield _sse_event("answer", {"text": chunk})

        yield _sse_event("done", {"message": "生成完成"})
    except Exception as exc:
        # 流式响应一旦开始,无法再走标准 HTTP 错误码,故用 SSE 错误事件回传。
        yield _sse_event("error", {"message": str(exc)})


@app.post("/chat/stream")
def chat_stream(req: ChatRequest) -> StreamingResponse:
    """流式调用:边生成边返回,适合前端实时显示打字效果。"""
    # 先做一次可用性检查,避免明知不可用还建立流连接。
    get_service()
    return StreamingResponse(
        sse_stream(req.question),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
    )


if __name__ == "__main__":
    # 允许 python 直接运行本文件,和命令行 uvicorn 启动二选一。
    import uvicorn

    uvicorn.run("demo10_api_service:app", host="0.0.0.0", port=8000, reload=True)

完整代码 utils_http_hooks.py

# ========== Utils: HTTP Hook 封装 ==========
# 文件:utils_http_hooks.py
#
# 这个文件只负责 FastAPI 的“请求级/异常级” Hook:
# 1. HTTP middleware(request_id、耗时日志);
# 2. 全局异常处理(HTTPException、ValidationError、兜底 Exception)。

import logging
import time
import uuid

from fastapi import FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse


def register_http_hooks(app: FastAPI, logger: logging.Logger) -> None:
    """注册 middleware 和异常处理器。"""

    @app.middleware("http")
    async def request_context_middleware(request: Request, call_next):
        # 优先透传网关请求 ID,没有就自动生成,便于排查链路日志。
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        request.state.request_id = request_id
        request.state.start_time = time.perf_counter()

        try:
            response = await call_next(request)
        except Exception:
            # 这里只做日志,异常仍抛给全局处理器。
            cost_ms = (time.perf_counter() - request.state.start_time) * 1000
            logger.exception(
                "[UNHANDLED] %s %s | request_id=%s | cost_ms=%.2f",
                request.method,
                request.url.path,
                request_id,
                cost_ms,
            )
            raise

        cost_ms = (time.perf_counter() - request.state.start_time) * 1000
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time-Ms"] = f"{cost_ms:.2f}"
        logger.info(
            "[OK] %s %s -> %s | request_id=%s | cost_ms=%.2f",
            request.method,
            request.url.path,
            response.status_code,
            request_id,
            cost_ms,
        )
        return response

    def _request_id(request: Request) -> str:
        """从 request.state 安全读取 request_id。"""
        return getattr(request.state, "request_id", "N/A")

    @app.exception_handler(HTTPException)
    async def http_exception_handler(request: Request, exc: HTTPException):
        """处理业务层主动抛出的 HTTPException(如 400/404/503)。"""
        request_id = _request_id(request)
        logger.warning(
            "[HTTPException] %s %s -> %s | request_id=%s | detail=%s",
            request.method,
            request.url.path,
            exc.status_code,
            request_id,
            exc.detail,
        )
        return JSONResponse(
            status_code=exc.status_code,
            content={
                "error": {
                    "type": "http_exception",
                    "message": str(exc.detail),
                    "request_id": request_id,
                }
            },
        )

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(request: Request, exc: RequestValidationError):
        """处理参数校验失败(FastAPI/Pydantic 自动抛出)。"""
        request_id = _request_id(request)
        logger.warning(
            "[ValidationError] %s %s | request_id=%s | errors=%s",
            request.method,
            request.url.path,
            request_id,
            exc.errors(),
        )
        return JSONResponse(
            status_code=422,
            content={
                "error": {
                    "type": "validation_error",
                    "message": "request validation failed",
                    "details": exc.errors(),
                    "request_id": request_id,
                }
            },
        )

    @app.exception_handler(Exception)
    async def unhandled_exception_handler(request: Request, exc: Exception):
        """兜底处理未捕获异常,避免把回溯细节直接暴露给客户端。"""
        request_id = _request_id(request)
        logger.exception(
            "[UnhandledException] %s %s | request_id=%s",
            request.method,
            request.url.path,
            request_id,
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "type": "internal_error",
                    "message": "internal server error",
                    "request_id": request_id,
                }
            },
        )

完整代码 utils_llm_service.py

# ========== Utils: LLM Service 封装 ==========
# 文件:utils_llm_service.py
#
# 这个文件只负责“模型和链路调用”相关逻辑:
# 1. 创建 LLM 实例;
# 2. 构建 LCEL 链;
# 3. 提供 initialize / invoke / stream / shutdown 方法;
# 4. 不依赖 FastAPI,便于单测和复用。

import os
import re
from typing import Iterator

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 这些标签通常是“思考过程”或“中间推理”,对前端展示不是最终答案。
HIDDEN_TAG_NAMES = ("think", "reasoning", "analysis")
HIDDEN_SECTION_PATTERN = re.compile(
    r"<\s*(think|reasoning|analysis)\b[^>]*>.*?<\s*/\s*\1\s*>",
    flags=re.IGNORECASE | re.DOTALL,
)
HIDDEN_CONTENT_PATTERN = re.compile(
    r"<\s*(think|reasoning|analysis)\b[^>]*>(.*?)<\s*/\s*\1\s*>",
    flags=re.IGNORECASE | re.DOTALL,
)
DEFAULT_SYSTEM_PROMPT = (
    "你是一个清晰、耐心的技术助手。回答要准确、结构清楚、尽量简洁。"
    "只输出最终答案,不要输出<think>、<reasoning>、<analysis>等中间思考标签。"
)


def build_llm() -> ChatOpenAI:
    """创建聊天模型实例。"""
    # 从环境变量读取 MiniMax 的 API Key。
    api_key = os.getenv("MINIMAX_API_KEY")
    # 如果没配置 key,直接抛异常,避免后续请求才报错。
    if not api_key:
        # 抛出运行时错误,提示调用方先设置 MINIMAX_API_KEY。
        raise RuntimeError("MINIMAX_API_KEY is not set")

    # 返回一个 ChatOpenAI 客户端实例(这里通过兼容接口连接 MiniMax)。
    return ChatOpenAI(
        # 指定要使用的模型名称。
        model="MiniMax-M2.7",
        # 指定服务端 API 基地址(MiniMax 兼容 OpenAI 协议入口)。
        base_url="https://api.minimaxi.com/v1",
        # 传入鉴权密钥,用于请求签名认证。
        api_key=api_key,
        # 控制随机性,值越高越发散,值越低越稳定。
        temperature=0.7,
        # 限制单次回复的最大 token 数,防止返回过长。
        max_tokens=1000,
        # 设置单次请求超时时间(秒),避免长时间阻塞。
        timeout=60,
        # 失败时自动重试次数,提高临时网络抖动下的稳定性。
        max_retries=2,
    )


def build_chain(system_prompt: str = DEFAULT_SYSTEM_PROMPT):
    """构建基础 LCEL 链路:Prompt -> LLM -> StrOutputParser。"""
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            ("human", "{question}"),
        ]
    )
    return prompt | build_llm() | StrOutputParser()


class UtilsLLMService:
    """封装链路生命周期和调用细节。"""

    def __init__(self) -> None:
        self.chain = None
        self.init_error: str | None = None
        self.system_prompt: str = DEFAULT_SYSTEM_PROMPT

    @property
    def ready(self) -> bool:
        """链路是否已经可用。"""
        return self.chain is not None

    def initialize(self) -> None:
        """启动阶段初始化链路。"""
        try:
            self.chain = build_chain(self.system_prompt)
            self.init_error = None
        except Exception as exc:
            self.chain = None
            self.init_error = str(exc)

    def shutdown(self) -> None:
        """关闭阶段清理资源。"""
        self.chain = None

    def set_system_prompt(self, system_prompt: str) -> None:
        """更新系统提示词,并重建链路。"""
        self.system_prompt = str(system_prompt or "").strip() or DEFAULT_SYSTEM_PROMPT
        if self.ready:
            self.chain = build_chain(self.system_prompt)

    def set_prompt(self, system_prompt: str) -> None:
        """set_system_prompt 的别名,方便外部语义化调用。"""
        self.set_system_prompt(system_prompt)

    def _require_chain(self):
        """拿到可用链路,不可用时抛出运行时异常。"""
        if self.chain is None:
            detail = self.init_error or "chain is not initialized"
            raise RuntimeError(f"service unavailable: {detail}")
        return self.chain

    def _clean_text_output(self, raw_output) -> str:
        """
        清洗模型输出,只保留可展示的有效答案文本。
        - 先转字符串;
        - 去掉 <think>/<reasoning>/<analysis> 包裹内容;
        - 再做首尾空白清理。
        """
        text = str(raw_output or "")
        # 反复替换,兼容多个隐藏段落。
        previous = None
        while previous != text:
            previous = text
            text = HIDDEN_SECTION_PATTERN.sub("", text)

        # 兼容“不完整标签”场景:有开标签但没闭合时,丢弃开标签后的内容。
        text = re.sub(
            r"<\s*(think|reasoning|analysis)\b[^>]*>.*$",
            "",
            text,
            flags=re.IGNORECASE | re.DOTALL,
        )
        # 清理孤立闭标签。
        text = re.sub(r"</\s*(think|reasoning|analysis)\s*>", "", text, flags=re.IGNORECASE)
        return text.strip()

    def _clean_answer_fragment(self, fragment: str) -> str:
        """
        清理流式答案片段(轻量版):
        - 去掉残留隐藏标签;
        - 保留原始空格和换行,不做 strip,避免拼接后词语粘连。
        """
        text = str(fragment or "")
        text = re.sub(r"</?\s*(think|reasoning|analysis)\b[^>]*>", "", text, flags=re.IGNORECASE)
        return text

    def _clean_stream_chunks(self, chunks: Iterator[str]) -> Iterator[str]:
        """
        增量清洗流式输出。
        目标:即使标签被拆到不同 chunk,也尽量不把隐藏内容透传给前端。
        """
        open_pattern = re.compile(
            r"<\s*(think|reasoning|analysis)\b[^>]*>",
            flags=re.IGNORECASE,
        )
        close_pattern = re.compile(
            r"</\s*(think|reasoning|analysis)\s*>",
            flags=re.IGNORECASE,
        )
        # 为了处理“标签被切分到多个 chunk”的场景,保留一段尾部缓冲。
        hold_back = 64
        in_hidden = False
        buffer = ""

        for raw_chunk in chunks:
            piece = str(raw_chunk or "")
            if not piece:
                continue
            buffer += piece

            while True:
                if not in_hidden:
                    match_open = open_pattern.search(buffer)
                    if not match_open:
                        # 暂留尾巴,避免把不完整标签片段提前输出。
                        if len(buffer) <= hold_back:
                            break
                        emit_text = buffer[:-hold_back]
                        buffer = buffer[-hold_back:]
                        cleaned = self._clean_text_output(emit_text)
                        if cleaned:
                            yield cleaned
                        break

                    # 遇到开标签:先输出标签前内容,再进入隐藏区。
                    visible = buffer[: match_open.start()]
                    cleaned_visible = self._clean_text_output(visible)
                    if cleaned_visible:
                        yield cleaned_visible
                    buffer = buffer[match_open.end() :]
                    in_hidden = True
                else:
                    match_close = close_pattern.search(buffer)
                    if not match_close:
                        # 隐藏区里没有闭标签时,丢弃大部分内容,仅保留尾巴用于拼接闭标签。
                        if len(buffer) > hold_back:
                            buffer = buffer[-hold_back:]
                        break
                    # 找到闭标签后,移除隐藏内容,继续正常输出。
                    buffer = buffer[match_close.end() :]
                    in_hidden = False

        if not in_hidden and buffer:
            cleaned_tail = self._clean_text_output(buffer)
            if cleaned_tail:
                yield cleaned_tail

    def _extract_hidden_text(self, raw_output) -> str:
        """
        提取模型输出里的隐藏思考内容(如 think/reasoning/analysis 标签内文本)。
        如果没有相关标签,返回空字符串。
        """
        text = str(raw_output or "")
        segments: list[str] = []
        for match in HIDDEN_CONTENT_PATTERN.finditer(text):
            content = match.group(2).strip()
            if content:
                segments.append(content)
        return "\n".join(segments)

    def _split_stream_visible_hidden(self, chunks: Iterator[str]) -> Iterator[tuple[str, str]]:
        """
        把流式 chunk 拆成两类事件:
        - ("answer", 可展示答案文本)
        - ("think", 标签内思考文本)
        这样上层可以一边展示答案,一边给用户“正在思考”的状态提示。
        """
        open_pattern = re.compile(  # 匹配 think/reasoning/analysis 开始标签。
            r"<\s*(think|reasoning|analysis)\b[^>]*>",  # 支持标签内带属性。
            flags=re.IGNORECASE,  # 标签名大小写不敏感。
        )  # 开始标签正则定义结束。
        hold_back = 64  # 尾部缓冲长度,避免把被切断的标签片段提前输出。
        in_hidden = False  # 标记当前是否处于隐藏思考片段内部。
        current_tag = ""  # 记录当前打开的标签名,用于匹配对应闭标签。
        buffer = ""  # 流式拼接缓冲区,承接跨 chunk 的文本。

        for raw_chunk in chunks:  # 逐个处理底层模型返回的 chunk。
            piece = str(raw_chunk or "")  # 把 chunk 统一转成字符串(兼容 None)。
            if not piece:  # 空 chunk 直接跳过,避免无意义处理。
                continue  # 进入下一轮 chunk。
            buffer += piece  # 把当前 chunk 追加到缓冲区。

            while True:  # 对当前缓冲区循环消费,直到不能再安全解析为止。
                if not in_hidden:  # 当前不在隐藏区,优先找开标签。
                    open_match = open_pattern.search(buffer)  # 查找最近的隐藏开标签。
                    if not open_match:  # 没找到开标签,说明当前内容都属于可见答案。
                        if len(buffer) <= hold_back:  # 缓冲长度太短时先保留,防止尾部是半截标签。
                            break  # 暂停消费,等待下一 chunk 拼接后再判断。
                        emit_text = buffer[:-hold_back]  # 安全输出前半部分,把尾巴留在缓冲区。
                        buffer = buffer[-hold_back:]  # 把尾部保留用于下轮拼接解析。
                        if emit_text:  # 只有有内容时才产出事件。
                            yield ("answer", emit_text)  # 产出可见答案片段。
                        break  # 当前缓冲区已处理到安全边界,等待下一 chunk。

                    visible = buffer[: open_match.start()]  # 截取开标签前的可见内容。
                    if visible:  # 可见内容非空时才输出。
                        yield ("answer", visible)  # 产出答案事件。
                    current_tag = open_match.group(1)  # 记录开标签名(think/reasoning/analysis)。
                    buffer = buffer[open_match.end() :]  # 从缓冲区移除已处理部分(含开标签)。
                    in_hidden = True  # 状态切换为“已进入隐藏区”。
                else:  # 当前位于隐藏区,需要找对应闭标签。
                    close_pattern = re.compile(  # 为当前标签名动态构造闭标签正则。
                        rf"</\s*{re.escape(current_tag)}\s*>",  # 只匹配与当前开标签同名的闭标签。
                        flags=re.IGNORECASE,  # 闭标签大小写不敏感。
                    )  # 闭标签正则定义结束。
                    close_match = close_pattern.search(buffer)  # 在隐藏区中查找闭标签。
                    if not close_match:  # 若还没等到闭标签,说明隐藏内容可能被拆包了。
                        if len(buffer) <= hold_back:  # 缓冲区太短时先全部保留,继续等待拼接。
                            break  # 暂停消费当前缓冲区。
                        think_text = buffer[:-hold_back]  # 先输出安全部分隐藏文本,尾巴继续保留。
                        buffer = buffer[-hold_back:]  # 保留尾部用于拼接潜在闭标签。
                        if think_text:  # 仅当文本非空时产出事件。
                            yield ("think", think_text)  # 产出思考文本事件。
                        break  # 到达安全边界,等待下一 chunk。

                    think_text = buffer[: close_match.start()]  # 截取闭标签前的隐藏文本。
                    if think_text:  # 隐藏文本非空才输出。
                        yield ("think", think_text)  # 产出思考事件。
                    buffer = buffer[close_match.end() :]  # 从缓冲区移除隐藏内容和闭标签。
                    in_hidden = False  # 退出隐藏区,回到可见区解析流程。
                    current_tag = ""  # 清空当前标签名,避免影响后续匹配。

        if buffer:  # 流结束后如果缓冲区还有残留,按当前状态收尾输出。
            if in_hidden:  # 若结束时仍在隐藏区,残留文本按思考内容处理。
                yield ("think", buffer)  # 产出尾部思考文本。
            else:  # 若不在隐藏区,残留文本属于可见答案。
                yield ("answer", buffer)  # 产出尾部答案文本。

    def invoke_with_meta(self, question: str) -> tuple[str, str]:
        """
        普通调用(带元信息):
        - answer: 清洗后的最终答案
        - thinking: 提取到的思考片段(可能为空字符串)
        """
        chain = self._require_chain()
        raw_text = chain.invoke({"question": question})
        answer = self._clean_text_output(raw_text)
        thinking = self._extract_hidden_text(raw_text)
        return answer, thinking

    def invoke_raw(self, question: str) -> str:
        """普通调用:返回模型原始输出,不做清洗。"""
        chain = self._require_chain()
        return str(chain.invoke({"question": question}) or "")

    def invoke(self, question: str) -> str:
        """普通调用:返回完整文本。"""
        answer, _ = self.invoke_with_meta(question)
        return answer

    def batch(self, questions: list[str]) -> list[str]:
        """批量调用:输入多个问题,返回清洗后的答案列表。"""
        chain = self._require_chain()
        inputs = [{"question": question} for question in questions]
        raw_outputs = chain.batch(inputs)
        return [self._clean_text_output(output) for output in raw_outputs]

    def stream_with_thinking(self, question: str) -> Iterator[tuple[str, str]]:
        """
        流式调用(带事件类型):
        - ("answer", 文本):可直接拼接到最终回答;
        - ("think", 文本):可用作“模型正在思考”的状态提示。
        """
        chain = self._require_chain()
        raw_chunks = chain.stream({"question": question})
        for kind, text in self._split_stream_visible_hidden(raw_chunks):
            if kind == "answer":
                cleaned = self._clean_answer_fragment(text)
                if cleaned:
                    yield ("answer", cleaned)
            elif text.strip():
                yield ("think", text)

    def stream(self, question: str) -> Iterator[str]:
        """流式调用:逐块返回文本。"""
        for kind, text in self.stream_with_thinking(question):
            if kind == "answer":
                yield text

完整代码 demo10_chat_ui.html

<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Demo10 Chat UI</title>
  <style>
    :root {
      --bg: #f4f7fb;
      --panel: #ffffff;
      --text: #162131;
      --muted: #5e6b7b;
      --primary: #0c66e4;
      --primary-2: #2f80ed;
      --border: #d9e2ee;
      --ok: #18794e;
      --bad: #c62828;
    }

    * {
      box-sizing: border-box;
    }

    body {
      margin: 0;
      background: linear-gradient(145deg, #edf3ff 0%, #f9fbff 55%, #f4f7fb 100%);
      color: var(--text);
      font-family: "Avenir Next", "PingFang SC", "Microsoft YaHei", sans-serif;
      min-height: 100vh;
      display: flex;
      justify-content: center;
      padding: 16px;
    }

    .app {
      width: min(980px, 100%);
      background: var(--panel);
      border: 1px solid var(--border);
      border-radius: 14px;
      box-shadow: 0 12px 40px rgba(17, 45, 78, 0.08);
      display: grid;
      grid-template-rows: auto auto 1fr auto;
      gap: 10px;
      padding: 12px;
    }

    .title {
      display: flex;
      justify-content: space-between;
      align-items: center;
      gap: 12px;
    }

    h1 {
      margin: 0;
      font-size: 20px;
      letter-spacing: 0.2px;
    }

    .status {
      color: var(--muted);
      font-size: 13px;
    }

    .toolbar {
      display: grid;
      grid-template-columns: 1fr 280px auto auto;
      gap: 12px;
      align-items: center;
    }

    .prompt-box {
      display: grid;
      gap: 8px;
    }

    .prompt-actions {
      display: flex;
      gap: 8px;
      flex-wrap: wrap;
    }

    .prompt-label {
      font-size: 12px;
      color: var(--muted);
      font-weight: 600;
    }

    .api-base {
      width: 100%;
      border: 1px solid var(--border);
      border-radius: 10px;
      padding: 8px 10px;
      font-size: 13px;
      outline: none;
      background: #fff;
    }

    .api-base:focus {
      border-color: #91b9f2;
      box-shadow: 0 0 0 3px rgba(12, 102, 228, 0.12);
    }

    .mode-group {
      display: flex;
      gap: 16px;
      align-items: center;
      color: var(--muted);
      font-size: 14px;
    }

    .mode-group label {
      display: inline-flex;
      gap: 6px;
      align-items: center;
      cursor: pointer;
    }

    button {
      border: none;
      border-radius: 10px;
      padding: 9px 14px;
      font-size: 14px;
      cursor: pointer;
      color: #fff;
      background: linear-gradient(120deg, var(--primary) 0%, var(--primary-2) 100%);
    }

    button:disabled {
      opacity: 0.6;
      cursor: not-allowed;
    }

    .btn-ghost {
      color: var(--text);
      background: #eef4ff;
      border: 1px solid #cddcf7;
    }

    .panel {
      border: 1px solid var(--border);
      border-radius: 10px;
      background: #fcfdff;
      padding: 12px;
      height: clamp(220px, 28vh, 320px);
      overflow: auto;
    }

    .msg {
      margin: 0 0 14px;
      white-space: pre-wrap;
      line-height: 1.55;
      font-size: 14px;
    }

    .user {
      color: #0f4ca6;
    }

    .assistant {
      color: #1f2b3a;
    }

    .assistant-think {
      color: #7a8493;
      font-size: 12px;
      line-height: 1.25;
      white-space: pre-wrap;
      margin-bottom: 6px;
    }

    .assistant-answer {
      color: #1f2b3a;
      font-size: 14px;
      line-height: 1.55;
      white-space: pre-wrap;
    }

    .meta {
      color: var(--muted);
      font-size: 12px;
      margin-bottom: 4px;
    }

    .composer {
      display: grid;
      grid-template-columns: 1fr auto;
      gap: 10px;
      align-items: end;
    }

    textarea {
      resize: vertical;
      min-height: 72px;
      max-height: 150px;
      width: 100%;
      border: 1px solid var(--border);
      border-radius: 10px;
      padding: 10px;
      font-size: 14px;
      outline: none;
    }

    #systemPrompt {
      min-height: 72px;
      max-height: 120px;
    }

    #question {
      min-height: 72px;
      max-height: 120px;
    }

    textarea:focus {
      border-color: #91b9f2;
      box-shadow: 0 0 0 3px rgba(12, 102, 228, 0.12);
    }

    .tips {
      font-size: 12px;
      color: var(--muted);
      margin-top: 8px;
    }

    .ok {
      color: var(--ok);
    }

    .bad {
      color: var(--bad);
    }

    @media (max-width: 900px) {
      body {
        padding: 12px;
      }

      .app {
        padding: 10px;
        gap: 8px;
      }

      .toolbar {
        grid-template-columns: 1fr;
      }

      .mode-group {
        flex-wrap: wrap;
        gap: 10px 14px;
      }

      .panel {
        height: clamp(180px, 36vh, 280px);
      }
    }

    @media (max-width: 640px) {
      h1 {
        font-size: 17px;
      }

      .title {
        flex-direction: column;
        align-items: flex-start;
      }

      .composer {
        grid-template-columns: 1fr;
      }

      #sendBtn {
        width: 100%;
      }

      textarea {
        min-height: 68px;
      }
    }
  </style>
</head>
<body>
  <div class="app">
    <div class="title">
      <h1>Demo10 Chat Playground</h1>
      
    </div>

    <div class="prompt-box">
      <div class="prompt-label">系统提示词(点击“保存提示词”后会同步到服务端)</div>
      <textarea id="systemPrompt" placeholder="在这里输入系统提示词,修改后保存即可生效"></textarea>
      <div class="prompt-actions">
        <button id="loadPromptBtn" class="btn-ghost">刷新提示词</button>
        <button id="savePromptBtn">保存提示词</button>
      </div>
    </div>

    <div class="toolbar">
      <div class="mode-group">
        <strong>模式:</strong>
        <label><input type="radio" name="mode" value="chat" checked /> 普通 chat</label>
        <label><input type="radio" name="mode" value="stream" /> 流式 stream</label>
        <label><input type="radio" name="mode" value="raw" /> 原始 raw</label>
        <label><input type="radio" name="mode" value="batch" /> 批量 batch</label>
      </div>
      <input id="apiBase" class="api-base" type="text" />
      <button id="healthBtn" class="btn-ghost">检查服务</button>
      <button id="clearBtn" class="btn-ghost">清空对话</button>
      
    </div>

    <div id="status" class="status">就绪</div>
    <div id="messages" class="panel"></div>

    <div>
      <div class="composer">
        <textarea id="question" placeholder="输入你的问题,例如:请用三句话介绍 LangChain;batch 模式下每行一个问题"></textarea>
        <button id="sendBtn">发送</button>
      </div>
      <div class="tips">
        普通模式调用 <code>/chat</code>;流式模式调用 <code>/chat/stream</code>;原始模式调用 <code>/chat/raw</code>;批量模式调用 <code>/batch</code>。
      </div>
    </div>
  </div>

  &lt;script&gt;
    const messagesEl = document.getElementById("messages");
    const questionEl = document.getElementById("question");
    const sendBtn = document.getElementById("sendBtn");
    const healthBtn = document.getElementById("healthBtn");
    const clearBtn = document.getElementById("clearBtn");
    const loadPromptBtn = document.getElementById("loadPromptBtn");
    const savePromptBtn = document.getElementById("savePromptBtn");
    const statusEl = document.getElementById("status");
    const apiBaseEl = document.getElementById("apiBase");
    const systemPromptEl = document.getElementById("systemPrompt");
    const API_BASE_STORAGE_KEY = "demo10_api_base";
    const SYSTEM_PROMPT_STORAGE_KEY = "demo10_system_prompt";

    function defaultApiBase() {
      // 如果页面不是从 http(s) 打开(比如 file://),默认回退到本地 API。
      if (!window.location.origin || window.location.origin === "null") {
        return "http://127.0.0.1:8000";
      }
      // Live Server 常用端口 5500,默认把 API 指向 FastAPI 8000。
      if (window.location.port === "5500") {
        return "http://127.0.0.1:8000";
      }
      return window.location.origin;
    }

    // 优先使用上次成功配置的 API 地址,便于重复调试。
    // 但如果缓存值误指向 5500(Live Server 自身),自动修正到 8000。
    const storedApiBase = localStorage.getItem(API_BASE_STORAGE_KEY) || "";
    const fixedStoredBase = /:5500\b/.test(storedApiBase) ? "http://127.0.0.1:8000" : storedApiBase;
    apiBaseEl.value = fixedStoredBase || defaultApiBase();
    apiBaseEl.placeholder = "API Base URL,例如 http://127.0.0.1:8000";
    apiBaseEl.addEventListener("change", () => {
      localStorage.setItem(API_BASE_STORAGE_KEY, apiBaseEl.value.trim());
    });

    const defaultSystemPrompt =
      "你是一个清晰、耐心的技术助手。回答要准确、结构清楚、尽量简洁。只输出最终答案,不要输出<think>、<reasoning>、<analysis>等中间思考标签。";
    systemPromptEl.value = localStorage.getItem(SYSTEM_PROMPT_STORAGE_KEY) || defaultSystemPrompt;
    systemPromptEl.addEventListener("change", () => {
      localStorage.setItem(SYSTEM_PROMPT_STORAGE_KEY, systemPromptEl.value.trim());
    });

    function buildUrl(path) {
      const base = (apiBaseEl.value || "").trim().replace(/\/+$/, "");
      return `${base}${path}`;
    }

    async function parseJsonOrThrow(resp) {
      const url = resp.url || "unknown-url";
      const contentType = resp.headers.get("content-type") || "";
      const rawText = await resp.text();

      // 空响应体:给出更清晰错误,避免浏览器直接抛 Unexpected end of JSON input。
      if (!rawText || !rawText.trim()) {
        throw new Error(`接口返回空响应体,status=${resp.status},url=${url}`);
      }

      if (contentType.includes("application/json")) {
        try {
          return JSON.parse(rawText);
        } catch (e) {
          throw new Error(
            `JSON 解析失败,status=${resp.status},url=${url},body=${rawText.slice(0, 180)}`
          );
        }
      }
      throw new Error(
        `接口未返回 JSON,status=${resp.status},url=${url},content-type=${contentType},body=${rawText.slice(0, 180)}`
      );
    }

    function getMode() {
      return document.querySelector('input[name="mode"]:checked')?.value || "chat";
    }

    function appendMessage(role, text) {
      const wrap = document.createElement("div");
      const meta = document.createElement("div");
      meta.className = "meta";
      meta.textContent = role === "user" ? "你" : "助手";

      const body = document.createElement("div");
      body.className = `msg ${role}`;
      body.textContent = text;

      wrap.appendChild(meta);
      wrap.appendChild(body);
      messagesEl.appendChild(wrap);
      messagesEl.scrollTop = messagesEl.scrollHeight;
      return body;
    }

    function setStatus(text, type = "normal") {
      statusEl.textContent = text;
      statusEl.classList.remove("ok", "bad");
      if (type === "ok") statusEl.classList.add("ok");
      if (type === "bad") statusEl.classList.add("bad");
    }

    async function checkHealth() {
      setStatus("检查中...");
      try {
        const resp = await fetch(buildUrl("/health"));
        const data = await parseJsonOrThrow(resp);
        if (resp.ok && data.status === "ok") {
          setStatus("服务正常: llm_ready=true", "ok");
        } else {
          setStatus(`服务异常: ${JSON.stringify(data)}`, "bad");
        }
      } catch (err) {
        setStatus(`健康检查失败: ${err}`, "bad");
      }
    }

    async function loadPrompt() {
      setStatus("读取提示词中...");
      try {
        const resp = await fetch(buildUrl("/prompt"));
        const data = await parseJsonOrThrow(resp);
        if (resp.ok && typeof data.system_prompt === "string") {
          systemPromptEl.value = data.system_prompt;
          localStorage.setItem(SYSTEM_PROMPT_STORAGE_KEY, data.system_prompt);
          setStatus("提示词已加载", "ok");
        } else {
          setStatus(`提示词读取异常: ${JSON.stringify(data)}`, "bad");
        }
      } catch (err) {
        setStatus(`提示词读取失败: ${err}`, "bad");
      }
    }

    async function savePrompt() {
      const systemPrompt = systemPromptEl.value.trim();
      if (!systemPrompt) {
        setStatus("系统提示词不能为空", "bad");
        return;
      }

      savePromptBtn.disabled = true;
      setStatus("保存提示词中...");
      try {
        const resp = await fetch(buildUrl("/prompt"), {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ system_prompt: systemPrompt }),
        });
        const data = await parseJsonOrThrow(resp);
        if (!resp.ok) {
          throw new Error(JSON.stringify(data));
        }
        localStorage.setItem(SYSTEM_PROMPT_STORAGE_KEY, data.system_prompt || systemPrompt);
        setStatus("提示词已保存", "ok");
      } catch (err) {
        setStatus(`保存提示词失败: ${err}`, "bad");
      } finally {
        savePromptBtn.disabled = false;
      }
    }

    async function sendChat(question) {
      const resp = await fetch(buildUrl("/chat"), {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ question }),
      });
      const data = await parseJsonOrThrow(resp);
      if (!resp.ok) {
        throw new Error(JSON.stringify(data));
      }
      if (typeof data.answer !== "string") {
        throw new Error(`接口返回缺少 answer 字段: ${JSON.stringify(data).slice(0, 200)}`);
      }
      return data;
    }

    async function sendRaw(question) {
      const resp = await fetch(buildUrl("/chat/raw"), {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ question }),
      });
      const data = await parseJsonOrThrow(resp);
      if (!resp.ok) {
        throw new Error(JSON.stringify(data));
      }
      if (typeof data.raw_answer !== "string") {
        throw new Error(`接口返回缺少 raw_answer 字段: ${JSON.stringify(data).slice(0, 200)}`);
      }
      return data.raw_answer;
    }

    async function sendBatch(questionText) {
      const questions = questionText
        .split("\n")
        .map((line) => line.trim())
        .filter(Boolean);

      if (!questions.length) {
        throw new Error("batch 模式下请至少输入一行问题");
      }

      const resp = await fetch(buildUrl("/batch"), {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ questions }),
      });
      const data = await parseJsonOrThrow(resp);
      if (!resp.ok) {
        throw new Error(JSON.stringify(data));
      }
      if (!Array.isArray(data.answers)) {
        throw new Error(`接口返回缺少 answers 字段: ${JSON.stringify(data).slice(0, 200)}`);
      }
      return data.answers;
    }

    async function sendStream(question, answerNode, thinkNode) {
      const resp = await fetch(buildUrl("/chat/stream"), {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ question }),
      });
      if (!resp.ok || !resp.body) {
        const t = await resp.text();
        throw new Error(t || `stream failed, status=${resp.status}`);
      }

      const reader = resp.body.getReader();
      const decoder = new TextDecoder("utf-8");
      let buffer = "";
      let hasAnswerStarted = false;

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split("\n\n");
        buffer = events.pop() || "";

        for (const evt of events) {
          const lines = evt.split("\n");
          let eventType = "message";
          const dataLines = [];

          for (const line of lines) {
            if (line.startsWith("event:")) {
              eventType = line.slice(6).trim();
              continue;
            }
            if (line.startsWith("data:")) {
              dataLines.push(line.slice(5).trimStart());
            }
          }

          const rawData = dataLines.join("\n");
          let payload = null;
          try {
            payload = rawData ? JSON.parse(rawData) : null;
          } catch (_) {
            payload = null;
          }

          // 新版 SSE 事件格式:status / think / answer / done / error。
          if (eventType === "status") {
            setStatus(payload?.message || "模型处理中...");
            continue;
          }
          if (eventType === "think") {
            const preview = payload?.preview || payload?.text || "";
            const thinkText = payload?.text || "";
            if (thinkNode && thinkText) {
              thinkNode.textContent += thinkText;
              messagesEl.scrollTop = messagesEl.scrollHeight;
            }
            if (preview) {
              setStatus(`模型思考中: ${preview.slice(-36)}`);
            }
            continue;
          }
          if (eventType === "answer") {
            const text = payload?.text ?? rawData;
            if (text) {
              if (!hasAnswerStarted) {
                answerNode.textContent = "";
                hasAnswerStarted = true;
              }
              answerNode.textContent += text;
              messagesEl.scrollTop = messagesEl.scrollHeight;
            }
            continue;
          }
          if (eventType === "done") {
            return;
          }
          if (eventType === "error") {
            throw new Error(payload?.message || rawData || "stream error");
          }

          // 向后兼容旧格式:只有 data 行,没有 event 行。
          if (rawData === "[DONE]") return;
          if (rawData.startsWith("[ERROR]")) {
            throw new Error(rawData);
          }
          if (rawData) {
            if (!hasAnswerStarted) {
              answerNode.textContent = "";
              hasAnswerStarted = true;
            }
            answerNode.textContent += rawData;
            messagesEl.scrollTop = messagesEl.scrollHeight;
          }
        }
      }
    }

    function startWaitingHints(mode) {
      const hints =
        mode === "stream"
          ? ["连接流式通道中", "模型正在思考", "正在生成答案片段", "正在整理输出格式"]
          : mode === "raw"
            ? ["请求已发送", "模型正在直接返回原始内容", "正在接收原始输出", "正在整理原始结果"]
            : mode === "batch"
              ? ["批量请求已发送", "模型正在逐条处理", "正在汇总批量答案", "正在整理批量结果"]
              : ["请求已发送", "模型正在思考", "正在生成完整答案", "正在清洗输出内容"];

      let tick = 0;
      setStatus(`${hints[0]}...`);
      const timer = setInterval(() => {
        tick += 1;
        const message = hints[tick % hints.length];
        setStatus(`${message}... ${tick}s`);
      }, 1000);

      return () => clearInterval(timer);
    }

    async function onSend() {
      const question = questionEl.value.trim();
      if (!question) {
        setStatus("请输入问题", "bad");
        return;
      }

      sendBtn.disabled = true;
      appendMessage("user", question);
      questionEl.value = "";
      const assistantBody = appendMessage("assistant", "");
      let answerNode = assistantBody;
      let thinkNode = null;
      const mode = getMode();
      if (mode === "chat") {
        answerNode.textContent = "正在思考中,请稍候...";
      } else if (mode === "stream") {
        thinkNode = document.createElement("div");
        thinkNode.className = "assistant-think";
        assistantBody.appendChild(thinkNode);

        answerNode = document.createElement("div");
        answerNode.className = "assistant-answer";
        answerNode.textContent = "正在建立流式连接...";
        assistantBody.appendChild(answerNode);
      } else if (mode === "raw") {
        answerNode.textContent = "正在请求原始输出...";
      } else {
        answerNode.textContent = "正在批量处理...";
      }
      const stopHints = startWaitingHints(mode);

      try {
        if (mode === "chat") {
          const result = await sendChat(question);
          answerNode.textContent = result.answer;
          // chat 是一次性返回,无法实时展示思考过程;这里只展示请求状态和最终结果。
          setStatus("完成", "ok");
        } else if (mode === "stream") {
          await sendStream(question, answerNode, thinkNode);
          setStatus("完成", "ok");
        } else if (mode === "raw") {
          const rawAnswer = await sendRaw(question);
          answerNode.textContent = rawAnswer;
          setStatus("完成", "ok");
        } else {
          const answers = await sendBatch(question);
          answerNode.textContent = answers
            .map((answer, index) => `${index + 1}. ${answer}`)
            .join("\n\n");
          setStatus("完成", "ok");
        }
      } catch (err) {
        answerNode.textContent = `请求失败: ${err}`;
        setStatus("请求失败", "bad");
      } finally {
        stopHints();
        sendBtn.disabled = false;
      }
    }

    sendBtn.addEventListener("click", onSend);
    healthBtn.addEventListener("click", checkHealth);
    loadPromptBtn.addEventListener("click", loadPrompt);
    savePromptBtn.addEventListener("click", savePrompt);
    clearBtn.addEventListener("click", () => {
      messagesEl.innerHTML = "";
      setStatus("已清空");
    });
    questionEl.addEventListener("keydown", (e) => {
      // 仅按钮发送:这里不拦截 Enter,保持输入框默认换行行为。
    });

    checkHealth();
    loadPrompt();
  &lt;/script&gt;
</body>
</html>

image
image

httpsgithub

本项目完整代码已上传:https://github.com/kunyashaw/langChainBaisc

附录 · 配置速查

.env 配置

# MiniMax API(硅基流动)
MINIMAX_API_KEY=sk-your-key

# Ollama 本地服务
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_EMBED_MODEL=qwen3-embedding:0.6b

模型对比

LLM MiniMax-M2.7 base_url="https://api.minimaxi.com/v1" Embedding qwen3-embedding:0.6b Ollama 本地
用途 模型 配置

依赖安装

uv add langchain langchain-openai langchain-community langchain-chroma langchain-ollama langchain-text-splitters python-dotenv pydantic

下一步:进入 demos_basic 目录,逐个运行 demo 文件验证环境。
Demo 01-09 均可直接运行。

附录 · LangChain 1.0 核心概念速查卡

核心概念

LCEL 用 | 把提示词、模型、解析器串成流水线,是 LangChain 1.x 的基础语法。 Demo 01 PromptTemplate 把固定结构和动态变量分开,适合模板化生成。 Demo 02 ChatPromptTemplate 以消息列表组织提示词,更适合多轮对话。 Demo 02 MessagesPlaceholder 在模板里动态插入历史消息。 Demo 02 / 03 Memory 让模型记住上下文,控制长对话的上下文成本。 Demo 03 OutputParser 把模型输出转成 JSON、Pydantic 或列表等结构化结果。 Demo 04 Document Loader 把文本、PDF、网页等来源统一成 Document。 Demo 05 Text Splitter 把长文档切成更适合检索的小块。 Demo 05 Vector Store 保存文档向量,支持语义检索。 Demo 05 Retriever 从向量库里检索最相关的内容。 Demo 05 / 06 RAG 检索增强生成,让回答基于真实文档。 Demo 06 Tool 把普通函数包装成可调用工具。 Demo 07 Agent 让模型根据输入自动选择工具。 Demo 07 Callback 记录链路、模型和 token 变化,方便观测与排错。 Demo 08 Streaming 模型边生成边输出,适合实时交互。 Demo 08 RunnableParallel 并行执行多个子链,适合同一输入多输出。 Demo 09 RunnableBranch 按条件分流到不同处理路径。 Demo 09 RunnableLambda 把自定义 Python 函数插入链路。 Demo 04 / 09
概念 一句话说明 对应 Demo

核心 API

ChatPromptTemplate.from_template(...) 从单段模板创建聊天提示词。 Demo 01 ChatPromptTemplate.from_messages([...]) 从消息列表创建聊天提示词。 Demo 02 / 03 / 06 / 08 / 09 .partial(...) 预填固定变量,减少重复传参。 Demo 01 / 02 .invoke(...) 执行一次链路。 Demo 01-09 .batch([...]) 批量执行多个输入。 Demo 01 MessagesPlaceholder(variable_name="history") 在提示词中插入历史消息。 Demo 02 / 03 get_buffer_string(messages) 把消息列表转成可读字符串上下文。 Demo 03 / 06 JsonOutputParser() 解析 JSON 输出。 Demo 04 PydanticOutputParser(...) 解析并校验结构化对象。 Demo 04 CommaSeparatedListOutputParser() 解析逗号分隔列表。 Demo 04 TextLoader / PyPDFLoader / WebBaseLoader 加载文本、PDF、网页内容。 Demo 05 RecursiveCharacterTextSplitter(...) 把长文档切成小块。 Demo 05 Chroma.from_documents(...) 将文档块写入向量库。 Demo 05 vectorstore.as_retriever() 把向量库包装成 Retriever。 Demo 05 / 06 RunnablePassthrough() 原样透传输入。 Demo 06 / 09 @tool 把普通函数声明成工具。 Demo 07 create_agent(...) 创建可调工具的 Agent。 Demo 07 BaseCallbackHandler 定义回调处理器。 Demo 08 .stream(..., config={"callbacks": [...]}) 流式输出并触发回调。 Demo 08 RunnableParallel({...}) 并行运行多个子链。 Demo 09 RunnableBranch(...) 按条件选择执行分支。 Demo 09 RunnableLambda(func) 把自定义函数包装进 LCEL。 Demo 04 / 09
API 作用 对应 Demo

核心代码

prompt | llm | parser 最小闭环,把提示词、模型和输出解析串起来。 Demo 01 prompt.partial(profession="技术写作") 预填固定变量。 Demo 01 MessagesPlaceholder(variable_name="chat_history") 动态插入对话历史。 Demo 02 / 03 WindowedMemory 保留最近窗口对话。 Demo 03 SummaryMemory 对长对话做摘要压缩。 Demo 03 clean_json_output() 清理模型输出里的噪声内容。 Demo 04 PersonInfo(BaseModel) 定义结构化输出 schema。 Demo 04 load_documents() 统一加载文本、PDF、网页。 Demo 05 split_documents() 把长文档切块。 Demo 05 build_vectorstore() 写入向量库。 Demo 05 rag_chain = {...} | prompt | llm | parser 检索增强生成链。 Demo 06 condense_prompt | llm | StrOutputParser() 把多轮问题改写成独立查询。 Demo 06 @tool def multiply(...) 把函数变成工具。 Demo 07 create_agent(model=llm, tools=[...], system_prompt=...) 创建 Agent 并注册工具。 Demo 07 class MyCallbackHandler(BaseCallbackHandler) 自定义回调日志。 Demo 08 streaming_chain.stream(..., config={"callbacks": [...]}) 流式输出 token。 Demo 08 RunnableParallel({...}) 一次输入并行产生多个结果。 Demo 09 RunnableBranch(...) 按内容路由到不同逻辑。 Demo 09 RunnableLambda(add_suffix) 对链路结果做后处理。 Demo 09
核心代码 作用 使用场景