Agent开发10个常见陷阱及避免方法(血泪总结)
系列:问题解决方案库(2/5)
难度:⭐⭐⭐ 中级
预计阅读时间:12分钟
适用人群:正在开发Agent的开发者
经验来源:实际项目中的踩坑记录
引言
开发AI Agent听起来很酷,但实际操作中会遇到很多意想不到的问题。
经过多个Agent项目的实战,我总结了10个最常见的陷阱。
如果你正在或准备开发Agent,这篇文章能帮你避开这些坑!
陷阱1:工具定义不规范
❌ 错误做法
from langchain.tools import Tool # 工具描述太简单 tool = Tool( name="search", func=search_web, description="搜索" # ← 太模糊 )🔍 问题分析
Agent依赖工具描述来决定:
- 何时调用这个工具
- 传入什么参数
- 期望得到什么结果
描述不清会导致:
- Agent不调用工具
- 传入错误参数
- 无限循环调用
✅ 正确做法
from langchain.tools import Tool from typing import Optional def search_web(query: str, num_results: int = 5) -> str: """ 从互联网搜索相关信息 Args: query: 搜索关键词,应该简洁明确 num_results: 返回结果数量,默认5条 Returns: 搜索结果摘要,每条包含标题和简介 """ # 实现搜索逻辑 results = perform_search(query, num_results) return format_results(results) tool = Tool( name="WebSearch", func=search_web, description=( "当需要获取实时信息、新闻、或知识库中没有的内容时使用。" "输入应为简洁的搜索关键词,如'Python最新版本'。" "返回最多5条相关结果的摘要。" "注意:不要用于数学计算或逻辑推理。" ) )最佳实践:
- 工具名称使用驼峰命名(WebSearch而非web_search)
- 描述包含:使用时机、输入格式、输出格式、限制条件
- 添加类型提示和文档字符串
- 提供使用示例
陷阱2:Prompt设计不当
❌ 错误做法
prompt = """ 你是一个助手。回答问题。 """🔍 问题分析
- 角色定义模糊
- 没有行为约束
- 缺少输出格式要求
- 容易受到注入攻击
✅ 正确做法
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder system_prompt = """ 你是一个专业的投资研究助手,专注于A股市场分析。 【能力范围】 - 分析股票基本面数据 - 解读财务报表 - 提供投资建议(仅供参考) 【行为约束】 - 不提供具体的买卖建议 - 不预测短期股价走势 - 所有分析基于公开数据 - 遇到不确定信息时明确说明 【输出格式】 1. 先给出结论(1-2句话) 2. 再列出关键数据支撑 3. 最后说明风险提示 【注意事项】 - 使用专业但易懂的语言 - 数据要注明来源和时间 - 保持客观中立 """ prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), MessagesPlaceholder(variable_name="history"), ("human", "{question}"), ])Prompt设计原则:
- 明确角色:你是谁,做什么的
- 划定边界:能做什么,不能做什么
- 规范输出:格式、风格、长度
- 防御注入:防止用户绕过限制
陷阱3:错误处理缺失
❌ 错误做法
def get_stock_price(symbol: str) -> float: """获取股票价格""" response = requests.get(f"https://api.example.com/price/{symbol}") return response.json()["price"] # ← 没有任何错误处理🔍 问题分析
- API可能超时
- 返回格式可能变化
- 网络可能中断
- Agent会崩溃或返回错误信息给用户
✅ 正确做法
from typing import Optional import requests from requests.exceptions import RequestException, Timeout def get_stock_price(symbol: str, timeout: int = 10) -> Optional[str]: """ 获取股票最新价格 Args: symbol: 股票代码,如 '000001.SZ' timeout: 超时时间(秒) Returns: 价格字符串,失败时返回错误信息 """ try: response = requests.get( f"https://api.example.com/price/{symbol}", timeout=timeout ) response.raise_for_status() data = response.json() # 验证返回数据 if "price" not in data: return f"错误:API返回格式异常,缺少price字段" price = data["price"] return f"{symbol} 最新价格:{price}元" except Timeout: return f"错误:请求超时,请稍后重试" except RequestException as e: return f"错误:网络请求失败 - {str(e)}" except Exception as e: return f"错误:未知错误 - {str(e)}" # 在Agent中使用 tool = Tool( name="StockPrice", func=get_stock_price, description="获取股票最新价格。输入股票代码,返回价格或错误信息。" )错误处理要点:
- 捕获所有可能的异常
- 返回用户友好的错误信息
- 记录详细日志(用于调试)
- 设置合理的超时时间
- 考虑重试机制
陷阱4:记忆管理混乱
❌ 错误做法
from langchain.memory import ConversationBufferMemory # 每次创建新的Memory对象 def chat(question: str): memory = ConversationBufferMemory() # ← 每次都新建 chain = ConversationChain(llm=llm, memory=memory) return chain.run(question) # 结果:每轮对话都是独立的,没有上下文🔍 问题分析
- Memory对象生命周期错误
- 多用户会话混淆
- 内存泄漏(长期运行)
- 无法持久化
✅ 正确做法
from langchain.memory import ConversationBufferWindowMemory from langchain.chains import ConversationChain class AgentSession: """Agent会话管理器""" def __init__(self, session_id: str, max_history: int = 10): self.session_id = session_id self.memory = ConversationBufferWindowMemory( k=max_history, # 只保留最近10轮 return_messages=True ) self.chain = ConversationChain( llm=llm, memory=self.memory, verbose=False ) def chat(self, question: str) -> str: """处理一轮对话""" response = self.chain.run(input=question) return response def clear_memory(self): """清空记忆""" self.memory.clear() def get_summary(self) -> str: """获取对话摘要""" return self.memory.buffer # 使用示例 session = AgentSession(session_id="user_123") response1 = session.chat("我叫小明") response2 = session.chat("我今年25岁") response3 = session.chat("我叫什么名字?") # AI会记得进阶:持久化存储
import redis from langchain.memory import RedisChatMessageHistory class PersistentAgentSession: """支持持久化的Agent会话""" def __init__(self, session_id: str, redis_url: str = "redis://localhost:6379"): self.session_id = session_id # 使用Redis存储历史 self.history = RedisChatMessageHistory( session_id=session_id, url=redis_url ) self.memory = ConversationBufferWindowMemory( chat_memory=self.history, k=20, # 保留最近20轮 return_messages=True ) self.chain = ConversationChain( llm=llm, memory=self.memory )记忆管理最佳实践:
- 限制历史长度(避免上下文过长)
- 为每个用户创建独立会话
- 定期清理过期会话
- 重要信息持久化存储
- 提供清空记忆的接口
陷阱5:工具调用死循环
❌ 错误现象
Agent: 我需要查询天气 → 调用Weather工具 Tool: 返回"北京今天晴" Agent: 我需要确认天气 → 再次调用Weather工具 Tool: 返回"北京今天晴" Agent: 我需要再次确认 → 又调用Weather工具 ...(无限循环)🔍 问题分析
- Agent没有理解工具返回的结果
- Prompt没有明确的停止条件
- 缺乏最大迭代次数限制
✅ 正确做法
from langchain.agents import AgentExecutor, create_react_agent # 方法1:设置最大迭代次数 agent_executor = AgentExecutor( agent=agent, tools=tools, max_iterations=5, # ← 最多迭代5次 handle_parsing_errors=True, verbose=True ) # 方法2:在System Prompt中明确规则 system_prompt = """ 你是一个有用的助手。 【重要规则】 1. 如果工具已经返回了答案,直接使用它回答用户 2. 不要重复调用同一个工具超过2次 3. 如果无法获取信息,诚实地告诉用户 4. 最多进行3轮工具调用,然后必须给出最终答案 【工作流程】 思考 → 行动 → 观察 → 思考 → ... → 最终答案 """ # 方法3:自定义回调监控 from langchain.callbacks import BaseCallbackHandler class LoopDetectionHandler(BaseCallbackHandler): """检测并阻止死循环""" def __init__(self, max_same_tool_calls: int = 2): self.tool_call_count = {} self.max_same_tool_calls = max_same_tool_calls def on_tool_start(self, serialized, input_str, **kwargs): tool_name = serialized['name'] self.tool_call_count[tool_name] = self.tool_call_count.get(tool_name, 0) + 1 if self.tool_call_count[tool_name] > self.max_same_tool_calls: raise Exception( f"检测到死循环:工具 '{tool_name}' 已被调用 " f"{self.tool_call_count[tool_name]} 次" ) agent_executor = AgentExecutor( agent=agent, tools=tools, callbacks=[LoopDetectionHandler(max_same_tool_calls=2)] )陷阱6:返回值格式错误
❌ 错误做法
def calculate(expression: str): """计算数学表达式""" result = eval(expression) return result # ← 返回数字类型 # Agent期望字符串,收到数字后解析失败🔍 问题分析
- LangChain Agent期望工具返回字符串
- 返回其他类型会导致解析错误
- 格式不一致影响Agent理解
✅ 正确做法
def calculate(expression: str) -> str: """ 计算数学表达式 Args: expression: 数学表达式,如 '2 + 2' 或 '3 * (4 + 5)' Returns: 计算结果的字符串表示 """ try: # 安全计算(禁用危险函数) result = eval(expression, {"__builtins__": {}}, {}) return f"计算结果:{result}" # ← 始终返回字符串 except Exception as e: return f"计算错误:{str(e)}" # 更复杂的返回格式 def search_news(keyword: str) -> str: """搜索新闻""" articles = fetch_news(keyword) if not articles: return f"未找到关于'{keyword}'的新闻" # 格式化输出 output = f"找到 {len(articles)} 条相关新闻:\n\n" for i, article in enumerate(articles[:5], 1): output += f"{i}. {article['title']}\n" output += f" {article['summary']}\n" output += f" 来源:{article['source']} | " output += f"时间:{article['date']}\n\n" return output返回值格式规范:
- 始终返回字符串
- 结构化输出(便于Agent解析)
- 包含必要的上下文信息
- 错误时返回清晰的错误消息
- 避免返回JSON(除非必要)
陷阱7:并发处理问题
❌ 错误做法
# 多个用户同时访问 @app.post("/chat") def chat(request: ChatRequest): session = get_session(request.user_id) return session.chat(request.question) # 问题:共享状态导致数据混乱🔍 问题分析
- 全局变量被多线程修改
- 会话状态冲突
- 资源竞争
- 数据泄露风险
✅ 正确做法
from fastapi import FastAPI from threading import Lock app = FastAPI() # 线程安全的会话管理 class SessionManager: def __init__(self): self.sessions = {} self.lock = Lock() def get_session(self, user_id: str) -> AgentSession: with self.lock: if user_id not in self.sessions: self.sessions[user_id] = AgentSession(user_id) return self.sessions[user_id] def cleanup_expired(self): """清理过期会话""" with self.lock: # 实现清理逻辑 pass session_manager = SessionManager() @app.post("/chat") async def chat(request: ChatRequest): session = session_manager.get_session(request.user_id) response = await session.chat_async(request.question) return {"response": response} # 或使用异步锁 import asyncio class AsyncSessionManager: def __init__(self): self.sessions = {} self.lock = asyncio.Lock() async def get_session(self, user_id: str): async with self.lock: if user_id not in self.sessions: self.sessions[user_id] = AsyncAgentSession(user_id) return self.sessions[user_id]并发安全要点:
- 使用锁保护共享状态
- 每个用户独立会话
- 异步环境使用asyncio.Lock
- 定期清理过期资源
- 考虑使用Redis等外部存储
陷阱8:安全性考虑不足
❌ 错误做法
# 直接执行用户输入 def execute_code(code: str): return eval(code) # ← 严重安全漏洞! # 或未验证的工具参数 def send_email(to: str, content: str): send(to, content) # ← 可能被用于垃圾邮件🔍 问题分析
- 代码注入攻击
- 敏感信息泄露
- 滥用工具造成损失
- 违反合规要求
✅ 正确做法
import re from typing import List # 1. 输入验证 def validate_email_address(email: str) -> bool: """验证邮箱地址格式""" pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(pattern, email)) def send_email_tool(to: str, subject: str, content: str) -> str: """发送邮件(带安全检查)""" # 验证邮箱格式 if not validate_email_address(to): return "错误:无效的邮箱地址" # 检查内容长度 if len(content) > 5000: return "错误:邮件内容过长(最多5000字符)" # 过滤敏感词 sensitive_words = ["密码", "银行卡", "身份证"] for word in sensitive_words: if word in content: return "错误:邮件内容包含敏感信息" # 记录日志 log_email_sent(to, subject) # 发送邮件 try: actual_send_email(to, subject, content) return f"邮件已发送至 {to}" except Exception as e: return f"发送失败:{str(e)}" # 2. 权限控制 class ToolPermission: """工具权限管理""" def __init__(self): self.permissions = { "admin": ["all"], "user": ["search", "calculate", "query"], "guest": ["search"] } def check_permission(self, user_role: str, tool_name: str) -> bool: allowed = self.permissions.get(user_role, []) return tool_name in allowed or "all" in allowed # 3. 速率限制 from functools import wraps import time def rate_limit(max_calls: int, period: int = 60): """限制调用频率""" calls = [] def decorator(func): @wraps(func) def wrapper(*args, **kwargs): now = time.time() calls.append(now) # 移除过期的调用记录 while calls and calls[0] < now - period: calls.pop(0) if len(calls) > max_calls: return "错误:调用频率过高,请稍后重试" return func(*args, **kwargs) return wrapper return decorator @rate_limit(max_calls=10, period=60) # 每分钟最多10次 def search_tool(query: str): # ...安全最佳实践:
- 永远不要直接执行用户输入
- 所有输入都要验证和清洗
- 实施权限控制和速率限制
- 记录所有敏感操作日志
- 定期安全审计
陷阱9:性能瓶颈
❌ 错误做法
# 每次都重新加载模型 def chat(question: str): llm = load_llm() # ← 耗时操作 return llm.invoke(question) # 或串行处理多个请求 for question in questions: result = agent.run(question) # ← 一个接一个🔍 问题分析
- 重复初始化开销大
- 串行处理效率低
- 未使用缓存
- 响应时间长
✅ 正确做法
# 1. 单例模式(复用LLM实例) class LLMSingleton: _instance = None @classmethod def get_instance(cls): if cls._instance is None: cls._instance = ChatOpenAI(model="gpt-3.5-turbo") return cls._instance llm = LLMSingleton.get_instance() # 2. 缓存机制 from functools import lru_cache import hashlib @lru_cache(maxsize=1000) def cached_chat(question: str) -> str: """缓存常见问题""" return llm.invoke(question).content def smart_chat(question: str) -> str: """智能缓存""" # 生成问题指纹 fingerprint = hashlib.md5(question.encode()).hexdigest() # 检查缓存 if fingerprint in cache: return cache[fingerprint] # 调用LLM response = llm.invoke(question).content # 存入缓存 cache[fingerprint] = response return response # 3. 批量处理 import asyncio async def batch_process(questions: List[str]) -> List[str]: """并行处理多个问题""" tasks = [agent.ainvoke({"input": q}) for q in questions] results = await asyncio.gather(*tasks) return [r["output"] for r in results] # 4. 流式输出(提升用户体验) async def stream_chat(question: str): """流式返回结果""" async for chunk in llm.astream(question): yield chunk.content # FastAPI中使用 from fastapi.responses import StreamingResponse @app.post("/chat/stream") async def stream_chat_endpoint(request: ChatRequest): return StreamingResponse( stream_chat(request.question), media_type="text/plain" )性能优化要点:
- 复用LLM实例(避免重复加载)
- 缓存常见问题
- 批量并行处理
- 流式输出改善体验
- 监控性能指标
陷阱10:测试覆盖不全
❌ 错误做法
# 只测试正常流程 def test_agent(): result = agent.run("你好") assert "你好" in result🔍 问题分析
- 未测试边界情况
- 未测试错误处理
- 未测试并发场景
- 上线后才发现bug
✅ 正确做法
import pytest from unittest.mock import Mock, patch class TestAgent: """Agent全面测试""" def setup_method(self): """每个测试前的准备""" self.agent = create_test_agent() # 1. 正常功能测试 def test_basic_chat(self): """测试基本对话""" result = self.agent.run("你好") assert result is not None assert len(result) > 0 # 2. 边界情况测试 def test_empty_input(self): """测试空输入""" result = self.agent.run("") assert "请输入" in result or len(result) > 0 def test_very_long_input(self): """测试超长输入""" long_text = "测试" * 10000 result = self.agent.run(long_text) assert result is not None # 3. 错误处理测试 def test_tool_failure(self): """测试工具失败""" with patch('tools.search_web', side_effect=Exception("网络错误")): result = self.agent.run("搜索Python教程") assert "错误" in result or "失败" in result def test_timeout(self): """测试超时处理""" with patch('llm.invoke', side_effect=TimeoutError()): result = self.agent.run("测试") assert "超时" in result or "稍后" in result # 4. 并发测试 def test_concurrent_requests(self): """测试并发请求""" import concurrent.futures def chat_request(i): return self.agent.run(f"问题{i}") with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(chat_request, i) for i in range(20)] results = [f.result() for f in futures] assert len(results) == 20 assert all(r is not None for r in results) # 5. 集成测试 def test_full_workflow(self): """测试完整工作流""" # 模拟真实用户交互 conversation = [ "我想了解人工智能", "能详细介绍一下吗?", "有哪些应用场景?", "谢谢你的帮助" ] session = AgentSession("test_user") responses = [] for msg in conversation: response = session.chat(msg) responses.append(response) assert response is not None # 验证对话连贯性 assert len(responses) == len(conversation) # 6. 性能测试 def test_response_time(self): """测试响应时间""" import time start = time.time() result = self.agent.run("简单问题") elapsed = time.time() - start assert elapsed < 5.0 # 响应时间应小于5秒 print(f"响应时间:{elapsed:.2f}秒") # 运行测试 if __name__ == "__main__": pytest.main([__file__, "-v"])测试覆盖清单:
- [ ] 正常功能测试
- [ ] 边界情况测试(空输入、超长输入)
- [ ] 错误处理测试(网络错误、超时)
- [ ] 并发测试(多用户同时访问)
- [ ] 集成测试(完整工作流)
- [ ] 性能测试(响应时间、吞吐量)
- [ ] 安全测试(注入攻击、权限绕过)
总结
| 陷阱 | 严重程度 | 解决难度 | 关键措施 |
|---|---|---|---|
| 工具定义不规范 | ⭐⭐⭐ | 易 | 详细描述+示例 |
| Prompt设计不当 | ⭐⭐⭐⭐ | 中 | 明确角色+边界 |
| 错误处理缺失 | ⭐⭐⭐⭐⭐ | 易 | 全面异常捕获 |
| 记忆管理混乱 | ⭐⭐⭐⭐ | 中 | 会话隔离+持久化 |
| 工具调用死循环 | ⭐⭐⭐⭐⭐ | 中 | 迭代限制+监控 |
| 返回值格式错误 | ⭐⭐⭐ | 易 | 统一返回字符串 |
| 并发处理问题 | ⭐⭐⭐⭐⭐ | 难 | 锁+独立会话 |
| 安全性不足 | ⭐⭐⭐⭐⭐ | 难 | 输入验证+权限 |
| 性能瓶颈 | ⭐⭐⭐ | 中 | 缓存+并行 |
| 测试不全 | ⭐⭐⭐⭐ | 中 | 全面测试覆盖 |
🤔 互动环节
投票:你遇到过哪些陷阱?
- [ ] 工具调用死循环
- [ ] 记忆管理问题
- [ ] 并发安全问题
- [ ] 其他(留言说明)
留言:你还遇到过哪些Agent开发的坑?留言分享,帮助更多人避坑!
预告:下一篇《向量检索优化技巧》,敬请期待!
相关链接
- LangChain官方文档
- GitHub项目
