小白程序员必看:手把手教你搭建RAG-SQL Router智能问答系统(收藏版)
你有没有遇到过这样的困境:用户问你一个问题,你得先判断是该去文档库里翻翻,还是该查查数据库?更头疼的是,如果判断错了,给出的答案要么不准确,要么干脆答非所问。
今天我想和你聊聊一个实战项目——RAG-SQL Router。这不是什么高深莫测的理论,而是一个能真正解决问题的智能系统。更重要的是,我会把完整的实现思路和代码都分享给你,让你也能动手搭建一个。
一、为什么需要这样一个系统?
先说说实际场景。假设你在做一个企业内部的智能问答助手:
场景1:非结构化数据查询
- “公司最新的休假政策是什么?”
- “产品功能文档里关于API鉴权的部分怎么说的?”
这类问题的答案藏在PDF、Word文档、Wiki页面里——典型的非结构化数据,最适合用RAG(检索增强生成)来处理。
场景2:结构化数据查询
- “去年第四季度销售额最高的三个地区是哪些?”
- “目前有多少活跃用户?”
这些问题需要的是精确的数字,答案在数据库里——需要Text-to-SQL来解决。
问题来了:当用户随便问一个问题,系统怎么知道该用哪种方式回答?
这就是RAG-SQL Router要解决的核心问题:让AI自己判断该走哪条路。
二、系统架构:Agent如何做决策?
整个系统的核心是一个路由Agent,它的工作流程是这样的:
智能体根据用户问题分析意图后,自主选择文档检索或数据库查询工具来获取信息并返回结果。
关键组件解析
- Router Agent(路由智能体)
这是整个系统的大脑。它基于LlamaIndex的Workflow框架构建,能够:
- 理解用户问题的语义
- 判断问题类型(文档检索还是数据查询)
- 选择合适的工具
- 甚至可以同时调用多个工具
- RAG工具(向量检索引擎)
负责处理非结构化数据:
- 使用LlamaCloud作为向量数据库
- 支持PDF、Word等文档格式
- 通过语义相似度检索相关内容
- SQL工具(自然语言转SQL引擎)
负责处理结构化数据:
- 将自然语言转换为SQL查询
- 执行数据库查询
- 返回结构化结果
- Cleanlab Codex(质量保障层)
这是个亮点!很多人做Agent,但没人关心输出是否靠谱。Cleanlab Codex提供:
- 自动检测不准确或无用的回答
- 为每个回答提供可信度评分
- 实时验证查询和响应
- 允许专家直接改进回答,无需改代码
三、动手实现:完整代码解析
让我带你一步步搭建这个系统。我会用最实际的代码,而不是空谈理论。
第一步:环境准备
# 创建项目目录 mkdir rag-sql-router cd rag-sql-router # 创建虚拟环境 python -m venv venv source venv/bin/activate # Windows用: venv\Scripts\activate # 安装依赖 pip install llama-index llama-index-llms-openai \ llama-index-embeddings-openai \ llama-index-indices-managed-llama-cloud \ sqlalchemy streamlit nest-asyncio第二步:配置API密钥
创建.env文件:
OPENAI_API_KEY=your_openai_key LLAMA_CLOUD_API_KEY=your_llamacloud_key LLAMA_CLOUD_ORG_ID=your_org_id LLAMA_CLOUD_PROJECT_NAME=your_project LLAMA_CLOUD_INDEX_NAME=your_index第三步:搭建SQL查询引擎
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, insert from llama_index.core.query_engine import NLSQLTableQueryEngine from llama_index.core import SQLDatabase # 创建示例数据库 engine = create_engine("sqlite:///:memory:") metadata = MetaData() # 定义城市统计表 city_stats = Table( 'city_stats', metadata, Column('city', String, primary_key=True), Column('population', Integer), Column('country', String), ) metadata.create_all(engine) # 插入示例数据 rows = [ {"city": "Toronto", "population": 2930000, "country": "Canada"}, {"city": "Tokyo", "population": 13960000, "country": "Japan"}, {"city": "Berlin", "population": 3645000, "country": "Germany"}, ] with engine.connect() as conn: for row in rows: conn.execute(insert(city_stats).values(**row)) conn.commit() # 创建SQL数据库对象 sql_database = SQLDatabase(engine, include_tables=["city_stats"]) # 创建自然语言SQL查询引擎 sql_query_engine = NLSQLTableQueryEngine( sql_database=sql_database, tables=["city_stats"], )这段代码做了什么?
- 创建了一个内存SQLite数据库(适合演示,生产环境换成MySQL/PostgreSQL)
- 定义了城市统计表,包含城市名、人口、国家字段
- 插入了一些示例数据
- 用
NLSQLTableQueryEngine包装数据库,它能把"人口最多的城市"这样的问题转成SQL
第四步:搭建RAG检索引擎
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex import os # 连接到LlamaCloud索引 rag_index = LlamaCloudIndex( name=os.getenv("LLAMA_CLOUD_INDEX_NAME"), project_name=os.getenv("LLAMA_CLOUD_PROJECT_NAME"), organization_id=os.getenv("LLAMA_CLOUD_ORG_ID"), api_key=os.getenv("LLAMA_CLOUD_API_KEY"), ) # 创建查询引擎 rag_query_engine = rag_index.as_query_engine()LlamaCloud是什么?
简单说,它是一个托管的向量数据库服务:
- 你上传文档(PDF、DOCX等)
- 它自动切分、向量化、建索引
- 你只需要调用API查询
当然,你也可以用Qdrant、Pinecone、Weaviate等替代。
第五步:将查询引擎包装成工具
from llama_index.core.tools import QueryEngineTool # SQL工具 sql_tool = QueryEngineTool.from_defaults( query_engine=sql_query_engine, name="sql_query_engine", description=( "用于查询城市统计数据,包括人口、国家等信息。" "适合回答关于数字、排名、统计类的问题。" ), ) # RAG工具 rag_tool = QueryEngineTool.from_defaults( query_engine=rag_query_engine, name="document_search_engine", description=( "用于搜索文档内容,适合回答关于政策、流程、" "说明等需要从文档中查找信息的问题。" ), )为什么要包装成工具?
这里的description非常关键!Agent会读这些描述来判断该用哪个工具。所以描述要:
- 清晰明确:说清楚工具能做什么
- 区分明显:让Agent能轻松分辨使用场景
- 举例说明:提示适用的问题类型
第六步:构建Router Workflow
这是整个系统最核心的部分:
from llama_index.core.workflow import ( Event, StartEvent, StopEvent, Workflow, step, ) from llama_index.llms.openai import OpenAI from typing import List, Any # 定义事件类型 class PrepEvent(Event): """准备阶段完成事件""" pass class ToolCallEvent(Event): """工具调用事件""" tool_calls: List[Any] # 定义Router Workflow class RouterWorkflow(Workflow): def __init__(self, tools: List[QueryEngineTool], **kwargs): super().__init__(**kwargs) self.tools = {tool.metadata.name: tool for tool in tools} self.llm = OpenAI(model="gpt-4") @step async def prepare_chat(self, ev: StartEvent) -> PrepEvent: """准备对话消息""" # 获取用户查询 user_msg = ev.query # 构建系统提示 system_prompt = self._build_system_prompt() # 存储到上下文 self.query = user_msg self.messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_msg} ] return PrepEvent() @step async def handle_llm_call(self, ev: PrepEvent) -> ToolCallEvent | StopEvent: """调用LLM决策""" # 准备工具定义给LLM tools_def = [ { "type": "function", "function": { "name": name, "description": tool.metadata.description, "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "用户查询" } }, "required": ["query"] } } } for name, tool in self.tools.items() ] # 调用LLM response = await self.llm.achat( messages=self.messages, tools=tools_def ) # 检查是否有工具调用 if response.message.tool_calls: return ToolCallEvent(tool_calls=response.message.tool_calls) else: # 没有工具调用,直接返回答案 return StopEvent(result=response.message.content) @step async def handle_tool_calls(self, ev: ToolCallEvent) -> StopEvent: """执行工具调用""" results = [] for tool_call in ev.tool_calls: tool_name = tool_call.function.name tool_args = eval(tool_call.function.arguments) # 执行工具 tool = self.tools[tool_name] result = await tool.aquery(tool_args["query"]) results.append({ "tool": tool_name, "result": str(result) }) # 组合结果 final_answer = self._combine_results(results) return StopEvent(result=final_answer) def _build_system_prompt(self) -> str: """构建系统提示""" return """你是一个智能助手,能够访问以下工具: 1. SQL查询引擎:用于查询结构化数据 2. 文档搜索引擎:用于搜索文档内容 根据用户问题,选择合适的工具。如果需要,可以同时使用多个工具。 """ def _combine_results(self, results: List[dict]) -> str: """组合多个工具的结果""" if len(results) == 1: return results[0]["result"] combined = "根据查询结果:\n\n" for i, res in enumerate(results, 1): combined += f"{i}. 从{res['tool']}得到: {res['result']}\n" return combined这个Workflow是怎么工作的?
- prepare_chat: 接收用户问题,准备系统提示和对话消息
- handle_llm_call: 调用LLM,让它决定用哪个工具(或多个工具)
- handle_tool_calls: 实际执行工具调用,获取结果
- 如果有多个结果,组合起来返回
关键点在于:
- Event驱动:每个step返回一个Event,触发下一个step
- 异步执行:所有step都是async,支持并发
- 灵活路由:LLM可以选择0个、1个或多个工具
第七步:创建Streamlit界面
import streamlit as st import asyncio # 页面配置 st.set_page_config(page_title="RAG-SQL Router", page_icon="🤖") st.title(" 智能路由问答系统") st.markdown("**问我任何问题,我会自动选择最佳方式回答!**") # 初始化工具和workflow if "workflow" not in st.session_state: # 创建工具 tools = [sql_tool, rag_tool] # 初始化workflow st.session_state.workflow = RouterWorkflow( tools=tools, timeout=60.0 ) # 聊天历史 if "messages" not in st.session_state: st.session_state.messages = [] # 显示历史消息 for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) # 用户输入 if prompt := st.chat_input("在这里输入你的问题..."): # 显示用户消息 st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) # 调用workflow获取答案 with st.chat_message("assistant"): with st.spinner("思考中..."): # 运行workflow result = asyncio.run( st.session_state.workflow.run(query=prompt) ) # 显示答案 st.markdown(result) # 保存助手回答 st.session_state.messages.append({"role": "assistant", "content": result}) # 侧边栏:显示可用工具 with st.sidebar: st.subheader("🔧 可用工具") st.write("1. SQL查询引擎") st.caption("查询城市统计数据") st.write("2. 文档搜索引擎") st.caption("搜索文档内容") if st.button("清空对话"): st.session_state.messages = [] st.rerun()第八步:运行系统
streamlit run app.py打开浏览器,访问http://localhost:8501,试试这些问题:
SQL类问题:
- “哪个城市人口最多?”
- “有多少个欧洲城市?”
RAG类问题:
- “文档里提到的政策要点是什么?”
- “关于API使用的说明在哪里?”
混合问题:
- “东京的人口是多少?同时告诉我文档里关于东京的介绍。”
四、加入Cleanlab Codex:让输出更可靠
前面的系统已经能工作了,但有个问题:怎么知道AI的回答靠不靠谱?
这就是Cleanlab Codex的价值。它能:
- 自动检测问题回答
- 答非所问
- 信息不完整
- 事实错误
- 提供可信度评分
- 每个回答都有0-1的分数
- 分数低于阈值触发告警
- 支持专家反馈
- SME(领域专家)可以直接标注
- 系统自动学习改进
集成代码示例:
from cleanlab_codex import CleanlabCodex # 初始化Codex codex = CleanlabCodex(api_key=os.getenv("CLEANLAB_API_KEY")) # 在workflow中使用 @step async def validate_response(self, ev: StopEvent) -> StopEvent: """验证响应质量""" # 获取原始回答 response = ev.result # 验证质量 validation = codex.validate( query=self.query, response=response, context=self.retrieved_context ) # 添加可信度评分 confidence = validation.trustworthiness_score if confidence < 0.7: response += f"\n\n 可信度: {confidence:.2%} (建议人工确认)" else: response += f"\n\n 可信度: {confidence:.2%}" return StopEvent(result=response)五、实际应用场景
这套系统不是玩具,我们来看看能解决什么实际问题:
场景1:企业知识库
问题: 公司有大量文档(员工手册、产品文档、流程规范)和业务数据(销售、用户、财务)。员工经常不知道去哪找信息。
解决方案:
- RAG工具索引所有文档
- SQL工具连接业务数据库
- 员工直接问问题,系统自动路由
效果:
- 减少80%的重复咨询
- 提升员工自助查询效率
- 数据和文档统一入口
场景2:客户服务系统
问题: 客服需要回答产品使用问题(文档)和订单状态(数据库)。
解决方案:
- RAG工具索引产品文档、FAQ
- SQL工具连接订单系统
- 客服输入问题,系统提供参考答案
效果:
- 新客服上手快
- 答案标准化
- 响应时间缩短50%
场景3:数据分析助手
问题: 业务人员不会写SQL,但需要经常查数据。
解决方案:
- SQL工具连接数据仓库
- RAG工具提供分析方法论文档
- 自然语言查询,自动生成SQL
效果:
- 降低对数据团队的依赖
- 提升数据驱动决策效率
- 减少重复分析工作
六、关键经验和坑
搭建这个系统时,我踩过不少坑,分享几个关键经验:
1. 工具描述要精准
坑: 最开始我的SQL工具描述是"用于查询数据"——太模糊了!结果Agent经常选错。
解决: 改成"用于查询城市统计数据,包括人口、国家等信息。适合回答关于数字、排名、统计类的问题。"——具体、清晰、有例子。
2. 处理好并发调用
坑: 用户问"东京人口多少?文档里怎么说的?"——需要同时调用两个工具,但结果怎么组合?
解决:
- Workflow支持并发step
- 在
_combine_results里做好结果聚合 - 让LLM再做一次总结
3. 数据库连接要健壮
坑: SQLite内存数据库重启就没了,生产环境不能用。
解决:
# 生产环境用持久化数据库 engine = create_engine( "postgresql://user:pass@host:5432/db", pool_pre_ping=True, # 检查连接有效性 pool_size=10, # 连接池 )4. Token消耗要控制
坑: 每次查询都把全部上下文传给LLM,token消耗巨大。
解决:
- 只传必要的上下文
- 使用更便宜的模型做路由(gpt-3.5-turbo)
- 缓存常见问题的答案
5. 错误处理要完善
坑: SQL语法错误、网络超时、API限流都会导致系统崩溃。
解决:
try: result = await tool.aquery(query) except SQLAlchemyError as e: result = f"数据库查询失败: {str(e)}" except TimeoutError: result = "查询超时,请稍后重试" except Exception as e: result = f"系统错误: {str(e)}" # 记录日志 logger.error(f"Tool error: {e}", exc_info=True)七、性能优化建议
如果你要把这套系统用到生产环境,这些优化必不可少:
1. 缓存机制
from functools import lru_cache import hashlib @lru_cache(maxsize=1000) def cached_query(query_hash: str): """缓存查询结果""" # 实际查询逻辑 pass # 使用时 query_hash = hashlib.md5(query.encode()).hexdigest() result = cached_query(query_hash)2. 批量查询
对于相似查询,批量处理:
async def batch_query(queries: List[str]): """批量执行查询""" tasks = [workflow.run(query=q) for q in queries] return await asyncio.gather(*tasks)3. 监控和日志
import logging from datetime import datetime logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('rag_sql_router.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 在关键位置记录 logger.info(f"Query received: {query}") logger.info(f"Tool selected: {tool_name}") logger.info(f"Response time: {elapsed:.2f}s")4. 限流保护
from functools import wraps import time def rate_limit(max_calls: int, time_window: int): """限流装饰器""" calls = [] def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): now = time.time() # 清理过期记录 calls[:] = [c for c in calls if c > now - time_window] if len(calls) >= max_calls: raise Exception("请求过于频繁,请稍后再试") calls.append(now) return await func(*args, **kwargs) return wrapper return decorator @rate_limit(max_calls=10, time_window=60) async def handle_query(query: str): """处理查询,每分钟最多10次""" pass八、总结
看到这里,你大概已经能感受到:RAG-SQL Router 这东西看起来像“多加了一个路由层”,但真正解决的是落地时最容易被忽视的一件事——把“该去哪儿找答案”这一步交给系统,而不是交给用户。
很多企业内部问答做不起来,并不是因为模型不够强,而是因为入口不够统一:同一句话问出来,有时候是制度条款,有时候是数据口径,有时候还夹杂着流程步骤。人能凭经验判断“先查文档还是先查库”,但系统如果没有这个判断能力,就会变成两种尴尬:
- 只做RAG:回答得像“引用资料”,听起来挺像那么回事,但数字经不起核对;
- 只做Text-to-SQL:能查到数,但对“为什么这么算”“规则写在哪”完全没有解释能力。
Router 的价值,就是让这两套能力不再互相拖后腿,而是互相补位:需要确定性的时候走SQL,需要背景解释的时候走RAG,复杂一点的问题就两条路一起跑,最后再把结果合到一张桌子上给你。
普通人如何抓住AI大模型的风口?
领取方式在文末
为什么要学习大模型?
目前AI大模型的技术岗位与能力培养随着人工智能技术的迅速发展和应用 , 大模型作为其中的重要组成部分 , 正逐渐成为推动人工智能发展的重要引擎 。大模型以其强大的数据处理和模式识别能力, 广泛应用于自然语言处理 、计算机视觉 、 智能推荐等领域 ,为各行各业带来了革命性的改变和机遇 。
目前,开源人工智能大模型已应用于医疗、政务、法律、汽车、娱乐、金融、互联网、教育、制造业、企业服务等多个场景,其中,应用于金融、企业服务、制造业和法律领域的大模型在本次调研中占比超过30%。
随着AI大模型技术的迅速发展,相关岗位的需求也日益增加。大模型产业链催生了一批高薪新职业:
人工智能大潮已来,不加入就可能被淘汰。如果你是技术人,尤其是互联网从业者,现在就开始学习AI大模型技术,真的是给你的人生一个重要建议!
最后
只要你真心想学习AI大模型技术,这份精心整理的学习资料我愿意无偿分享给你,但是想学技术去乱搞的人别来找我!
在当前这个人工智能高速发展的时代,AI大模型正在深刻改变各行各业。我国对高水平AI人才的需求也日益增长,真正懂技术、能落地的人才依旧紧缺。我也希望通过这份资料,能够帮助更多有志于AI领域的朋友入门并深入学习。
真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发
【附赠一节免费的直播讲座,技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等,欢迎大家~】
大模型全套学习资料展示
自我们与MoPaaS魔泊云合作以来,我们不断打磨课程体系与技术内容,在细节上精益求精,同时在技术层面也新增了许多前沿且实用的内容,力求为大家带来更系统、更实战、更落地的大模型学习体验。
希望这份系统、实用的大模型学习路径,能够帮助你从零入门,进阶到实战,真正掌握AI时代的核心技能!
01教学内容
从零到精通完整闭环:【基础理论 →RAG开发 → Agent设计 → 模型微调与私有化部署调→热门技术】5大模块,内容比传统教材更贴近企业实战!
大量真实项目案例:带你亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事!
02适学人群
应届毕业生:无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。
零基础转型:非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界。
业务赋能突破瓶颈:传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型。
vx扫描下方二维码即可
【附赠一节免费的直播讲座,技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等,欢迎大家~】
本教程比较珍贵,仅限大家自行学习,不要传播!更严禁商用!
03入门到进阶学习路线图
大模型学习路线图,整体分为5个大的阶段:
04视频和书籍PDF合集
从0到掌握主流大模型技术视频教程(涵盖模型训练、微调、RAG、LangChain、Agent开发等实战方向)
新手必备的大模型学习PDF书单来了!全是硬核知识,帮你少走弯路(不吹牛,真有用)
05行业报告+白皮书合集
收集70+报告与白皮书,了解行业最新动态!
0690+份面试题/经验
AI大模型岗位面试经验总结(谁学技术不是为了赚$呢,找个好的岗位很重要)
07 deepseek部署包+技巧大全
由于篇幅有限
只展示部分资料
并且还在持续更新中…
真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发
【附赠一节免费的直播讲座,技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等,欢迎大家~】
