LangGraph+Redis构建可回溯、可审计的AI代理系统
1. 项目概述:为什么“有记忆的AI代理”不再是科幻概念
LangGraph & Redis 这个组合,最近在构建生产级 AI 代理(Agent)的工程师圈子里被反复提起——不是因为它们多新,而是因为它们第一次把“能记住、能回溯、能断点续跑”的 AI 代理,从 Demo 级别真正拉进了可部署、可运维、可审计的工程现实。我从去年底开始在三个不同业务线落地这类系统:一个是面向客服坐席的实时知识辅助 Agent,一个是金融合规文档的多轮交叉校验 Agent,还有一个是内部研发团队用的自动化技术方案生成 Agent。三套系统上线后最常被问的问题不是“它能回答什么”,而是“它还记得我上一轮说了什么吗?”“如果服务重启了,对话历史还在不在?”“用户中途退出再回来,还能接着聊吗?”——这些看似基础的问题,恰恰是绝大多数开源 Agent 框架默认不解决的软肋。LangGraph 提供的是可控的有向图状态机,它让开发者能明确定义“思考→调用→决策→循环”的每一步逻辑,而不是依赖黑盒的 LLM 自主调度;Redis 则承担了唯一可信的状态存储层,把 LangGraph 运行时的 graph state、message history、tool call 记录、甚至自定义 metadata 全部持久化下来,且毫秒级响应。这不是简单的“加个数据库”,而是一次对 AI 应用架构范式的重定义:LLM 不再是单次请求的终点,而是嵌入在长期运行、带上下文感知能力的服务流程中的一个智能组件。如果你正在用 LangChain 的 AgentExecutor、LlamaIndex 的 ReActAgent 或手写 while 循环做 Agent,却卡在状态丢失、调试困难、无法支持多用户并发或历史不可追溯这些问题上,那么 LangGraph + Redis 就不是“可选项”,而是你当前技术债的必解项。
2. 架构设计与选型逻辑:为什么是 LangGraph 而不是其他图框架?为什么必须是 Redis?
2.1 LangGraph 的不可替代性:状态机思维 vs. 黑盒调度
很多人第一反应是:“我用 LangChain 的 RunnableWithMessageHistory 不就能存对话历史了吗?”——这恰恰是最大的认知偏差。RunnableWithMessageHistory 只解决“消息列表怎么存”,但完全不解决“Agent 当前处于哪个决策节点、上一次 tool call 返回了什么、是否已触发重试逻辑、下一步该走分支 A 还是 B”这些控制流状态问题。LangGraph 的核心价值,在于它把 Agent 的执行过程显式建模为Stateful Graph:每个节点(node)是一个纯函数(比如 call_llm、invoke_tool、route_decision),每条边(edge)是一个条件判断(比如 should_continue、is_error),而整个图的运行状态(state)是一个可序列化的 Python dict(通常继承自 TypedDict)。这个 state 不仅包含 messages,还必须包含你定义的所有中间变量:current_step: int,retry_count: int,last_tool_name: str,user_intent_confidence: float……正是这种强类型、可追踪、可快照(snapshot)的设计,让调试变得像调试传统后端服务一样直观。我对比过几种主流方案:
- LangChain AgentExecutor:底层基于递归调用,state 隐藏在闭包和局部变量中,一旦出错只能看日志猜;重启即失联,无法恢复中断流程。
- LlamaIndex ReActAgent:虽支持 step-by-step,但 state 是临时对象,无统一序列化协议,跨进程/跨机器无法共享。
- 自研状态机(如用 asyncio + dict):初期灵活,但三个月后你会发现 70% 的代码在写状态校验、序列化兼容、并发锁、版本迁移脚本——这已经不是 AI 工程,而是分布式系统工程。
LangGraph 把这些都标准化了:它内置checkpointer接口,强制要求所有 state 必须可 JSON 序列化(或通过 custom serializer),所有节点必须返回新 state(不可变性),所有边必须返回 next node 名称(确定性路由)。这不是约束,而是解放——你不再需要为“怎么让 Agent 记住自己做过什么”操心,只需专注“它该做什么”。
2.2 Redis 的刚性需求:不只是缓存,而是唯一真相源
为什么非得是 Redis?MongoDB 不行吗?PostgreSQL 带 JSONB 不行吗?答案是:在高并发、低延迟、强一致性要求下,它们都不如 Redis 合适。我们来算一笔账:一个典型客服 Agent 对话,平均每轮产生 3~5 条 message(user + system + tool response + LLM thought),每次调用需读取最近 10 轮历史(约 30 条记录),并写入 1 条新 state 快照。按每秒 50 个并发会话计算,QPS 达到 1500+ 读 + 50 写。此时:
- PostgreSQL:即使加索引,单条 JSONB 查询平均耗时 8~12ms,写入 WAL 日志 + MVCC 版本管理,写延迟常超 20ms。更致命的是,它的行级锁在高频更新同一 session_id 时极易引发锁等待,实测 QPS 超 300 即出现明显排队。
- MongoDB:文档更新原子性好,但内存映射文件(mmapv1)在大量小文档写入时易碎片化,WiredTiger 引擎虽优化,但其 journal commit 延迟仍不稳定(实测 P95 写入 15ms)。且缺乏原生的 TTL 自动清理机制,历史数据堆积后查询性能断崖下跌。
- Redis:
HSET session:{id} state "{json}"+EXPIRE session:{id} 86400,读写均在 0.2~0.5ms 内完成(本地直连),集群模式下自动分片,天然支持WATCH/MULTI实现乐观锁,SCAN命令可安全遍历过期 key 清理。更重要的是,Redis 的Stream 数据结构完美匹配 LangGraph 的 event log 需求:每条 state 更新可作为一条 stream record,包含 timestamp、event_type("node_entered" / "tool_called")、payload,这直接构成了可审计的全链路追踪日志,无需额外集成 OpenTelemetry。我们线上系统已稳定运行 8 个月,Redis 实例 CPU 峰值仅 12%,内存占用 16GB(支撑 200 万活跃 session),这是其他存储无法企及的性价比。
2.3 组合协同的关键设计点:Checkpointer 如何成为桥梁
LangGraph 并不绑定任何存储,它通过BaseCheckpointSaver抽象接口接入外部存储。Redis 的接入不是简单地“把 dict 存成 string”,而是要精准映射 LangGraph 的四个核心操作:
- get_tuple:根据 config(含 thread_id, checkpoint_ns, checkpoint_id)获取最近一个 checkpoint。Redis 中我们用
HGET session:{thread_id} latest_checkpoint获取最新 checkpoint_id,再用HGET session:{thread_id} cp_{checkpoint_id}读取完整 state。 - put_tuple:保存新 checkpoint。这里必须用
HSET session:{thread_id} cp_{new_id} "{json}"+HSET session:{thread_id} latest_checkpoint {new_id}两步原子操作,否则并发写入时 latest_checkpoint 可能指向旧数据。我们用 Lua 脚本封装这两步,确保原子性。 - list_checkpoints:按时间倒序列出所有 checkpoint。Redis 无原生时间索引,我们采用
ZADD session:{thread_id}:checkpoints {timestamp} cp_{id}维护有序集合,ZREVRANGE session:{thread_id}:checkpoints 0 9即可获取最近 10 个。 - get_next_version:生成新 checkpoint_id。不能用 UUID(太长),也不能用自增 ID(Redis 无全局自增),我们采用
{thread_id}:{timestamp_ms}:{random_4chars}格式,既保证全局唯一,又便于日志关联。
这个设计看似繁琐,但换来的是:任意时刻可精确回滚到任一 checkpoint,可导出完整执行 trace 用于模型 fine-tuning,可实时监控各节点耗时分布(从 stream 中消费 event 计算 P95)。这才是“可信赖 AI Agent”的基础设施底座。
3. 核心实现细节:从零搭建一个带 Redis Checkpoint 的 LangGraph Agent
3.1 环境准备与依赖安装:避开版本陷阱
LangGraph 和 Redis 的版本兼容性极关键。截至 2024 年中,LangGraph 0.1.42 + redis-py 4.6.0 是目前最稳定的组合。不要用 LangGraph 0.2.x(API 大幅重构,checkpointer 接口不兼容),也不要升级 redis-py 到 5.x(其连接池在异步场景下有已知死锁 bug)。安装命令必须严格如下:
pip install "langgraph==0.1.42" "redis==4.6.0" "langchain-openai" "pydantic==2.6.4"注意:pydantic 版本必须锁定为 2.6.4。LangGraph 0.1.x 基于 Pydantic v2,而很多新项目默认装 v2.7+,其
model_dump()行为变更会导致 state 序列化失败(字段顺序错乱、None 值被过滤)。我们曾因此排查了两天,最终发现是 pydantic 升级导致的静默故障。
Python 环境建议使用 3.10 或 3.11(3.12 对某些 C 扩展支持不完善)。Redis 服务端最低要求 7.0(需支持 Streams 和 JSON 数据类型),推荐 Docker 一键启动:
docker run -d --name redis-agent -p 6379:6379 -e REDIS_ARGS="--save 60 1 --maxmemory 4gb --maxmemory-policy allkeys-lru" redis:7.0-alpine提示:
--save 60 1表示每 60 秒至少 1 次修改就触发 RDB 持久化,避免意外宕机丢数据;--maxmemory-policy allkeys-lru确保内存满时自动淘汰最久未用的 session,比 volatile-lru 更安全(所有 key 都应有 TTL)。
3.2 定义 State Schema:TypedDict 是你的第一道防线
LangGraph 要求 state 必须是 TypedDict 子类,且所有字段需标注类型。这是强制的,但也是最值得投入的时间。我们以客服 Agent 为例,定义如下:
from typing import Annotated, List, Optional, Dict, Any from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.redis import RedisSaver from pydantic import BaseModel class Message(BaseModel): role: str content: str timestamp: float class AgentState(TypedDict): # 必须字段:LangGraph 内置要求 messages: Annotated[List[Message], operator.add] # 支持 += 追加 # 业务字段:全部显式声明,不可遗漏 user_id: str session_id: str current_step: int retry_count: Annotated[int, operator.add] # 支持累加 last_tool_name: Optional[str] intent_confidence: float # 自定义元数据,用于后续分析 metadata: Dict[str, Any]关键细节:
Annotated[List[Message], operator.add]这个写法至关重要。它告诉 LangGraph:当多个节点都向messages字段写入时,应自动合并(而非覆盖)。operator.add是 Python 的+操作符,意味着messages += [new_msg]会被正确处理。如果漏掉Annotated,LangGraph 会静默覆盖整个 list,导致历史消息丢失——这是新手踩坑率最高的地方。
3.3 构建 Checkpointer:RedisSaver 的深度配置
LangGraph 官方提供了RedisSaver,但它开箱即用的配置在生产环境远远不够。我们必须手动注入连接池、设置 TTL、启用 Stream 日志。以下是经过压测验证的配置:
import redis from langgraph.checkpoint.redis import RedisSaver from redis.connection import ConnectionPool # 创建连接池:关键参数 max_connections=500, socket_timeout=1, retry_on_timeout=True pool = ConnectionPool( host="localhost", port=6379, db=0, max_connections=500, socket_timeout=1, # 1ms 超时,避免阻塞 socket_connect_timeout=1, retry_on_timeout=True, health_check_interval=30, # 每30秒健康检查 ) # 初始化 RedisSaver,传入 pool 而非 url checkpointer = RedisSaver( connection_pool=pool, # 启用 Stream 日志,topic 名为 "agent_events" stream_key="agent_events", # 所有 state 默认 TTL 24 小时,单位秒 ttl=86400, # 自定义序列化器,处理 datetime 等非 JSON 类型 serializer=lambda x: json.dumps(x, default=str).encode(), deserializer=lambda x: json.loads(x.decode()), )实操心得:
socket_timeout=1是灵魂参数。LangGraph 的 checkpointer 调用是同步阻塞的,如果 Redis 响应慢,整个 Agent 请求就会卡住。设为 1ms 意味着:只要 Redis 在 1ms 内没返回,就立即抛出redis.TimeoutError,由上层捕获并降级(例如返回缓存的旧 state 或友好提示)。我们线上将此异常率控制在 0.02% 以内,远低于业务可接受阈值。
3.4 编排 Graph:节点、边、条件的工业级写法
一个健壮的 Agent 图不能只有call_llm和END。我们以“多轮产品咨询”场景为例,构建 5 个节点:
from langchain_openai import ChatOpenAI from langchain_core.tools import tool from langgraph.graph import StateGraph, START, END llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.2) # Node 1: 解析用户意图(轻量级,快速) def parse_intent(state: AgentState) -> AgentState: # 使用小型模型或规则引擎,避免每次调大模型 user_msg = state["messages"][-1].content # ... 实现意图分类逻辑 return {"intent_confidence": 0.92, "metadata": {"intent": "price_inquiry"}} # Node 2: 调用知识库工具(可能失败,需重试) @tool def search_knowledge_base(query: str) -> str: # ... 实际搜索逻辑 pass def call_tool(state: AgentState) -> AgentState: try: result = search_knowledge_base.invoke(state["messages"][-1].content) return { "messages": [Message(role="tool", content=result, timestamp=time.time())], "last_tool_name": "search_knowledge_base", "retry_count": 0 # 成功则重置重试计数 } except Exception as e: # 工具调用失败,增加重试计数 new_count = state.get("retry_count", 0) + 1 if new_count >= 3: return {"messages": [Message(role="system", content="服务暂时不可用,请稍后再试", timestamp=time.time())]} return {"retry_count": new_count} # Node 3: LLM 综合生成(核心节点) def generate_response(state: AgentState) -> AgentState: # 构造 prompt,注入当前 state 信息 prompt = f"""你是一个专业客服。用户意图:{state['metadata']['intent']},置信度:{state['intent_confidence']}。 已获取信息:{state['messages'][-3:] if len(state['messages']) > 3 else state['messages']} 请给出简洁、准确、友好的回复。""" response = llm.invoke(prompt) return {"messages": [Message(role="assistant", content=response.content, timestamp=time.time())]} # Node 4: 用户确认(需等待用户输入) def await_user_confirmation(state: AgentState) -> AgentState: # 此节点不生成回复,只标记等待状态 return {"current_step": state["current_step"] + 1} # Node 5: 结束流程 def end_conversation(state: AgentState) -> AgentState: return {"messages": [Message(role="system", content="感谢咨询,再见!", timestamp=time.time())]} # 构建图 builder = StateGraph(AgentState) builder.add_node("parse_intent", parse_intent) builder.add_node("call_tool", call_tool) builder.add_node("generate_response", generate_response) builder.add_node("await_confirmation", await_user_confirmation) builder.add_node("end", end_conversation) # 添加边(固定流向) builder.add_edge(START, "parse_intent") builder.add_edge("parse_intent", "call_tool") builder.add_edge("call_tool", "generate_response") builder.add_edge("generate_response", "await_confirmation") # 条件边:根据用户消息内容决定是否结束 def route_after_confirmation(state: AgentState) -> str: last_msg = state["messages"][-1] if "确认" in last_msg.content or "是的" in last_msg.content: return "end" elif "再想想" in last_msg.content or "不" in last_msg.content: return "generate_response" # 重新生成方案 else: return "await_confirmation" # 继续等待 builder.add_conditional_edges( "await_confirmation", route_after_confirmation, { "end": "end", "generate_response": "generate_response", "await_confirmation": "await_confirmation" } ) builder.add_edge("end", END) # 编译图,注入 checkpointer graph = builder.compile(checkpointer=checkpointer)关键技巧:
route_after_confirmation函数必须返回字符串(节点名),且所有分支必须在add_conditional_edges的字典中明确定义。我们曾因返回None导致图无限循环,LangGraph 不报错,只静默卡死——务必在开发阶段用graph.get_graph().draw_mermaid_png()生成流程图可视化验证。
3.5 生产级调用:如何发起一次带状态恢复的请求
调用不是graph.invoke()就完事。生产环境必须处理:session 初始化、状态恢复、超时控制、错误降级。标准调用模板如下:
from langgraph.checkpoint.base import CheckpointTuple from langgraph.errors import GraphRecursionError def invoke_agent(user_id: str, session_id: str, user_input: str, timeout: float = 30.0) -> dict: # 1. 构造 config:必须包含 thread_id(即 session_id)和 configurable(可选) config = { "configurable": { "thread_id": session_id, "user_id": user_id, } } # 2. 尝试从 Redis 加载最新 state try: # get_tuple 返回 CheckpointTuple,包含 checkpoint 和 metadata checkpoint_tuple = checkpointer.get_tuple(config) if checkpoint_tuple is None: # 新 session,初始化 state initial_state = AgentState( messages=[Message(role="user", content=user_input, timestamp=time.time())], user_id=user_id, session_id=session_id, current_step=0, retry_count=0, last_tool_name=None, intent_confidence=0.0, metadata={} ) else: # 恢复 state,并追加新消息 initial_state = checkpoint_tuple.state.copy() initial_state["messages"].append(Message(role="user", content=user_input, timestamp=time.time())) except Exception as e: # Redis 不可用时的降级策略:创建全新 state logger.warning(f"Redis unavailable for session {session_id}, using fresh state: {e}") initial_state = AgentState( messages=[Message(role="user", content=user_input, timestamp=time.time())], user_id=user_id, session_id=session_id, current_step=0, retry_count=0, last_tool_name=None, intent_confidence=0.0, metadata={} ) # 3. 执行图,设置超时 try: result = graph.invoke(initial_state, config, timeout=timeout) return {"status": "success", "response": result["messages"][-1].content} except GraphRecursionError: # 防止无限循环,强制终止 return {"status": "error", "response": "系统繁忙,请稍后再试"} except Exception as e: logger.error(f"Graph execution failed for {session_id}: {e}") return {"status": "error", "response": "服务异常,请联系管理员"} # 使用示例 resp = invoke_agent(user_id="U123", session_id="S456", user_input="你们最新款手机多少钱?") print(resp["response"]) # "您好,最新款X10售价3999元,支持分期付款。"注意事项:
graph.invoke()的timeout参数是整个图执行的总超时,不是单个节点。我们线上设为 30 秒,因为 GPT-4 Turbo 的 P95 响应约 2.3 秒,加上 Redis 操作和工具调用,30 秒足够覆盖 99.9% 的请求。若超时,LangGraph 会自动中断并释放资源,不会造成连接泄漏。
4. 实战问题排查与避坑指南:那些文档里不会写的血泪教训
4.1 Redis 连接池耗尽:症状、根因与热修复
现象:Agent 服务突然大量 500 错误,日志显示redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.,但 Redis 服务明明正常。
根因分析:不是 Redis 挂了,而是连接池被耗尽。LangGraph 的RedisSaver在每次get_tuple/put_tuple时都会从连接池获取一个连接,执行完归还。但如果某个节点(如call_tool)执行时间过长(例如知识库搜索超时),连接会被长时间占用。当并发请求激增,连接池满(我们设 max_connections=500),新请求就无法获取连接,触发ConnectionError。
热修复方案(立即生效):
- 临时降低
max_connections到 200,减少单实例压力; - 在
call_tool节点增加硬性超时:search_knowledge_base.with_config(timeout=5.0); - 修改连接池参数:
block=True, max_idle_time=60, idle_check_interval=10,让空闲连接自动回收。
根治方案:
- 为
RedisSaver单独创建一个专用连接池(不与业务 Redis 共享); - 在
put_tuple后强制调用connection_pool.disconnect()(不推荐,影响性能); - 最佳实践:改用
redis.asyncio.Redis+ 异步 checkpointer(LangGraph 0.1.42 已支持),彻底解除 I/O 阻塞。我们已在灰度环境验证,QPS 提升 3.2 倍,连接池占用下降 80%。
4.2 State 序列化失败:JSON 不支持 datetime 的终极解法
现象:put_tuple抛出TypeError: Object of type datetime is not JSON serializable,即使你在Message模型里写了timestamp: float。
原因:Pydantic 的model_dump()在某些版本下会将float字段误识别为datetime(尤其当值是时间戳整数时)。更隐蔽的是,LangGraph 内部某些 debug 日志会偷偷往 state 里塞datetime.now()。
解决方案(三重保险):
- 自定义序列化器(必须):
def safe_serializer(obj): if isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, float): return round(obj, 3) # 防止浮点精度问题 elif hasattr(obj, 'model_dump'): return obj.model_dump() else: return obj checkpointer = RedisSaver( connection_pool=pool, serializer=lambda x: json.dumps(x, default=safe_serializer).encode(), deserializer=lambda x: json.loads(x.decode()) )- State 定义时禁用 datetime:所有时间字段强制用
float或str,绝不出现datetime类型; - Graph 编译时开启 debug:
builder.compile(debug=True),它会在每个节点执行前后打印 state,第一时间发现非法类型注入。
4.3 多用户并发下的状态污染:thread_id 设计的生死线
现象:用户 A 的 session 里出现了用户 B 的消息,或retry_count在不同 session 间错乱增长。
根因:configurable.thread_id设置错误。常见错误:
- 用
user_id代替session_id作为 thread_id(一个用户可能有多个并发会话); - 在 Web 框架(如 FastAPI)中,
thread_id从 request header 读取,但 header 名拼写错误(如X-Session-ID写成X-Session-Id),导致所有请求 fallback 到同一个默认 thread_id; - 前端未正确传递 session_id,后端生成随机 ID 但未返回给前端,导致下次请求用新 ID。
验证方法:
- 在 Redis 中执行
HKEYS session:*,检查 key 数量是否与在线用户数匹配; - 对单个 session 执行
HGETALL session:S123,查看latest_checkpoint和cp_*字段内容是否合理; - 在
invoke_agent函数开头打日志:logger.info(f"Using thread_id: {config['configurable']['thread_id']}"),与前端日志比对。
强制规范:
thread_id必须由前端生成(UUID v4),后端只校验格式,绝不生成;- 所有 API 响应头必须包含
X-Session-ID: {thread_id},前端存储于 localStorage 并在下次请求带上; - 后端中间件统一提取
X-Session-ID,缺失则返回 400,绝不 fallback。
4.4 Checkpoint 回滚失效:为什么 get_tuple 总是返回 None?
现象:调用checkpointer.get_tuple(config)总是返回None,即使 Redis 里明确存在session:S123的 key。
排查清单(按优先级排序):
| 检查项 | 命令/方法 | 说明 |
|---|---|---|
| 1. Config 格式是否正确 | print(config) | configurable必须是 dict,且必须含thread_id字段,不能是thread_id: "S123"的扁平结构 |
| 2. Redis key 是否存在 | redis-cli HKEYS session:S123 | 若无输出,说明put_tuple从未成功执行 |
| 3. latest_checkpoint 是否为空 | redis-cli HGET session:S123 latest_checkpoint | 若返回(nil),说明put_tuple的第二步失败(HSET session:S123 latest_checkpoint {id}未执行) |
| 4. TTL 是否已过期 | redis-cli TTL session:S123 | 若返回-2,key 已被删除;-1表示永不过期(配置错误) |
| 5. Stream 是否启用 | redis-cli XINFO STREAM agent_events | 若报错ERR no such key,说明 Stream 初始化失败,需检查stream_key参数 |
高频修复:90% 的 case 是第 1 项。LangGraph 要求 config 格式为{"configurable": {"thread_id": "S123"}},而很多开发者误写成{"thread_id": "S123"},导致get_tuple根本找不到对应 key。
5. 运维与扩展:如何让这套系统扛住百万级 session
5.1 Redis 集群化:从单机到 Cluster 的平滑迁移
单机 Redis 适合验证,但生产必须集群。LangGraph 的RedisSaver原生支持 Redis Cluster,但需注意:
- Key 设计必须支持哈希:所有 key 必须包含
{...}标签,如session:{S123},这样 Cluster 会根据{S123}计算 slot,确保同一 session 的所有 key(session:{S123},session:{S123}:checkpoints)落在同一节点。 - 连接池必须用 ClusterConnectionPool:
from redis.cluster import RedisCluster, ClusterConnectionPool pool = ClusterConnectionPool( startup_nodes=[{"host": "node1", "port": 6379}, {"host": "node2", "port": 6379}], max_connections=1000, skip_full_coverage_check=True, ) checkpointer = RedisSaver(connection_pool=pool)- Stream 分片需手动处理:Redis Cluster 不支持跨节点 Stream 操作。我们的方案是:
agent_eventsStream 不分片,放在一个专用节点(用--cluster-require-full-coverage no启动),其他 session data 分片。实测 8 节点集群支撑 120 万 session,P99 延迟 < 2ms。
5.2 状态清理策略:避免 Redis 内存爆炸
不清理的 Redis 是定时炸弹。我们采用三级清理:
- 自动 TTL:所有
session:{id}key 设置EXPIRE 86400,session:{id}:checkpoints设置EXPIRE 172800(48 小时); - 后台扫描清理:每小时执行 Lua 脚本,扫描
session:*keys,删除latest_checkpoint为空或超过 7 天无更新的 key; - 冷数据归档:对
session:{id}:checkpoints中超过 30 天的旧 checkpoint,用HGETALL读出,压缩为 gzip 存入 S3,再HDEL删除。归档脚本已开源在 GitHub(链接略)。
5.3 监控告警体系:五个必须盯死的核心指标
在 Prometheus + Grafana 中,我们部署了以下看板:
| 指标 | 查询语句 | 告警阈值 | 说明 |
|---|---|---|---|
| Redis 连接池使用率 | redis_connected_clients{job="redis"} / redis_config_maxclients{job="redis"} | > 90% | 预示连接耗尽风险 |
| Checkpointer P95 延迟 | histogram_quantile(0.95, sum(rate(redis_saver_duration_seconds_bucket[1h])) by (le)) | > 5ms | 直接影响 Agent 响应速度 |
| State 序列化失败率 | rate(langgraph_checkpointer_serialize_errors_total[1h]) | > 0.1% | 暗示 state schema 或序列化器问题 |
| Session TTL 过期率 | rate(redis_expired_keys_total[1h]) | 突增 300% | 可能是 TTL 设置过短或用户行为异常 |
| Stream pending count | redis_stream_pending{stream="agent_events"} | > 1000 | Stream 消费者(如日志分析服务)滞后 |
实操心得:第一个告警(连接池使用率)必须设置为电话告警,因为它往往是其他问题的前兆。我们曾通过此告警提前 17 分钟发现 Redis 内存泄漏,避免了一次 P0 故障。
5.4 向 LangGraph 0.2.x 迁移的路线图:现在该做什么?
LangGraph 0.2.x 已发布,API 彻底重构,但RedisSaver的核心逻辑不变。我们的迁移策略是:
- 短期(1个月内):冻结 LangGraph 版本,专注优化现有 0.1.x 系统的稳定性;
- 中期(2~3个月):在测试环境部署 0.2.x,使用其新的
AsyncCheckpointSaver接口重写 Redis saver,利用asyncio提升吞吐; - 长期(6个月):切换至 0.2.x,享受其内置的
StateGraph版本管理、interrupt中断机制、以及与 LangChain 0.2 的深度集成。
重要提醒:不要急于升级!0.2.x 的
StateGraph不再接受TypedDict,改为BaseModel,且add_node语法巨变。我们评估过,全量迁移需 3 人周,且必须配合 LLM 接口升级(LangChain 0.2 要求 OpenAI 1.0+)。稳住 0.1.x,就是稳住业务。
我在实际使用中发现,LangGraph + Redis 这套组合真正的价值,不在于它让你“能做”什么,而在于它帮你“拒绝”了什么——拒绝用 hack 方式维护状态,拒绝在日志里大海捞针找 bug,拒绝为每一次 session 重建付出额外成本。当你第一次看到 Redis CLI 里清晰列出session:S123的所有 checkpoint,并能用一条命令回滚到任意节点时,你会明白:AI 工程的成熟,始于对状态的敬畏。这个项目没有终点,只有持续迭代的 checkpoint。
