当前位置: 首页 > news >正文

LangGraph实战:用SQLite和InMemoryStore给你的AI助手加上短期与长期记忆(附完整代码)

LangGraph实战:构建具备双记忆系统的AI助手

在构建对话系统时,让AI记住用户偏好和历史交互是提升体验的关键。LangGraph作为新兴的AI编排框架,其独特之处在于提供了分层记忆管理机制——通过SQLite实现短期记忆持久化,结合InMemoryStore构建长期知识库。本文将手把手教你实现一个能记住用户披萨偏好的智能助手。

1. 理解LangGraph的双层记忆架构

现代对话系统需要处理两种不同类型的记忆:

  • 短期记忆:保存当前对话的上下文(如最近8条消息)
  • 长期记忆:存储跨会话的用户偏好和重要事实(如"用户喜欢意大利披萨")

LangGraph通过两种技术组件实现这一架构:

记忆类型存储方案典型应用场景技术实现示例
短期记忆状态检查点对话历史维护SqliteSaver
长期记忆可检索存储用户偏好记忆InMemoryStore

这种分离设计带来三个显著优势:

  1. 上下文隔离:避免不同对话线程间的记忆污染
  2. 性能优化:短期记忆高频更新,长期记忆按需检索
  3. 成本控制:仅将重要信息存入长期存储

2. 环境准备与依赖安装

开始前确保Python≥3.12环境,并安装必要依赖:

pip install langgraph langchain-openai langgraph-checkpoint-sqlite

项目结构建议如下:

/pizza_assistant ├── memory_demo.py # 主程序 ├── memory.sqlite # SQLite数据库文件 └── requirements.txt

关键依赖说明:

  • langgraph-checkpoint-sqlite:提供SQLite-backed的检查点存储
  • langchain-openai:可选,用于语义检索的嵌入模型

3. 实现短期记忆系统

短期记忆的核心是维护对话状态。我们通过TypedDict定义状态结构:

from typing import TypedDict, Annotated, List from operator import add class FlowState(TypedDict, total=False): user_id: str messages: Annotated[List[str], add] # 自动合并消息列表 retrieved: List[str] # 长期记忆检索结果

3.1 配置SQLite检查点

import sqlite3 from langgraph.checkpoint.sqlite import SqliteSaver def build_app(db_path="memory.sqlite"): conn = sqlite3.connect(db_path, check_same_thread=False) saver = SqliteSaver(conn) # 构建图时会传入这个saver graph = StateGraph(FlowState) ... return graph.compile(checkpointer=saver)

这段代码创建了自动持久化的对话状态管理器,具有以下特性:

  • 每次状态变更自动保存到SQLite
  • 支持通过thread_id恢复历史对话
  • 内置压缩机制防止数据膨胀

3.2 对话历史修剪策略

为防止上下文窗口爆炸,需要实现消息修剪逻辑:

MAX_MSG = 8 # 保留最近8条消息 def trim_messages(state: FlowState): messages = state.get("messages", []) if len(messages) <= MAX_MSG: return state # 保留最新消息并生成摘要 kept = messages[-MAX_MSG:] summary = f"历史压缩:已省略{len(messages)-MAX_MSG}条早期消息" return {**state, "messages": kept, "summary": summary}

4. 构建长期记忆系统

长期记忆的核心是支持语义检索的存储系统。我们使用InMemoryStore:

from langgraph.store.memory import InMemoryStore store = InMemoryStore( index={ "embed": OpenAIEmbeddings(), # 语义嵌入模型 "dims": 768, # 向量维度 "fields": ["text"] # 被索引字段 } )

4.1 记忆写入机制

当用户表达重要偏好时,将其存入长期记忆:

def save_preference(user_id: str, utterance: str): key = f"pref:{hash(utterance)}" # 生成唯一键 store.put( namespace=("user", user_id), # 按用户隔离 key=key, value={"text": utterance, "type": "preference"} )

4.2 语义检索实现

def search_memory(user_id: str, query: str, limit=3): return list(store.search( namespace=("user", user_id), query=query, limit=limit ))

检索过程会自动:

  1. 将查询文本向量化
  2. 计算与存储内容的相似度
  3. 返回最相关的记忆项

5. 完整工作流集成

现在将两层记忆系统整合到对话流程中:

graph LR A[用户输入] --> B(更新短期记忆) B --> C{是否需要长期记忆} C -->|是| D[语义检索] C -->|否| E[生成响应] D --> E E --> F{是否重要信息} F -->|是| G[存入长期记忆] F -->|否| H[结束]

对应代码实现:

from langgraph.graph import StateGraph, START, END builder = StateGraph(FlowState) # 定义节点 builder.add_node("ingest", process_input) builder.add_node("retrieve", retrieve_memories) builder.add_node("generate", generate_response) # 构建流程 builder.add_edge(START, "ingest") builder.add_edge("ingest", "retrieve") builder.add_edge("retrieve", "generate") builder.add_edge("generate", END) # 编译应用 app = builder.compile( checkpointer=saver, # 短期记忆 store=store # 长期记忆 )

6. 实战测试

启动对话测试:

# 第一次对话:表达偏好 result1 = app.invoke( {"user_id": "alice"}, config={"configurable": {"thread_id": "t1", "say": "我爱意大利披萨"}} ) # 第二次对话:利用记忆 result2 = app.invoke( {"user_id": "alice"}, config={"configurable": {"thread_id": "t2", "say": "推荐晚餐吃什么"}} ) print(result2["retrieved"]) # 输出: ["我爱意大利披萨"]

7. 高级优化技巧

7.1 混合检索策略

def hybrid_search(user_id, query): # 先尝试语义搜索 results = semantic_search(user_id, query) if not results: # 回退到关键词匹配 results = keyword_search(user_id, query) return results

7.2 记忆加权机制

def save_with_weight(user_id, key, value, weight=1.0): store.put( namespace=("user", user_id), key=key, value={**value, "_weight": weight}, metadata={"last_accessed": datetime.now()} )

7.3 定期记忆整理

def cleanup_memory(user_id): old_items = store.scan( namespace=("user", user_id), filter=lambda x: x.metadata.last_accessed < (NOW - timedelta(days=30)) ) for item in old_items: store.delete(namespace, item.key)

8. 生产环境建议

  1. 持久化方案:将InMemoryStore替换为Redis或PostgreSQL
  2. 嵌入模型:使用本地化模型如bge-small避免API依赖
  3. 监控指标
    • 记忆命中率
    • 检索延迟
    • 存储增长趋势
  4. 测试策略
    • 记忆一致性测试
    • 跨会话上下文保持测试
    • 压力测试:模拟长时间对话
# 监控装饰器示例 def monitor_memory(func): def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) latency = time.time() - start monitor.gauge("memory_op_latency", latency) return result return wrapper

这套实现不仅适用于披萨推荐场景,经过适当调整可应用于:

  • 个性化购物助手
  • 医疗咨询机器人
  • 教育领域的自适应学习系统

关键是要根据具体场景调整记忆的存储策略和检索方式。比如电商场景可能需要增加产品ID关联,而教育场景则需要知识点关联度计算。

http://www.jsqmd.com/news/646509/

相关文章:

  • Python与AKShare实战:构建A股板块轮动监测系统
  • 家庭宽带+旧电脑也能赚钱?手把手教你搭建24小时挂机副业
  • springboot酒店管理系统小程序(文档+源码)_kaic
  • TypeScript的infer推断联合类型的分布条件类型
  • 【多模态大模型容灾备份黄金标准】:20年AI基础设施专家亲授3层异构备份架构与RTO<2分钟实战方案
  • OpenModelica进阶技巧:如何导入第三方库并运行ExothermicReaction案例
  • 电子工程师必看:深度负反馈电路的5个实战应用技巧(附电路图)
  • 告别复杂操作!Win11 OpenClaw一键部署,本地AI自动干活,小白也能上手
  • Jellyfin Android TV客户端版本兼容性终极指南:如何解决连接失败问题
  • 射频工程师的脚本利器:如何用Matlab自动处理ADS仿真数据,优化双输入Doherty功放性能
  • 基于ECMS的混合动力汽车Simulink模型:能量管理研究之利器
  • SQL如何简化长SQL子查询结构_利用CTE公用表表达式优化
  • AI设计助手真能替代UI/UX设计师?2026奇点大会实测数据揭示人机协同临界点
  • AI爆火!产品经理的逆袭之路:掌握这5大技能,升职加薪不是梦!
  • 别再死记硬背了!用Java Socket写一个能翻译的UDP词典服务器(附完整源码)
  • OfflineInsiderEnroll:无需微软账户,Windows预览版体验终极方案
  • HGDB创建只读用户
  • 多模态LLM推理链路混沌实验全记录,深度复现跨模态对齐失效、特征坍缩与token洪水攻击
  • 从零搭建飞控仿真:手把手教你用Simulink实现姿态角速度到机体角速度的转换模块
  • GD32H7 SPI驱动实战:手把手教你用SPI3连接外部Flash(W25Q128)并实现读写
  • 2026奇点智能技术大会前瞻(全球仅8家获准接入的新闻生成API首次披露)
  • 2026年4月成都装修公司十大实力排行:口碑、工艺、环保与报价透明全维度深度测评解析 - 成都人评鉴
  • swoole的onConnect, onReceive, onClose 什么时候触发的庖丁解牛
  • MySQL8.0窗口函数实战:从基础语法到高级数据分析场景
  • 手把手教你用SHAP给Stacking模型“做体检”:两种可视化思路全解析(含Python避坑指南)
  • 云原生时代的可观测性平台构建与日志链路追踪
  • 从训练到上架:手把手完成一个Android端PaddleOCR v5移动识别应用
  • 别再手动调色了!用Matlab bar3和colormap实现数据高度自动赋色(附完整代码)
  • PX4飞控调试新思路:告别printf,用UART7串口打造你的专属调试信息通道
  • 生成式AI数据飞轮构建全链路拆解(从标注→反馈→迭代→跃迁的工业级路径)