FastAPI + LangGraph 构建可记忆对话机器人:从状态管理到流式响应

📅 发布时间:2026/7/6 1:05:27 👁️ 浏览次数:
FastAPI + LangGraph 构建可记忆对话机器人:从状态管理到流式响应
1. 从零开始理解可记忆对话机器人的核心大家好我是老张在AI和智能硬件这块摸爬滚打了十来年。今天想和大家聊聊一个特别有意思也特别实用的东西如何用 FastAPI 和 LangGraph 亲手搭建一个能记住你们聊过什么的对话机器人。你可能会想现在聊天机器人不是满大街都是吗但说实话很多机器人聊两句就忘了你是谁每次对话都像第一次见面体验感很差。我们今天要做的就是让它变成一个“老朋友”能记住之前的对话脉络甚至能帮你总结聊过的重点。这背后的核心技术就是状态管理。你可以把一次完整的对话想象成一次长途旅行。普通的机器人就像没有地图的游客走到哪算哪下次再来还得从头问路。而我们要做的是给这个机器人一张可以不断绘制和保存的地图也就是对话状态让它知道“我们之前聊到哪了”、“用户喜欢什么”、“我们达成了什么共识”。LangGraph 提供的StateGraph状态图和Checkpointer检查点机制就是绘制和保存这张地图的绝佳工具。FastAPI 在这里扮演什么角色呢它就是我们机器人的“对外服务窗口”。一个强大的大脑LangGraph 状态机需要一个高效、易用的接口来与用户交互FastAPI 凭借其异步高性能和直观的 API 设计完美胜任。我们将从最基础的状态定义开始一步步构建出这个具备记忆和总结能力的智能体最后再给它装上“实时对讲机”WebSocket实现像 ChatGPT 那样一个字一个字往外蹦的流式响应效果。整个过程我会把我踩过的坑、调试的技巧都分享出来保证你跟着做就能跑起来。2. 搭建基石定义对话状态与记忆存储万事开头难我们先从最核心的部分——状态State说起。在 LangGraph 的世界里State 就是一个 Python 字典它定义了在整个对话流程中需要被跟踪和更新的所有数据。对于我们的可记忆机器人至少需要两个东西一是完整的对话消息列表二是对历史对话的总结摘要。2.1 设计我们的 State 类直接看代码这是最直观的。我们基于 LangGraph 提供的MessagesState来扩展它已经帮我们处理好了消息列表的合并逻辑。from typing import Annotated from typing_extensions import TypedDict from langchain_core.messages import AnyMessage from langgraph.graph.message import add_messages class State(TypedDict): messages: Annotated[list[AnyMessage], add_messages] summary: str我来解释一下这几行代码在干什么。我们定义了一个新的类型State它继承自TypedDict这相当于告诉 Python 和 LangGraph“我们的状态字典里必须有这两个键”。messages键存放所有的对话消息用户的和AI的Annotated注解和add_messages函数是 LangGraph 的魔法它自动帮我们把新产生的消息追加到原有的消息列表后面省去了我们手动拼接列表的麻烦。summary键就是一个字符串用来存放我们对历史对话的浓缩总结。为什么需要summary想象一下如果你和机器人聊了100条消息每次都把全部历史记录塞给 AI 模型不仅浪费钱API调用按token计费而且模型可能因为上下文太长而“失焦”。一个精炼的总结就能在保留核心信息的同时极大地缩短上下文长度。2.2 选择与配置记忆存储器Checkpointer状态有了存在哪LangGraph 提供了多种Checkpointer检查点你可以理解为游戏存档点。它负责把 State 序列化后存起来下次同一个用户进来可以从这个存档点继续。最常用也最简单的是MemorySaver它把状态存在程序的内存里。好处是快零配置。坏处也很明显程序一重启所有记忆就清零了。对于学习和小型演示这完全够用。from langgraph.checkpoint.memory import MemorySaver memory MemorySaver()但在真实的生产环境你肯定希望用户的记忆能持久化。LangGraph 支持 Redis、PostgreSQL 等后端。比如用 RedisSaver配置也不复杂# 假设你已经安装了 langgraph-redis from langgraph.checkpoint.redis import RedisSaver import redis redis_client redis.Redis(hostlocalhost, port6379, db0) redis_checkpointer RedisSaver(clientredis_client)这里的关键是configurable配置字典。注意看下面编译图时的参数以及后续调用时的configapp workflow.compile(checkpointermemory) # 使用内存存储 # 每个用户或对话线程通过唯一的 thread_id 来区分 user_config {configurable: {thread_id: user_123}}这个thread_id就是用户的“身份证”。通过它LangGraph 能精准地找到并加载对应用户的对话状态。你可以用用户的唯一ID、会话ID等来填充它。3. 构建大脑用 LangGraph 编排对话流程有了状态和存储器接下来我们要设计机器人的“思维流程”。LangGraph 使用状态图StateGraph来定义这个流程它由节点Node和边Edge组成非常直观。3.1 创建核心的工作节点我们的机器人主要有两个任务1. 正常对话2. 在适当的时候总结对话。我们为每个任务创建一个节点函数。首先是对话节点conversation node。它的职责是调用大模型生成回复。这里有个小技巧在调用模型前我们会先检查是否存在历史总结summary如果存在就把它作为一条系统消息System Message插入到本次对话消息的前面。这样模型就能在“知晓历史背景”的前提下进行回复。from langchain_openai import ChatOpenAI from langchain_core.messages import SystemMessage # 初始化模型这里以 OpenAI 为例用 Azure、智谱等都一样 model ChatOpenAI(modelgpt-3.5-turbo) async def call_model(state: State): # 获取当前的历史总结 current_summary state.get(summary, ) # 准备要发送给模型的消息列表 if current_summary: # 如果有总结把它作为背景知识告诉模型 system_msg SystemMessage(contentf先前对话的总结{current_summary}) messages_for_model [system_msg] state[messages] else: messages_for_model state[messages] # 调用大模型获取回复 response await model.ainvoke(messages_for_model) # 返回的结果会被 LangGraph 自动添加到 state[“messages”] 中 return {messages: [response]}然后是总结节点summarize node。当对话轮次太多时我们需要触发这个节点来压缩历史。它的逻辑是将当前所有消息加上一个“请总结”的提示发给模型得到一段总结文本然后用这段总结替换掉旧的summary。同时为了不让messages列表无限膨胀我们通常会删除一些早期的消息只保留最近几条。from langchain_core.messages import HumanMessage, RemoveMessage async def summarize_conversation(state: State): old_summary state.get(summary, ) # 根据是否存在旧总结构造不同的总结提示词 if old_summary: summary_prompt ( f这是到目前为止的对话总结{old_summary}\n\n 请结合上方的新消息扩展这个总结 ) else: summary_prompt 请为上面的对话创建一个总结 # 将历史消息和总结指令一起发给模型 messages_for_summary state[messages] [HumanMessage(contentsummary_prompt)] new_summary_result await model.ainvoke(messages_for_summary) new_summary new_summary_result.content # 保留最近2条消息删除更早的避免列表过长 # RemoveMessage 是 LangGraph 的特殊指令告诉系统删除指定ID的消息 messages_to_keep state[messages][-2:] if len(state[messages]) 2 else state[messages] ids_to_remove [m.id for m in state[messages] if m not in messages_to_keep] delete_instructions [RemoveMessage(idmsg_id) for msg_id in ids_to_remove] # 更新状态存入新总结并应用删除消息的指令 return {summary: new_summary, messages: delete_instructions}3.2 组装状态图并设置流转逻辑节点定义好了现在用 LangGraph 的StateGraph把它们组装起来并设定好执行路径。from langgraph.graph import StateGraph, START, END # 1. 创建一个状态图指定我们自定义的 State 类型 workflow StateGraph(State) # 2. 添加节点给每个节点函数起个名字 workflow.add_node(conversation, call_model) workflow.add_node(summarize, summarize_conversation) # 3. 设置起始边对话总是从 conversation 节点开始 workflow.add_edge(START, conversation) # 4. 设置条件边这是实现智能流转的关键 # 在 conversation 节点执行完后根据条件决定下一步去哪 workflow.add_conditional_edges( conversation, # 源节点 should_continue, # 条件判断函数 ) # 5. 设置总结后的边执行完 summarize 节点后本次对话轮次结束 workflow.add_edge(summarize, END) # 6. 编译成可执行的应用并挂载我们之前创建的存储器 app workflow.compile(checkpointermemory)重点在于那个should_continue函数它决定了对话的走向。一个常见的策略是基于对话轮次进行总结from typing import Literal def should_continue(state: State) - Literal[summarize, END]: 判断是继续总结还是结束本轮处理 messages state[messages] # 如果消息条数超过5条就触发总结 if len(messages) 5: return summarize # 否则本次对话流程结束 return END这样一个具备自动记忆和总结能力的对话大脑就构建完成了。你可以通过app.invoke()或app.astream()来运行它并传入用户的thread_id来维持记忆。4. 提供接口用 FastAPI 封装非流式 API大脑在后台运行起来了现在我们需要给它开一个对外的“窗口”让用户能通过 HTTP 请求和它交互。FastAPI 以其简洁和高效成为不二之选。我们先从最传统的“一问一答”非流式接口开始。4.1 设计 API 数据模型与路由首先规划一下我们需要哪些接口。至少需要两个一个用于发送消息并获取回复另一个用于查询某个用户的历史对话记录。# main.py from fastapi import FastAPI from pydantic import BaseModel from typing import List app FastAPI(title可记忆对话机器人API) # 定义请求和响应的数据格式 class ChatRequest(BaseModel): user_id: str # 用户唯一标识 query: str # 用户发送的消息 class ChatResponse(BaseModel): response: str # AI的回复 class HistoryItem(BaseModel): role: str # “human” 或 “ai” content: str class HistoryResponse(BaseModel): history: List[HistoryItem] # 历史记录列表接下来实现聊天接口。这个接口的核心是调用我们上一节编译好的 LangGraphapp并传入对应的用户配置。# chat_service.py from langchain_core.messages import HumanMessage from .graph_app import app as graph_app # 导入上一节编译好的图 async def generate_response(query: str, user_id: str) - str: 处理用户查询返回AI回复 # 1. 准备用户配置记忆的钥匙 config {configurable: {thread_id: user_id}} # 2. 将用户输入包装成 LangChain 消息 user_message HumanMessage(contentquery) # 3. 调用图应用传入初始状态和配置 # stream_modevalues 表示我们只关心最终的状态值 final_state await graph_app.ainvoke( {messages: [user_message]}, configconfig, stream_modevalues ) # 4. 从最终状态中取出最新的AI消息内容 # final_state[“messages”] 是一个列表最后一条通常是AI的最新回复 ai_messages [msg for msg in final_state[messages] if msg.type ai] if ai_messages: return ai_messages[-1].content return 抱歉我没有生成回复。然后在 FastAPI 中创建路由# main.py (续) from chat_service import generate_response, get_chat_history app.post(/chat, response_modelChatResponse) async def chat_endpoint(request: ChatRequest): 聊天接口 ai_response await generate_response(request.query, request.user_id) return ChatResponse(responseai_response) app.get(/history/{user_id}, response_modelHistoryResponse) async def get_history(user_id: str): 获取历史记录接口 history_list get_chat_history(user_id) return HistoryResponse(historyhistory_list)4.2 实现历史记录查询功能历史记录查询的核心是从我们配置的 Checkpointer存储器中根据thread_id把完整的 State 取出来然后从中提取出格式化的消息。# chat_service.py (续) def get_chat_history(user_id: str) - List[dict]: 根据user_id获取聊天历史 config {configurable: {thread_id: user_id}} try: # 从检查点获取该用户的最新状态 state graph_app.get_state(config) if not state or not state.values: return [] history [] for msg in state.values.get(messages, []): # 简单提取角色和内容 history.append({ role: user if msg.type human else assistant, content: msg.content }) return history except Exception as e: # 处理用户首次对话状态不存在等情况 print(f获取历史记录失败: {e}) return []到这里一个具备完整记忆功能的非流式对话 API 就搭建好了。你可以用uvicorn main:app --reload启动服务然后通过/docs页面方便地进行测试。输入user_id和query你会发现当你用同一个user_id连续提问时机器人能准确引用之前的对话内容这就是状态管理和记忆存储的威力。5. 升级体验实现 WebSocket 流式响应非流式接口虽然能用但体验上总感觉“卡顿”用户发送问题后要等待模型完全生成完毕才能看到结果。现在我们来把它升级成类似 ChatGPT 的实时流式输出让回复一个字一个字地“流”到前端。这需要用到 FastAPI 的 WebSocket 支持。5.1 建立 WebSocket 连接与前端准备首先我们创建一个简单的 HTML 页面作为测试前端它会通过 WebSocket 连接到我们的后端。!DOCTYPE html html head title流式对话测试/title /head body input typetext idinputBox placeholder输入你的问题... button onclicksendMessage()发送/button div idresponseArea stylewhite-space: pre-wrap; border:1px solid #ccc; min-height:100px;/div script const socket new WebSocket(ws://${window.location.host}/ws/chat); const responseArea document.getElementById(responseArea); socket.onopen function(event) { console.log(WebSocket 连接已建立); }; socket.onmessage function(event) { // 接收到的数据是流式的文本片段直接追加显示 responseArea.textContent event.data; }; socket.onerror function(error) { console.error(WebSocket 错误:, error); }; function sendMessage() { const inputBox document.getElementById(inputBox); const userId test_user_001; // 示例用户ID const message inputBox.value; if (message) { // 发送 JSON 格式的数据包含用户ID和查询内容 socket.send(JSON.stringify({ user_id: userId, query: message })); inputBox.value ; responseArea.textContent \n\n用户: message \n助手: ; } } /script /body /html5.2 后端 WebSocket 路由与流式处理后端需要创建一个 WebSocket 端点来处理连接、接收消息并最关键的一步——将 LangGraph 的流式输出实时转发给前端。# main.py (续) from fastapi import WebSocket, WebSocketDisconnect import json app.websocket(/ws/chat) async def websocket_chat(websocket: WebSocket): # 接受客户端连接 await websocket.accept() try: while True: # 等待接收客户端发来的消息 data await websocket.receive_text() request_data json.loads(data) user_id request_data.get(user_id) query request_data.get(query) if not user_id or not query: await websocket.send_text(错误请求中需要 user_id 和 query 字段) continue # 调用流式处理函数 await stream_response_to_websocket(query, user_id, websocket) except WebSocketDisconnect: print(f客户端断开连接) except json.JSONDecodeError: await websocket.send_text(错误无法解析JSON数据) except Exception as e: print(fWebSocket处理异常: {e}) await websocket.send_text(f服务器内部错误: {e})核心的流式处理逻辑在stream_response_to_websocket函数中。这里我们不再使用ainvoke一次性获取结果而是使用 LangGraph 的astream方法它返回一个异步生成器每当图中有新消息产生时就会yield出来。# chat_service.py (续) async def stream_response_to_websocket(query: str, user_id: str, websocket: WebSocket): 将 LangGraph 的流式输出通过 WebSocket 发送 config {configurable: {thread_id: user_id}} user_message HumanMessage(contentquery) # 关键使用 astream 进行流式调用 # stream_mode“messages” 表示我们流式接收的是消息对象本身 async for event in graph_app.astream( {messages: [user_message]}, configconfig, stream_modemessages ): # 事件(event)是一个元组 (消息对象, 元数据) msg, metadata event # 我们需要过滤只处理来自“conversation”节点的AI消息内容 if ( hasattr(msg, content) and msg.content and metadata.get(langgraph_node) conversation and msg.type ai ): # 将消息内容发送给前端 # 注意这里发送的是纯文本前端直接拼接显示 await websocket.send_text(msg.content) # 可选如果需要更精细的控制如区分token可以遍历 msg.content这里有个细节需要注意astream会产生很多事件包括节点开始、结束、状态更新等。我们通过metadata.get(langgraph_node)来判断事件来自哪个节点只将conversation节点产生的 AI 消息内容流式发送出去。这样前端就能实时看到模型生成的每一个字了。5.3 流式总结的优化处理你可能已经发现当触发总结节点时summarize节点也会调用一次大模型来生成总结文本。如果我们不对其进行过滤这个总结文本也可能被流式发送到前端这通常不是我们想要的。用户看到一段突然出现的总结会感到困惑。因此在上面的流式过滤条件中我们严格限定只发送来自conversation节点的ai类型消息。总结节点的输出只更新内部的state[“summary”]不会流向客户端。这样就保证了前端体验的纯净性用户感知到的始终是连贯的对话流。6. 实战调试与性能优化要点把代码跑起来只是第一步要让这个系统稳定、高效地运行还需要注意不少细节。我把自己在项目中踩过的几个坑分享给大家。6.1 状态序列化与自定义对象处理LangGraph 的 Checkpointer 在存储 State 时需要将其序列化比如转换成 JSON。如果你的 State 里包含了自定义的类对象或者像 LangChain 的AIMessage、HumanMessage这样的复杂对象可能会遇到序列化错误。一个常见的解决方案是使用 LangGraph 推荐的jsonplus序列化器它对 Python 的复杂对象如 datetime, numpy array有更好的支持并且需要处理 LangChain 消息对象的特殊序列化逻辑。你可能会在代码中看到这样的补丁# 处理消息包序列化问题 from langgraph.checkpoint.serde import jsonplus import msgpack def custom_default(obj): # 为无法序列化的类型提供自定义转换 if hasattr(obj, ‘to_dict’): return obj.to_dict() raise TypeError(f“Object of type {obj.__class__.__name__} is not JSON serializable”) jsonplus._msgpack_default custom_default更稳健的做法是在 State 中尽量存储可序列化的基本类型字符串、数字、列表、字典。对于消息LangChain 的消息类通常已经做好了序列化支持。6.2 控制上下文长度与总结策略记忆虽好但不能无限增长。我们的策略是当消息条数len(state[“messages”])超过一个阈值比如5条时触发总结。但这里有几个可优化的点基于 Token 数判断消息条数不如总 Token 数准确。你可以集成tiktoken或其他模型的 Tokenizer 来计算上下文总长度超过模型限制如 4096 tokens前就触发总结。这更科学但计算会稍微增加开销。总结的粒度我们现在的总结是覆盖性的每次都用新总结完全替换旧总结。对于超长对话这可能丢失早期的重要信息。可以考虑“分层总结”或“增量总结”只总结最近 N 条消息然后将新总结与旧总结进行融合。保留关键消息在summarize_conversation节点中我们粗暴地删除了倒数两条之外的所有消息。在实际应用中你可能希望保留那些标记了重要性的消息例如用户设置了偏好而不是单纯按时间删除。6.3 异步处理与并发考量我们的服务使用了async/await异步编程。这对于 I/O 密集型的 AI 应用网络请求、数据库读写至关重要能极大提升并发能力。但要注意线程安全确保你使用的 Checkpointer如 RedisSaver是支持异步操作的或者在异步上下文中使用是安全的。模型调用超时在调用model.ainvoke()时最好设置一个超时时间避免因为网络或模型服务问题导致请求一直挂起拖垮整个服务。import asyncio from langchain_core.runnables import RunnableConfig try: # 为模型调用设置超时 response await asyncio.wait_for( model.ainvoke(messages_for_model), timeout30.0 # 30秒超时 ) except asyncio.TimeoutError: # 返回一个超时提示消息 return {messages: [AIMessage(content思考时间过长请稍后再试。)]}连接池管理如果你的模型客户端如ChatOpenAI或数据库客户端支持连接池请合理配置。FastAPI 的依赖注入系统可以帮助你在应用生命周期内管理这些共享客户端。6.4 部署与监控建议当你的机器人准备上线时考虑以下几点无状态服务我们的应用本身是无状态的所有状态都保存在外部的 Checkpointer如 Redis中。这非常有利于水平扩展你可以启动多个服务实例通过负载均衡器分发请求。健康检查为 FastAPI 添加/health端点用于检查服务本身以及 Redis 等外部依赖是否正常。日志与追踪为 LangGraph 的图执行过程添加详细的日志。LangGraph 本身提供了不错的日志功能你可以通过配置看到每个节点的输入输出这对于调试复杂的流转逻辑非常有帮助。配置化管理将总结阈值、模型温度、最大 Token 数等参数提取到配置文件如.env或config.yaml中方便不同环境开发、测试、生产的调整。最后我想说用 FastAPI 和 LangGraph 搭建这样一个系统就像在组装一个精密的乐高模型。FastAPI 提供了坚固易用的框架LangGraph 则提供了描述复杂思维链的直观工具。把它们组合起来你就能创造出体验远超普通问答机器人的智能体。我最初实现这个架构时被它清晰的逻辑和强大的表达能力惊艳到了。希望这份详细的指南能帮你少走弯路快速搭建出属于自己的、会记忆、能思考的对话伙伴。如果在实践过程中遇到问题多看看官方文档多在社区里交流你会发现很多有趣的玩法和优化空间。