智能搜索系统流程分析文档
📊 系统架构总览
graph TBA[前端客户端] --> B[FastAPI服务器]B --> C{API路由分发}C --> D[3. 任务执行]C --> E[2. 文件上传]C --> F[8a. 文件列表]C --> G[8b. 文件下载]C --> H[7. WebSocket]D --> I[异步任务执行]I --> J[4. 主智能体]J --> K{决策}K --> L[工具调用]K --> M[子智能体]K --> N[结果输出]L --> O[生成Markdown]L --> P[转PDF]L --> Q[读取文件]M --> R[5a. 网络搜索]M --> S[5b. 数据库查询]M --> T[5c. RAGFlow]R --> U[Tavily搜索]S --> V[SQL工具]T --> W[RAGFlow工具]J --> X[6. Monitor监控]X --> Y[WebSocket推送]Y --> A
🔄 流程说明
1️⃣ 服务启动
入口: api/server.py
流程:
- 初始化 FastAPI + CORS
- 创建
output/和updated/目录 - 绑定事件循环到 WebSocket 管理器
- 初始化主智能体
- 启动 Uvicorn (端口 8000)
2️⃣ 文件上传
接口: POST /api/upload
流程:
- 接收文件和
thread_id - 创建目录:
updated/session_{thread_id}/ - 流式写入文件
- 返回文件列表
特性: 多格式支持、流式写入、会话隔离
3️⃣ 任务执行
接口: POST /api/task
执行链路:
请求 → 获取thread_id → 异步启动 → 创建工作目录 → 复制上传文件
→ 设置上下文 → 推送目录 → 构建提示词 → 流式执行
→ 实时监控 → 推送结果 → 清理资源
核心逻辑:
- 创建会话目录:
output/session_{session_id}/ - 复制上传文件到工作目录
- 设置 ContextVars (线程安全)
- 流式执行
main_agent.astream()
4️⃣ 主智能体
角色: 团队协调者 (配置文件: prompt/prompts.yml)
职责: 任务分解、子智能体调度、文件生成管理
决策类型:
| 类型 | 工具 | 功能 |
|---|---|---|
| 工具调用 | generate_markdown | 生成 Markdown |
| convert_md_to_pdf | MD 转 PDF | |
| read_file_content | 读取多格式文件 | |
| 子智能体 | task | 分发专业任务 |
| 结果输出 | - | 返回最终结果 |
规则: 先搜索后生成、禁止占位符、内容≥1000字
5️⃣ 子智能体
5a. 网络搜索 (Tavily)
- 工具:
internet_search - 策略: ≥3个角度,最多5次检索,由浅入深
- 场景: 公开信息、行业知识、市场数据
5b. 数据库查询 (MySQL)
- 工具:
list_sql_tables→get_table_data→execute_sql_query - 内容: 药品信息、库存、销售数据
- 流程: 列表 → 预览 → 精确查询
5c. RAGFlow 知识库
- 工具:
get_assistant_list→create_ask_delete - 流程: 获取助手 → 选择 → 提问 (≥3个问题)
- 场景: 企业内部知识、专有资料
6️⃣ 实时监控 (Monitor)
设计: 单例模式 ToolMonitor
功能: 工具进度、子智能体通知、结果推送、WebSocket 通信
事件类型:
| 事件 | 时机 | 数据 |
|---|---|---|
| tool_start | 工具执行 | tool_name, args |
| assistant_call | 子智能体调用 | assistant_name, args |
| task_result | 任务完成 | result |
| session_created | 目录创建 | path |
| error | 异常 | error_message |
推送机制: WebSocket优先 → 脚本模式 → 控制台保底
7️⃣ WebSocket 通信
接口: GET /ws/{thread_id}
生命周期:
握手 → 注册(manager.connect) → 监听循环(ping/pong) → 断开清理
特性:
- 会话隔离:
active_connections[thread_id] - 心跳机制: 防超时断开
- 定向推送: 按 thread_id 发送消息
8️⃣ 文件管理
8a. 文件列表 (GET /api/files)
- 路径安全检查 (防遍历攻击)
- 递归遍历 + 元数据收集
- 按修改时间倒序
8b. 文件下载 (GET /api/download)
- 验证路径在
output/内 - FileResponse 流式传输
- 浏览器自动下载
安全: 严格的路径检查 is_relative_to()
🔑 核心设计特点
1. 异步并发架构
- 多会话支持: 每个客户端独立 thread_id
- 非阻塞执行:
asyncio.create_task后台运行 - 流式处理:
astream()实时返回结果 - 事件循环管理: 精确控制协程执行环境
2. 智能体协作模式
主智能体 (协调者)├── 网络搜索助手 (公开信息)├── 数据库查询助手 (企业数据)└── RAGFlow助手 (内部知识)
3. 工具增强系统
- 文件生成: Markdown → PDF 转换链
- 文件读取: 多格式统一接口
- 路径管理: 相对路径自动解析
- 上下文感知: ContextVars 线程安全
4. 实时监控反馈
- 进度可视化: 工具执行状态实时推送
- 子任务追踪: 子智能体调用通知
- 错误处理: 异常信息即时反馈
- 双向通信: WebSocket 全双工
5. 安全隔离机制
- 会话隔离: 独立工作目录
- 路径安全: 严格的路径遍历检查
- 上下文管理: ContextVars 避免数据泄露
- 资源清理: finally 块确保资源释放
6. 灵活扩展能力
- 插件化工具: LangChain 工具装饰器
- 可配置提示词: YAML 配置文件
- 子智能体热插拔: 字典配置即可添加
- 多模型支持: 通过
.env切换 LLM
📂 项目目录结构
deep_search_pro/
├── agent/ # 智能体模块
│ ├── main_agent.py # 主智能体 (协调者)
│ ├── llm.py # LLM 模型初始化
│ ├── prompts.py # 提示词加载器
│ └── subagents/ # 子智能体
│ ├── network_search_agent.py # 网络搜索
│ ├── database_query_agent.py # 数据库查询
│ └── knowledge_base_agent.py # RAGFlow知识库
│
├── api/ # API 服务层
│ ├── server.py # FastAPI 路由和服务器
│ ├── monitor.py # 监控和 WebSocket 管理
│ └── context.py # 上下文管理 (ContextVars)
│
├── tools/ # 工具集
│ ├── markdown_tools.py # Markdown 生成
│ ├── pdf_tools.py # PDF 转换
│ ├── upload_file_read_tool.py # 文件读取
│ ├── db_tools.py # 数据库工具
│ ├── tavily_tool.py # Tavily 搜索
│ ├── ragflow_tools.py # RAGFlow 交互
│ └── upload_file_read_tool.py # 文件读取
│
├── prompt/ # 配置文件
│ └── prompts.yml # 智能体提示词配置
│
├── utils/ # 工具函数
│ ├── path_utils.py # 路径解析
│ └── word_converter.py # Word 转换
│
├── output/ # 输出目录 (按会话隔离)
│ └── session_{id}/
│
├── updated/ # 上传目录 (按会话隔离)
│ └── session_{id}/
│
├── .env # 环境变量配置
├── requirements.txt # 依赖列表
└── 项目步骤 # 项目文档
🚀 典型使用场景
场景 1: 生成市场分析报告
用户请求: "生成一份空调市场分析报告,保存为 PDF"执行流程:
1. 主智能体接收任务
2. 调用网络搜索助手 → 收集市场数据
3. 调用数据库查询助手 → 获取销售数据
4. 调用 RAGFlow 助手 → 获取内部资料
5. 汇总所有信息
6. 调用 generate_markdown → 生成 MD 文档
7. 调用 convert_md_to_pdf → 转换为 PDF
8. 推送最终结果和文件位置
场景 2: 查询库存信息
用户请求: "查询当前空调库存情况"执行流程:
1. 主智能体分析任务类型
2. 调用数据库查询助手
3. 执行 SQL 查询库存表
4. 返回查询结果
5. 无需生成文件,直接反馈
场景 3: 分析上传文档
用户上传: product_specs.pdf
用户请求: "分析上传的产品规格文档"执行流程:
1. 文件保存到 updated/session_{id}/
2. 复制到 output/session_{id}/ 工作目录
3. 主智能体调用 read_file_content 读取 PDF
4. 分析文档内容
5. 可结合网络搜索补充信息
6. 生成分析报告
💡 技术栈总结
| 类别 | 技术 | 用途 |
|---|---|---|
| Web 框架 | FastAPI | REST API 和 WebSocket |
| ASGI 服务器 | Uvicorn | 异步服务器 |
| AI 框架 | LangChain | 智能体和工具系统 |
| 工作流 | LangGraph | 智能体状态管理 |
| LLM | Qwen-Max (OpenAI 兼容) | 大语言模型 |
| 搜索 | Tavily API | 网络信息检索 |
| 文档处理 | python-docx, pypdf, pandas | 多格式文件处理 |
| 实时通信 | WebSocket | 双向消息推送 |
| 配置管理 | PyYAML, python-dotenv | 配置文件加载 |
📝 总结
智能搜索系统是一个企业级深度搜索增强系统,核心特点:
✅ 多智能体协作 - 主从架构,专业分工
✅ 实时通信 - WebSocket 全双工推送
✅ 异步并发 - 支持多客户端同时使用
✅ 工具增强 - 丰富的文件处理能力
✅ 安全隔离 - 会话级目录和路径保护
✅ 灵活扩展 - 插件化工具和可配置提示词
系统通过智能的任务分解和资源调度,实现了从信息获取到文档生成的完整自动化工作流!
作者:Work Hard Work Smart
出处:http://www.cnblogs.com/linlf03/
欢迎任何形式的转载,未经作者同意,请保留此段声明!
