当前位置: 首页 > news >正文

LangChain作业四---Memory 综合实战:构建具备短期 + 长期记忆的聊天机器人

Memory 综合实战:构建具备短期 + 长期记忆的聊天机器人

需求分解

核心功能:

  1. 多轮对话 + 短期记忆:使用 Redis 记住当前会话的上下文

  2. 自动记忆用户偏好:从对话中提取用户信息

  3. 个性化回复:从 PostgreSQL 读取长期画像,定制回复风格

  4. 任务连续性:跨会话保持重要信息

架构设计

存储职责划分:

存储

职责

数据类型

生命周期

Redis

短期记忆

对话消息列表

会话级,支持 TTL 过期

PostgreSQL

长期记忆

用户画像 JSON

永久,跨会话保留

环境准备

安装依赖:

# Redis 相关 uv add redis langchain-community # PostgreSQL 相关 uv add psycopg2-binary sqlalchemy # 或者使用 asyncpg(异步版本) # uv add asyncpg sqlalchemy[asyncio]

PostgreSQL 建表语句:

"C:\Program Files\PostgreSQL\18\bin\psql.exe" --version "C:\Program Files\PostgreSQL\18\bin\psql.exe" -U postgres
-- 创建用户画像表 CREATE TABLE IF NOT EXISTS user_profiles ( user_id VARCHAR(100) PRIMARY KEY, name VARCHAR(100), occupation VARCHAR(200), domain_knowledge JSONB DEFAULT '[]', current_project VARCHAR(500), preferences JSONB DEFAULT '{}', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 创建更新时间触发器 CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = CURRENT_TIMESTAMP; RETURN NEW; END; $$ language 'plpgsql'; CREATE TRIGGER update_user_profiles_updated_at BEFORE UPDATE ON user_profiles FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); -- 创建索引(可选,提高查询性能) CREATE INDEX idx_user_profiles_name ON user_profiles(name); CREATE INDEX idx_user_profiles_occupation ON user_profiles(occupation);

完整代码实现

""" Memory 综合实战:具备短期 + 长期记忆的聊天机器人 - 短期记忆:Redis(对话历史) - 长期记忆:PostgreSQL(用户画像) """ import os from datetime import datetime from typing import Optional from dotenv import load_dotenv # LangChain 相关 from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.output_parsers import JsonOutputParser, StrOutputParser from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_core.runnables import RunnableLambda from langchain_community.chat_message_histories import RedisChatMessageHistory from pydantic import BaseModel, Field # 数据库相关 import redis from sqlalchemy import create_engine, Column, String, DateTime, JSON, text from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy.engine import URL load_dotenv() # 1. 模型配置 llm = ChatOpenAI( model=os.getenv("DASHSCOPE_MODEL_NAME"), api_key=os.getenv("DASHSCOPE_API_KEY"), base_url=os.getenv("DASHSCOPE_BASE_URL"), temperature=0.3, ) # 用于信息抽取的模型(温度设为0以获得稳定输出) extract_llm = ChatOpenAI( model=os.getenv("DASHSCOPE_MODEL_NAME"), api_key=os.getenv("DASHSCOPE_API_KEY"), base_url=os.getenv("DASHSCOPE_BASE_URL"), temperature=0, ) # ==================== 2. 数据库连接配置 ==================== # Redis 连接 URL REDIS_URL = "redis://:{password}@{host}:{port}/{db}".format( password=os.getenv("REDIS_PASSWORD"), host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"), db=os.getenv("REDIS_DB", "15") ) # PostgreSQL 连接 URL(使用 URL.create 自动处理特殊字符,比如密码里的 @) pg_url_object = URL.create( drivername="postgresql+psycopg2", username=os.getenv("PG_USER"), password=os.getenv("PG_PASSWORD"), host=os.getenv("PG_HOST"), port=int(os.getenv("PG_PORT")), database=os.getenv("PG_DB"), ) # SQLAlchemy 引擎和会话 engine = create_engine(pg_url_object, echo=False) SessionLocal = sessionmaker(bind=engine) Base = declarative_base() # 3. 数据模型 # Pydantic 模型(用于业务逻辑) class UserPreferences(BaseModel): """用户偏好""" response_length: str = Field(default="medium") detail_level: str = Field(default="intermediate") language: str = Field(default="zh-CN") class UserProfile(BaseModel): """用户画像(Pydantic 模型)""" user_id: str name: Optional[str] = None occupation: Optional[str] = None domain_knowledge: list[str] = Field(default_factory=list) current_project: Optional[str] = None preferences: UserPreferences = Field(default_factory=UserPreferences) created_at: Optional[datetime] = None updated_at: Optional[datetime] = None class ExtractedInfo(BaseModel): """从对话中抽取的信息""" name: Optional[str] = None occupation: Optional[str] = None skills: list[str] = Field(default_factory=list) project: Optional[str] = None has_new_info: bool = False # SQLAlchemy 模型(用于数据库 ORM) class UserProfileDB(Base): """用户画像(数据库模型)""" __tablename__ = "user_profiles" user_id = Column(String(100), primary_key=True) name = Column(String(100), nullable=True) occupation = Column(String(200), nullable=True) domain_knowledge = Column(JSON, default=list) current_project = Column(String(500), nullable=True) preferences = Column(JSON, default=dict) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) # 创建表(如果不存在) Base.metadata.create_all(engine) # 4. PostgreSQL 用户画像存储 class PostgresUserProfileStore: """ 基于 PostgreSQL 的用户画像存储服务(长期记忆) """ def __init__(self): self.SessionLocal = SessionLocal def save(self, profile: UserProfile) -> None: """保存用户画像到 PostgreSQL""" with self.SessionLocal() as session: # 查找是否存在 db_profile = session.query(UserProfileDB).filter( UserProfileDB.user_id == profile.user_id ).first() if db_profile: # 更新现有记录 db_profile.name = profile.name db_profile.occupation = profile.occupation db_profile.domain_knowledge = profile.domain_knowledge db_profile.current_project = profile.current_project db_profile.preferences = profile.preferences.model_dump() else: # 创建新记录 db_profile = UserProfileDB( user_id=profile.user_id, name=profile.name, occupation=profile.occupation, domain_knowledge=profile.domain_knowledge, current_project=profile.current_project, preferences=profile.preferences.model_dump() ) session.add(db_profile) session.commit() def load(self, user_id: str) -> Optional[UserProfile]: """从 PostgreSQL 加载用户画像""" with self.SessionLocal() as session: db_profile = session.query(UserProfileDB).filter( UserProfileDB.user_id == user_id ).first() if db_profile is None: return None return UserProfile( user_id=db_profile.user_id, name=db_profile.name, occupation=db_profile.occupation, domain_knowledge=db_profile.domain_knowledge or [], current_project=db_profile.current_project, preferences=UserPreferences(**(db_profile.preferences or {})), created_at=db_profile.created_at, updated_at=db_profile.updated_at ) def get_or_create(self, user_id: str) -> UserProfile: """获取用户画像,不存在则创建""" profile = self.load(user_id) if profile is None: profile = UserProfile(user_id=user_id) self.save(profile) return profile def delete(self, user_id: str) -> bool: """删除用户画像""" with self.SessionLocal() as session: result = session.query(UserProfileDB).filter( UserProfileDB.user_id == user_id ).delete() session.commit() return result > 0 def list_all_users(self) -> list[str]: """列出所有用户 ID""" with self.SessionLocal() as session: results = session.query(UserProfileDB.user_id).all() return [r[0] for r in results] # 5. Redis 短期记忆管理 class RedisSessionManager: """ 基于 Redis 的会话管理器(短期记忆) 使用 LangChain 内置的 RedisChatMessageHistory """ def __init__(self, ttl: int = 3600): """ Args: ttl: 会话过期时间(秒),默认1小时 """ self.redis_url = REDIS_URL self.ttl = ttl def get_history(self, session_id: str) -> RedisChatMessageHistory: """ 获取会话的消息历史 Args: session_id: 会话 ID(格式建议:user_id:session_id) Returns: RedisChatMessageHistory 实例 """ return RedisChatMessageHistory( session_id=session_id, url=self.redis_url, ttl=self.ttl ) def clear_session(self, session_id: str) -> None: """清空指定会话的历史""" history = self.get_history(session_id) history.clear() # 6. 信息抽取器 class InfoExtractor: """从对话中抽取用户信息""" def __init__(self, llm): self.prompt = ChatPromptTemplate.from_messages([ ("system", """从用户消息中提取以下信息(仅提取明确提到的,不要推测): - name: 用户名字 - occupation: 职业 - skills: 技能列表 - project: 当前项目 - has_new_info: 是否包含新信息(如果消息中有任何上述信息则为 true) 输出 JSON 格式,未提到的字段为 null 或空列表。"""), ("human", "{message}") ]) self.chain = self.prompt | llm | JsonOutputParser() def extract(self, message: str) -> ExtractedInfo: """从消息中抽取用户信息""" try: result = self.chain.invoke({"message": message}) return ExtractedInfo(**result) except Exception as e: print(f"信息抽取失败: {e}") return ExtractedInfo() # 7. 聊天机器人主类 class MemoryBot: """ 具备短期 + 长期记忆的聊天机器人 - 短期记忆(Redis):存储当前会话的对话历史 - 长期记忆(PostgreSQL):存储用户画像,跨会话保留 """ def __init__(self, session_ttl: int = 3600): """ Args: session_ttl: 会话过期时间(秒),默认1小时 """ # 存储服务 self.profile_store = PostgresUserProfileStore() # 长期记忆 self.session_manager = RedisSessionManager(ttl=session_ttl) # 短期记忆 self.extractor = InfoExtractor(extract_llm) # 主对话 Prompt self.chat_prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个智能助手,具备记忆能力。 ## 用户画像(长期记忆) {user_profile} ## 注意事项 - 根据用户背景调整回复风格 - 使用用户熟悉的技术举例 - 保持自然的对话风格 - 记住当前对话的上下文"""), MessagesPlaceholder(variable_name="history"), # 短期记忆 ("human", "{input}"), ]) self.chat_chain = self.chat_prompt | llm | StrOutputParser() def _format_profile(self, profile: UserProfile) -> str: """将用户画像格式化为文本""" parts = [] if profile.name: parts.append(f"- 姓名: {profile.name}") if profile.occupation: parts.append(f"- 职业: {profile.occupation}") if profile.domain_knowledge: parts.append(f"- 技能: {', '.join(profile.domain_knowledge)}") if profile.current_project: parts.append(f"- 当前项目: {profile.current_project}") return "\n".join(parts) if parts else "暂无用户信息" def _update_profile(self, user_id: str, message: str) -> None: """从消息中抽取信息并更新长期画像""" extracted = self.extractor.extract(message) if not extracted.has_new_info: return # 从 PostgreSQL 获取现有画像 profile = self.profile_store.get_or_create(user_id) # 更新字段 if extracted.name: profile.name = extracted.name if extracted.occupation: profile.occupation = extracted.occupation if extracted.skills: # 合并技能(去重) profile.domain_knowledge = list(set( profile.domain_knowledge + extracted.skills )) if extracted.project: profile.current_project = extracted.project # 保存到 PostgreSQL self.profile_store.save(profile) def chat(self, user_id: str, session_id: str, message: str) -> str: """ 进行对话 Args: user_id: 用户 ID(用于长期画像) session_id: 会话 ID(用于短期记忆) message: 用户消息 Returns: AI 回复 """ # 1. 异步更新用户画像(长期记忆) self._update_profile(user_id, message) # 2. 从 PostgreSQL 获取用户画像 profile = self.profile_store.get_or_create(user_id) profile_text = self._format_profile(profile) # 3. 注入用户画像到 Prompt def inject_profile(inputs: dict) -> dict: return {**inputs, "user_profile": profile_text} chain_with_profile = RunnableLambda(inject_profile) | self.chat_chain # 4. 包装 Redis 短期记忆 chain_with_history = RunnableWithMessageHistory( chain_with_profile, lambda sid: self.session_manager.get_history(sid), input_messages_key="input", history_messages_key="history", ) # 5. 调用并返回 config = {"configurable": {"session_id": session_id}} response = chain_with_history.invoke({"input": message}, config=config) return response def get_profile(self, user_id: str) -> Optional[UserProfile]: """获取用户画像""" return self.profile_store.load(user_id) def clear_session(self, session_id: str) -> None: """清空会话历史""" self.session_manager.clear_session(session_id) # 8. 使用示例 def main(): """主函数 - 演示聊天机器人""" # 创建机器人(会话1小时后过期) bot = MemoryBot(session_ttl=3600) user_id = "user_001" session_id = f"{user_id}:session_001" # 建议格式:user_id:session_id print("=" * 60) print("Memory Bot - 短期记忆(Redis) + 长期记忆(PostgreSQL)") print("=" * 60) # 模拟多轮对话 conversations = [ "你好,我叫小明", "我是一名Python后端开发工程师", "我正在做一个电商推荐系统,用的是FastAPI和PostgreSQL", "我之前说我叫什么名字?做什么工作?", "帮我写一段代码,实现用户登录的API", ] for msg in conversations: print(f"\n用户: {msg}") response = bot.chat(user_id, session_id, msg) print(f"AI: {response}") print("-" * 40) # 显示最终用户画像(从 PostgreSQL 读取) print("\n" + "=" * 60) print("最终用户画像(PostgreSQL):") profile = bot.get_profile(user_id) if profile: print(profile.model_dump_json(indent=2)) # 演示跨会话记忆 print("\n" + "=" * 60) print("模拟新会话(测试长期记忆):") new_session_id = f"{user_id}:session_002" response = bot.chat(user_id, new_session_id, "你还记得我是谁吗?我在做什么项目?") print(f"用户: 你还记得我是谁吗?我在做什么项目?") print(f"AI: {response}") if __name__ == "__main__": main()
http://www.jsqmd.com/news/1075401/

相关文章:

  • ANTM股票可视化:Plotly交互+Mplfinance专业K线实战
  • LG Ultrafine 亮度调节工具:解决Windows下显示器亮度控制的智能方案
  • FIFA 23 Live Editor终极指南:打造你的完美足球世界
  • 5大核心功能深度解析:G-Helper如何让华硕笔记本性能飙升
  • 深度解析猫抓浏览器扩展:从M3U8嗅探到加密流处理的10个核心技术
  • 负责任AI工程落地:六个可编码的实践维度
  • 10104黄大年茶思屋榜文101期 第4题 大模型上下文窗口高效无损扩容技术
  • 零基础学AI人工智能:10.3 ANN人工神经网络
  • iOS安全测试框架Needle:自动化漏洞挖掘与移动应用安全评估实战指南
  • 终极AI视频插值指南:使用Flowframes轻松提升视频帧率的完整教程
  • 小红书广告视频记录
  • 遗传算法实操避坑指南:实数编码、自适应变异与精英保留
  • 量子密码分析研究
  • FPGA数据流编程与HLS优化实战指南
  • 告别打卡焦虑:5分钟掌握Android自动打卡终极方案
  • 架构设计理念与核心哲学
  • MetaboAnalystR 4.3.0架构解析:500+函数构建的代谢组学分析技术框架
  • 2026 年易柯森特:北京民营企业借工程监理优化施工管理
  • 终极指南:689款开源macOS应用全收录,打造你的专属生产力工具箱!
  • 5大核心优势:为什么LibreSignage是中小型场所数字标牌的最佳选择
  • 注塑模与冲压模
  • 当手机里的待办事项堆积如山——我在 HarmonyOS 上给列表装了个多选删除功能
  • 5分钟搞定Linux启动盘制作:Deepin Boot Maker终极指南
  • 5分钟掌握Android台球辅助神器:精准瞄准终极指南
  • 3分钟掌握Obsidian Excel表格转换:终极Markdown表格解决方案
  • 如何利用开源工具高效绕过iOS 15-16激活锁:专业解决方案指南
  • 一、前置环境校验
  • C++ NRVO
  • Mac NTFS读写终极方案:3分钟免费解决跨平台文件传输难题
  • PostgreSQL PERCENT_RANK() 窗口函数完全解析