LangGraph实战:用SQLite和InMemoryStore给你的AI助手加上短期与长期记忆(附完整代码)
LangGraph实战:构建具备双记忆系统的AI助手
在构建对话系统时,让AI记住用户偏好和历史交互是提升体验的关键。LangGraph作为新兴的AI编排框架,其独特之处在于提供了分层记忆管理机制——通过SQLite实现短期记忆持久化,结合InMemoryStore构建长期知识库。本文将手把手教你实现一个能记住用户披萨偏好的智能助手。
1. 理解LangGraph的双层记忆架构
现代对话系统需要处理两种不同类型的记忆:
- 短期记忆:保存当前对话的上下文(如最近8条消息)
- 长期记忆:存储跨会话的用户偏好和重要事实(如"用户喜欢意大利披萨")
LangGraph通过两种技术组件实现这一架构:
| 记忆类型 | 存储方案 | 典型应用场景 | 技术实现示例 |
|---|---|---|---|
| 短期记忆 | 状态检查点 | 对话历史维护 | SqliteSaver |
| 长期记忆 | 可检索存储 | 用户偏好记忆 | InMemoryStore |
这种分离设计带来三个显著优势:
- 上下文隔离:避免不同对话线程间的记忆污染
- 性能优化:短期记忆高频更新,长期记忆按需检索
- 成本控制:仅将重要信息存入长期存储
2. 环境准备与依赖安装
开始前确保Python≥3.12环境,并安装必要依赖:
pip install langgraph langchain-openai langgraph-checkpoint-sqlite项目结构建议如下:
/pizza_assistant ├── memory_demo.py # 主程序 ├── memory.sqlite # SQLite数据库文件 └── requirements.txt关键依赖说明:
langgraph-checkpoint-sqlite:提供SQLite-backed的检查点存储langchain-openai:可选,用于语义检索的嵌入模型
3. 实现短期记忆系统
短期记忆的核心是维护对话状态。我们通过TypedDict定义状态结构:
from typing import TypedDict, Annotated, List from operator import add class FlowState(TypedDict, total=False): user_id: str messages: Annotated[List[str], add] # 自动合并消息列表 retrieved: List[str] # 长期记忆检索结果3.1 配置SQLite检查点
import sqlite3 from langgraph.checkpoint.sqlite import SqliteSaver def build_app(db_path="memory.sqlite"): conn = sqlite3.connect(db_path, check_same_thread=False) saver = SqliteSaver(conn) # 构建图时会传入这个saver graph = StateGraph(FlowState) ... return graph.compile(checkpointer=saver)这段代码创建了自动持久化的对话状态管理器,具有以下特性:
- 每次状态变更自动保存到SQLite
- 支持通过thread_id恢复历史对话
- 内置压缩机制防止数据膨胀
3.2 对话历史修剪策略
为防止上下文窗口爆炸,需要实现消息修剪逻辑:
MAX_MSG = 8 # 保留最近8条消息 def trim_messages(state: FlowState): messages = state.get("messages", []) if len(messages) <= MAX_MSG: return state # 保留最新消息并生成摘要 kept = messages[-MAX_MSG:] summary = f"历史压缩:已省略{len(messages)-MAX_MSG}条早期消息" return {**state, "messages": kept, "summary": summary}4. 构建长期记忆系统
长期记忆的核心是支持语义检索的存储系统。我们使用InMemoryStore:
from langgraph.store.memory import InMemoryStore store = InMemoryStore( index={ "embed": OpenAIEmbeddings(), # 语义嵌入模型 "dims": 768, # 向量维度 "fields": ["text"] # 被索引字段 } )4.1 记忆写入机制
当用户表达重要偏好时,将其存入长期记忆:
def save_preference(user_id: str, utterance: str): key = f"pref:{hash(utterance)}" # 生成唯一键 store.put( namespace=("user", user_id), # 按用户隔离 key=key, value={"text": utterance, "type": "preference"} )4.2 语义检索实现
def search_memory(user_id: str, query: str, limit=3): return list(store.search( namespace=("user", user_id), query=query, limit=limit ))检索过程会自动:
- 将查询文本向量化
- 计算与存储内容的相似度
- 返回最相关的记忆项
5. 完整工作流集成
现在将两层记忆系统整合到对话流程中:
graph LR A[用户输入] --> B(更新短期记忆) B --> C{是否需要长期记忆} C -->|是| D[语义检索] C -->|否| E[生成响应] D --> E E --> F{是否重要信息} F -->|是| G[存入长期记忆] F -->|否| H[结束]对应代码实现:
from langgraph.graph import StateGraph, START, END builder = StateGraph(FlowState) # 定义节点 builder.add_node("ingest", process_input) builder.add_node("retrieve", retrieve_memories) builder.add_node("generate", generate_response) # 构建流程 builder.add_edge(START, "ingest") builder.add_edge("ingest", "retrieve") builder.add_edge("retrieve", "generate") builder.add_edge("generate", END) # 编译应用 app = builder.compile( checkpointer=saver, # 短期记忆 store=store # 长期记忆 )6. 实战测试
启动对话测试:
# 第一次对话:表达偏好 result1 = app.invoke( {"user_id": "alice"}, config={"configurable": {"thread_id": "t1", "say": "我爱意大利披萨"}} ) # 第二次对话:利用记忆 result2 = app.invoke( {"user_id": "alice"}, config={"configurable": {"thread_id": "t2", "say": "推荐晚餐吃什么"}} ) print(result2["retrieved"]) # 输出: ["我爱意大利披萨"]7. 高级优化技巧
7.1 混合检索策略
def hybrid_search(user_id, query): # 先尝试语义搜索 results = semantic_search(user_id, query) if not results: # 回退到关键词匹配 results = keyword_search(user_id, query) return results7.2 记忆加权机制
def save_with_weight(user_id, key, value, weight=1.0): store.put( namespace=("user", user_id), key=key, value={**value, "_weight": weight}, metadata={"last_accessed": datetime.now()} )7.3 定期记忆整理
def cleanup_memory(user_id): old_items = store.scan( namespace=("user", user_id), filter=lambda x: x.metadata.last_accessed < (NOW - timedelta(days=30)) ) for item in old_items: store.delete(namespace, item.key)8. 生产环境建议
- 持久化方案:将InMemoryStore替换为Redis或PostgreSQL
- 嵌入模型:使用本地化模型如bge-small避免API依赖
- 监控指标:
- 记忆命中率
- 检索延迟
- 存储增长趋势
- 测试策略:
- 记忆一致性测试
- 跨会话上下文保持测试
- 压力测试:模拟长时间对话
# 监控装饰器示例 def monitor_memory(func): def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) latency = time.time() - start monitor.gauge("memory_op_latency", latency) return result return wrapper这套实现不仅适用于披萨推荐场景,经过适当调整可应用于:
- 个性化购物助手
- 医疗咨询机器人
- 教育领域的自适应学习系统
关键是要根据具体场景调整记忆的存储策略和检索方式。比如电商场景可能需要增加产品ID关联,而教育场景则需要知识点关联度计算。
