当前位置: 首页 > news >正文

AIAgent

AIAgent 是一个 AI 代理类,用于处理企业微信消息。

from deepagents import create_deep_agent from langchain_mcp_adapters.tools import load_mcp_tools from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage from mcp.client.session import ClientSession from mcp.client.streamable_http import streamable_http_client from pkg.config import cfg from pkg.log import get_logger class AIAgent: logger = get_logger("ai_agent") def __init__(self): self.model: ChatOpenAI = None # type: ignore self._mcp_session: ClientSession = None # type: ignore self._mcp_server_url = f"http://127.0.0.1:{cfg.service_port}/mcp/" async def start(self): if not self.model: self.model = ChatOpenAI( base_url=cfg.agent_base_url, api_key=cfg.agent_api_key, # type: ignore model=cfg.agent_model, ) async def shutdown(self): self.model = None # type: ignore async def _create_root_agent(self, session: ClientSession): tools = await load_mcp_tools(session) root_agent = create_deep_agent( model=self.model, tools=tools, system_prompt=f"你是一个智能助手,名字叫{cfg.qywx_bot_name}, 可以协助用户处理各种问题,并用温和积极的语气回答问题。回答的格式应该符合markdown规范。", ) return root_agent async def astream(self, input: str, thread_id: str = ""): self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}") async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id): async with ClientSession(read, write) as session: await session.initialize() root_agent = await self._create_root_agent(session) async for chunk in root_agent.astream( input={"messages": [HumanMessage(content=input)]} ): # 从 chunk 字典中提取 AIMessage 的 content if isinstance(chunk, dict): messages = chunk.get("model", {}).get("messages", []) if not messages: continue for msg in messages: if hasattr(msg, "content") and msg.content: yield str(msg.content) elif hasattr(chunk, "content"): yield str(chunk.content) # type: ignore else: yield str(chunk) async def ainvoke(self, input: str, thread_id: str = ""): self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}") async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id): async with ClientSession(read, write) as session: await session.initialize() root_agent = await self._create_root_agent(session) resp = await root_agent.ainvoke( input={"messages": [HumanMessage(content=input)]} ) return resp["messages"][-1].content

main

# main.py from fastapi import FastAPI, Request from fastapi.responses import JSONResponse import uvicorn from contextlib import asynccontextmanager from pkg.qywx import qywx_client from pkg.config import cfg from ai_agent.mcp_servers.datetime_server import mcp as datetime_mcp from ai_agent import aiops @asynccontextmanager async def lifespan(app: FastAPI): await aiops.start() await qywx_client.start() # 先获取 MCP app(这会创建 session_manager) mcp_app = datetime_mcp.streamable_http_app() # 在 FastAPI lifespan 中启动 MCP session manager async with datetime_mcp.session_manager.run(): # 挂载 MCP app app.mount("/mcp", mcp_app) yield await aiops.shutdown() await qywx_client.shutdown() app = FastAPI( lifespan=lifespan ) if __name__ == "__main__": uvicorn.run("main:app", host=cfg.service_host, port=cfg.service_port)
http://www.jsqmd.com/news/1093821/

相关文章:

  • @ConditionalOnProperty 注解功能和使用场景说明完整示例演示
  • TI Fuel Tank MKII电池扩展板:为LaunchPad打造智能移动电源解决方案
  • 农机制动性能检测仪设计方案
  • k6性能测试实战指南:从入门到企业级应用
  • 当AI编程工具开始“挑网络”:Anthropic封禁第三方调用背后,开发者的网络出口为何成为关键变量
  • 构建自主可控的Web安全防线:ModSecurity与OWASP CRS集成实战指南
  • 从“被动响应”到“主动行动”的架构革命
  • BLE Link Layer【Bit Ordering】:为什么 b0 b1 b2 = 110 表示 3,而不是 6?
  • Claude 3.5 Sonnet技术解析:Tool Use与推理可视化实战
  • 计算机毕业设计之 基于深度学习的航空机票价格预测软件实现
  • 扬州清宸康养180道菜不重样?真相究竟是怎样,快来一探究竟!
  • minimax m3实际使用体验
  • 好用的会务软件怎么选?会助力智能会务系统打造一站式会议平台
  • 实力强的降英文AI工具工具
  • ModelEngine QA对生成技术:如何实现60%留用率的高质量训练数据
  • OpenMontage全链路AI视频生成实战:从流程编排到工程化落地
  • springCloud集成seata2.x
  • 12.DTS中增加GPIO信息
  • 视频台词停顿太多?一键自动去除空白间隙
  • K8s 多 Master 重启:流程梳理与问题排查
  • 做了一个月Skills,我才理解Agent可靠性的本质
  • 三、Prometheus安装和配置node-exporter服务
  • LED 隧道灯老旧改造工程 功率测算与施工核验技术规范
  • OpenMontage全链路AI视频生成:从环境部署到生产实践指南
  • 好用的检测机DD马达哪家靠谱
  • 《VMware 安装 Ubuntu Linux 全过程 + 排错总结》
  • GaussDB(DWS)数据仓库性能压测与调优实战:从0到1全记录
  • 【从0到1构建一个ClaudeAg _
  • 为什么建议中小企业优先考虑开源ERP
  • 终极SPT-AKI存档编辑器:5分钟掌握逃离塔科夫离线版完整修改指南