RAG(Retrieval-Augmented Generation,检索增强生成) 是一种将信息检索(Retrieval)与生成式模型(Generation)相结合的AI架构模式。其核心目标是在大模型生成内容之前,引入外部知识检索,从而提升回答的准确性与可控性。
RAG 的主要优势包括:
降低幻觉(Hallucination):通过引入真实数据作为上下文,减少模型“编造内容”的概率
支持私有知识体系:可接入企业内部数据(如文档、数据库等)
降低推理成本:无需频繁微调模型,通过检索即可动态补充知识
从工程实现角度来看,RAG 的基本流程可以抽象为:
数据 → 向量化(Embedding)→ 向量数据库 → 相似度检索 → 大模型生成 → 返文本

本质上是:
先从向量数据库中检索出与问题最相关的上下文数据,再将这些数据作为提示(Prompt)输入给大模型,由模型进行归纳与生成最终答案。
本文主要以Weaviate向量数据库展开演示;首先我们来了解一下这个数据库有2个大的功能一个是文本搜索:它和es数据库一样,都是用的倒排索引(把词映射到包含该词的文档id列表)。另外一个功能则是向量搜索:他也叫语义搜索,他的底层索引主要用的是HNSW (可分层导航小世界图,以例子:我需要在城里找个人,这相当于大海捞针;但是在HNSW索引就能快速找到,简单拆分为三步,第一层,顶层高速路层,主要特点是人少,主要住着超级名人,市长超级首富类型,第二层,中层区域名人层,例如区长,当地首富,人数中等,第三层,底层所有人都在那;他的索引逻辑其实就是通过先找到顶层->中层->底层的方式寻找),但这是索引,想要出来对应的数据还得靠相似度量度(支持余弦相似度、欧几里得距离);余弦相似度是两个向量夹角的余弦值,他的核心公式是
,可以理解一个多维向量根据公式代入后,越接近1则相似,接近-1则不相似。总结就是衡量两个向量的方向一致性取值范围:[-1, 1]越接近 1,表示语义越相似不受向量长度(大小)影响本质:判断“特征方向是否一致”,而不是看大小。欧几里得距离则是看两个点的距离离得有多远,他的核心公式是
。衡量两个向量在空间中的实际距离数值越小,表示越相似对向量的“数值差异”敏感本质:衡量“数值差距有多大”区别:欧几里得距离:看“差多少”;余弦相似度:看“像不像方向”。总结RAG 的本质可以理解为:通过向量检索找到“最相关的知识”,再交由大模型进行“理解与生成”而在这一过程中:HNSW 负责“快速找到候选数据”相似度函数(余弦/欧几里得) 负责“判断谁最接近”
接下来我们来安装一下weaviate
新建 docker-compose.yml
version: '3.4' services:weaviate:image: semitechnologies/weaviate:latestports:- "8080:8080"environment:QUERY_DEFAULTS_LIMIT: 25AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: "true"PERSISTENCE_DATA_PATH: "/var/lib/weaviate"DEFAULT_VECTORIZER_MODULE: "none"ENABLE_MODULES: ""CLUSTER_HOSTNAME: "node1"volumes:- weaviate_data:/var/lib/weaviatevolumes:weaviate_data:
docker-compose up -d
如果能访问下面页面就证明已经安装成功了

安装完成后,我们献上我们的python代码
from fastapi import APIRouter, HTTPException from app.models.schemas import AskRequest, AgentResponse,InsertRequest from app.services.mcp_client import McpClient from app.services.llm_client import LLMClient from app.services.rag_client import RagClient import os import jsonrouter = APIRouter()# 配置通过环境变量注入 MCP_BASE = os.getenv("MCP_BASE", "http://localhost:5265") LLM_BASE = os.getenv("LLM_BASE", "https://localhost:5001") LLM_API_KEY = os.getenv("LLM_API_KEY", "") rag_client = RagClient() mcp_client = McpClient(MCP_BASE) llm_client = LLMClient(LLM_BASE, api_key=LLM_API_KEY)@router.post("/ask", response_model=AgentResponse) async def ask(req: AskRequest):# 1) 列出 MCP 工具并转换try:mcp_schema = await mcp_client.list_tools()except Exception as ex:raise HTTPException(status_code=502, detail=f"MCP list_tools failed: {ex}")tools = mcp_client.convert_mcp_tools_to_llm(mcp_schema)# 2) 初次向 LLM 发送用户问题system_prompt = """你是一个智能助手。规则:1. 遇到数据查询、系统操作、业务逻辑,必须调用工具2. 不要凭空编造数据3. 工具返回后再总结给用户"""initial_messages = [{"role": "system", "content": system_prompt},{"role": "user", "content": req.query}]llm_resp = await llm_client.chat(model=req.model or "deepseek-chat",messages=initial_messages,tools=tools,tool_choice="auto", # ✅ 核心temperature=req.temperature,max_tokens=req.max_tokens)# 3) 解析 LLM 返回并判断是否要调用工具try:choice = llm_resp["choices"][0]message = choice["message"]except Exception:raise HTTPException(status_code=502, detail="Invalid response from LLM")# 检查可能的工具调用(兼容 message.tool_calls 形式)tool_calls = message.get("tool_calls") or message.get("tool_call") or []if tool_calls:tool_call = tool_calls[0]# 解析函数名与参数(不同实现可能字段不同)function_name = Nonearguments = {}if isinstance(tool_call, dict):# 兼容结构: tool_call.function.name / tool_call.function.argumentsfn = tool_call.get("function") or {}function_name = fn.get("name") or tool_call.get("name")args_raw = fn.get("arguments") or tool_call.get("arguments") or "{}"if isinstance(args_raw, str):try:arguments = json.loads(args_raw)except Exception:arguments = {}else:arguments = args_rawelse:raise HTTPException(status_code=502, detail="Unsupported tool_call format")# 4) 调用 MCP 中的工具try:tool_result = await mcp_client.call_tool(function_name, arguments)except Exception as ex:raise HTTPException(status_code=502, detail=f"MCP call_tool failed: {ex}")# 5) 将工具结果发送回 LLM,得到最终回答# 附上 assistant 的函数调用消息和工具结果消息messages_for_final = [{"role": "assistant","content": message.get("content") or "","tool_calls": message.get("tool_calls")},{"role": "tool","tool_call_id": tool_call.get("id"),"content": json.dumps(tool_result or {}, ensure_ascii=False)}]final_resp = await llm_client.chat(model=req.model or "gpt-like", messages=messages_for_final)try:final_content = final_resp["choices"][0]["message"]["content"]except Exception:raise HTTPException(status_code=502, detail="Invalid final response from LLM")return AgentResponse(answer=final_content, tool_called=function_name, tool_result=tool_result)else:# ✅ RAG:从 Weaviate 检索上下文rag_contexts = []try:rag_contexts = await rag_client.search(req.query)except Exception as ex:# RAG失败不影响主流程rag_contexts = []# ✅ 拼接上下文context_text = "\n".join([d["content"] for d in rag_contexts])final_query = req.queryif context_text:final_query = f"""请基于以下上下文回答问题:{context_text}问题:{req.query}"""initial_messages = [{"role": "user", "content": final_query}]llm_resp = await llm_client.chat(model=req.model or "deepseek-chat",messages=initial_messages,tools=tools,temperature=req.temperature,max_tokens=req.max_tokens)try:message = llm_resp["choices"][0]["message"]except:raise HTTPException(status_code=502, detail="LLM response invalid")# 如果没有工具调用,直接返回 LLM 的回复内容final_content = message.get("content") or ""return AgentResponse(answer=final_content)@router.post("/rag/insert") async def insert_rag(req: InsertRequest):try:if not req.texts:raise HTTPException(status_code=400, detail="texts不能为空")rag_client.insert_documents(req.texts)return {"success": True,"count": len(req.texts)}except Exception as ex:raise HTTPException(status_code=500, detail=f"RAG insert failed: {ex}")
下面是大模型调用
import httpx from typing import Any, Dict, List, Optionalclass LLMClient:def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 60):self.base_url = base_url.rstrip("/")self.api_key = api_keyself.timeout = timeoutasync def chat(self,model: str,messages: List[Dict],tools: List[Dict] = None,tool_choice: Any = None,temperature: float = 0.7,max_tokens: int = 512) -> Dict:url = f"{self.base_url}/v1/chat/completions"payload = {"model": model,"messages": messages,"temperature": temperature,"max_tokens": max_tokens}if tools is not None:payload["tools"] = toolsif tool_choice is not None:payload["tool_choice"] = tool_choiceheaders = {"Content-Type": "application/json"}if self.api_key:headers["Authorization"] = f"Bearer {self.api_key}"try:async with httpx.AsyncClient(timeout=self.timeout,verify=False, # ✅ 解决 https localhost 证书问题follow_redirects=True # ✅ 自动处理 307 ) as client:response = await client.post(url, json=payload, headers=headers)# ✅ 打印详细信息(关键)if response.status_code != 200:print("==== LLM ERROR ====")print("URL:", url)print("Status:", response.status_code)print("Response:", response.text)raise Exception(f"LLM request failed: {response.text}")return response.json()except httpx.RequestError as e:raise Exception(f"请求失败: {str(e)}")except Exception as e:raise Exception(f"LLM异常: {str(e)}")
rag类实现代码
import weaviate import os import requests import uuidclass RagClient:def __init__(self):self.client = weaviate.Client(url=os.getenv("WEAVIATE_URL", "http://111:8080"))async def search(self, query: str = None, vector: list = None, top_k: int = 3):q = self.client.query.get("Document", ["content"])# ✅ 优先使用 vectorif vector is None and query:vector = self.get_embedding([query])[0] # 👈 自动转向量if vector:q = q.with_near_vector({"vector": vector,"certainty": 0.7 # 👈 强烈建议加 })else:raise ValueError("query 和 vector 不能同时为空")result = (q.with_additional(["certainty", "distance"]).with_limit(top_k).do())docs = result.get("data", {}).get("Get", {}).get("Document", [])return [{"content": d.get("content"),"score": (d.get("_additional") or {}).get("certainty"),"distance": (d.get("_additional") or {}).get("distance")}for d in docsif (d.get("_additional") or {}).get("certainty", 0) >= 0.7]def get_embedding(self, texts):url = "https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding/text-embedding"headers = {"Authorization": "Bearer ", # ❗别写死"Content-Type": "application/json"}body = {"model": "text-embedding-v4","input": {"texts": texts}}response = requests.post(url, headers=headers, json=body, timeout=10)response.raise_for_status()data = response.json()if "output" not in data:raise Exception(f"embedding失败: {data}")return [item["embedding"] for item in data["output"]["embeddings"]]def insert_documents(self, texts: list):"""texts: ["文本1", "文本2", ...]"""if not texts:return# 1️⃣ 获取向量embeddings = self.get_embedding(texts)# 2️⃣ 批量写入 with self.client.batch as batch:batch.batch_size = 20for text, vector in zip(texts, embeddings):batch.add_data_object(data_object={"content": text},class_name="Document",uuid=str(uuid.uuid4()), # 唯一IDvector=vector # 👈 关键:手动传入向量 )print(f"✅ 成功插入 {len(texts)} 条数据")
由于我这里添加了mcp工具,所以以下是mcp代码
import httpx import json from typing import Any, Dictclass McpClient:def __init__(self, base_url: str, timeout: int = 10):self.base_url = base_url.rstrip("/")self._endpoint = f"{self.base_url}/api/mcp"self._id = 0self._timeout = timeoutasync def _call(self, method: str, params: Dict = None) -> Any:self._id += 1payload = {"jsonrpc": "2.0","method": method,"id": self._id}if params:payload["params"] = paramsasync with httpx.AsyncClient(timeout=self._timeout) as client:r = await client.post(self._endpoint, json=payload)r.raise_for_status()data = r.json()if "error" in data and data["error"] is not None:raise Exception(data["error"].get("message", str(data["error"])))return data.get("result")async def list_tools(self) -> Dict:return await self._call("tools/list")async def call_tool(self, name: str, arguments: Dict) -> Any:return await self._call("tools/call", {"name": name, "arguments": arguments})def convert_mcp_tools_to_llm(self, mcp_json: Dict) -> list:tools = []for tool in mcp_json.get("tools", []):tools.append({"type": "function","function": {"name": tool["name"],"description": tool.get("description", ""),"parameters": tool.get("inputSchema", {"type": "object"})}})return tools
实体类:
from pydantic import BaseModel from typing import Optional, Any,Listclass AskRequest(BaseModel):query: strmodel: Optional[str] = Nonetemperature: float = 0.7max_tokens: int = 512class AgentResponse(BaseModel):answer: strtool_called: Optional[str] = Nonetool_result: Optional[Any] = Noneclass InsertRequest(BaseModel):texts: List[str]
首先打开swagger,我们插入数据到我们的向量数据库,可以看到返回的是true,证明我们插入成功

下面我们进行搜索并让ai给我总结,如图:

从最终效果来看,系统已经完成了 向量数据库(Vector DB)与大模型(LLM)的协同调用闭环:
- 当用户提出问题(例如:“今天吃什么水果”)时
- 系统首先通过 向量检索(Vector Search) 在知识库中召回相关内容(如水果种类)
- 然后将召回结果作为上下文输入给大模型
- 最终由大模型进行语义理解与总结生成自然语言回答
这一流程本质上实现了典型的 RAG(Retrieval-Augmented Generation)架构。
MCP 工具机制(Tools)说明
在此基础上,引入了 MCP(Model Context Protocol)工具机制,用于扩展大模型的外部能力调用。
可以将 MCP 工具理解为:
一种标准化的远程工具调用协议,本质上是 LLM 的 Tools 扩展机制的工程化实现。
其核心能力包括:
- 工具注册与发现(Tool Discovery)
系统可以动态获取远程服务提供的 MCP 工具列表(Tools Registry) -
模型驱动决策(Tool Selection by LLM)
大模型根据当前用户输入及上下文,自动判断是否需要调用某个工具 -
远程调用执行(Remote Invocation)
一旦决定调用工具,系统通过协议(如 gRPC / HTTP)发起远程调用 -
结果回流与生成(Result Integration)
- 工具返回结果 → 作为上下文补充
- 或直接由模型进行总结生成最终响应
在微服务架构中的映射
在微服务体系下,MCP 工具可以自然映射为:
- 一个独立服务(Service)
- 一个 gRPC / HTTP API
- 或一个领域能力接口(Domain Capability)
例如:
| MCP 工具 | 实际服务 |
|---|---|
| FruitSearchTool | 向量检索服务 |
| OrderQueryTool | 订单系统 |
| MaintenanceTool | 设备点检系统 |
因此:
Agent 聚合服务本质上是一个“智能调度层”,通过 MCP 协议统一编排各个微服务能力。
当前实现流程总结
结合你的实现,可以抽象为以下执行链路:
-
获取 MCP 工具列表
GetAvailableTools()→ 从远程服务或注册中心拉取工具定义
-
构建 Prompt + Tools
LLM(Input + Context + Tools) - 模型决策
- 是否调用工具
- 调用哪个工具
- 构造参数
-
执行工具调用
Call MCP Tool (gRPC / HTTP) - 返回结果
- 直接返回
- 或再次交给 LLM 总结
以下可以看的出来,下面就是获取所有mcp工具
try:mcp_schema = await mcp_client.list_tools()except Exception as ex:raise HTTPException(status_code=502, detail=f"MCP list_tools failed: {ex}")tools = mcp_client.convert_mcp_tools_to_llm(mcp_schema)
注意下面由大模型决定调用工具的核心是写入工具参数和开启工具自动调用参数
llm_resp = await llm_client.chat(
model=req.model or "deepseek-chat",
messages=initial_messages,
tools=tools,//写入工具
tool_choice="auto", # ✅ 核心
temperature=req.temperature,
max_tokens=req.max_tokens
)
下面我们来实现mcp远程服务,MCP的概念这章就不多说,如果想了解可去 这种主要以实现为主主要以http为主,以下我主要列出几个重要部分,下面代码我主要以net来实现,从web业务类来看用net实现更佳
下面是我们mcp入口
[Route("api/[controller]")] [ApiController] public class McpController : ControllerBase {private readonly JsonRpcDispatcher _dispatcher;public McpController(JsonRpcDispatcher dispatcher){_dispatcher = dispatcher;}[HttpPost]public IActionResult Handle([FromBody] JsonRpcRequest request){//if (request.Jsonrpc != "2.0")// return Ok(request.Id, -32600, "Invalid JSON-RPC version");try{var result = _dispatcher.Dispatch(request);return Ok(new JsonRpcResponse{Id = request.Id,Result = result});}catch (Exception ex){return Ok(new JsonRpcResponse{Id = request.Id,Error = new JsonRpcError{Code = -32603,Message = ex.Message}});}} }
工具调用类
public class McpRpcMethods {private IToolService _toolService;public McpRpcMethods(IToolService toolService){_toolService = toolService;}[JsonRpcMethod("tools/list")]public object ToolsList(){var tools = _toolService.ListTools();return tools;}[JsonRpcMethod("tools/call")]public object ToolsCall(JsonElement param){var name = param.GetProperty("name").GetString();var args = param.GetProperty("arguments");var res = _toolService.CallTool(name,args);return res;} }
工具实现类
public class ToolService : IToolService{public object ListTools(){return new{tools = new[]{new{name = "get_weather",description = "查询天气预报,可根据城市名查询",inputSchema = new{type = "object",properties = new{city = new{type = "string",description = "城市名,例如中山"}},required = new[] { "city" }}}}};}public object CallTool(string name, JsonElement arguments){if (name == "get_weather"){var city = arguments.GetProperty("city").GetString();return new{content = $"{city}今天晴天,气温20度"};}throw new Exception("Tool not found");}}
在工具调用体系中,Prompt Engineering(提示词工程)是决定工具触发效果的核心因素之一。相比传统规则匹配,大模型是否能够正确触发 Tool Call,在很大程度上依赖于 Tool Schema 的表达质量。
其中,description 字段在整个工具体系中起到关键作用,它不仅是工具的语义说明,更是大模型进行决策的重要上下文信号。
因此,可以借助 AI 对工具进行自动化增强,例如:
- 自动生成更符合语义理解的
description - 优化工具参数的自然语言表达方式
- 提升 Tool 对 LLM 的可识别性与可调用性
本质上,这一步是在构建 面向大模型优化的 Tool Prompt Layer。
工具调用与 RAG 的编排策略
在当前实现中,整体执行流程采用的是一种较为直接的策略:
优先进行 Tool 调用,若 Tool 未命中,则回退至 RAG 向量检索,再由大模型进行统一总结。
这种方式在部分业务场景中是可行的,但从系统设计角度来看,并非最优解。
Agent RAG 的优化思路
在更进一步的架构演进中,可以引入 Agent RAG(智能代理增强检索生成)模式,对原有流程进行重构,使整个系统具备更强的动态决策能力。
其核心思想是:
将“检索、工具调用、生成”从线性流程升级为由 LLM 驱动的动态决策过程。
一种典型的 Agent RAG 实现方式如下:
- LLM 进行任务理解与意图识别
- 判断用户问题类型
- 决定是否需要调用工具(Tool Use)
- 工具优先调用(Tool First Strategy)
- 若命中工具,则执行远程调用(如 MCP / gRPC 服务)
- 增强检索(RAG Augmentation)
- 在工具返回结果基础上
- 或在无工具命中时
- 进行向量数据库检索补充上下文
- 统一生成(Final Synthesis)
- 由 LLM 对“工具结果 + RAG结果 + 原始问题”进行融合总结
- 输出最终答案
架构演进本质
从系统演进角度来看,Agent RAG 并不是一种固定实现,而是一种面向业务的动态编排范式,其核心变化在于:
- 从 “固定流程编排”
→ 升级为 “LLM驱动的动态调度” -
从 “工具优先 or RAG优先”
→ 升级为 “由模型自主决策路径”
工程实践结论
可以总结为:
Agent RAG 的本质不是单一技术实现,而是一种“以大模型为调度核心”的系统设计模式。
通过将 Tool Calling 与 RAG 统一纳入模型决策层,实现检索能力与执行能力的协同增强。
在实际工程中,没有唯一标准实现方式,关键在于:
选择最契合当前业务复杂度、响应延迟与可维护性的组合架构。
从整体来看,无论是知识库(RAG)还是 Tools(MCP / Function Calling)调用,本质上都属于应用层能力的组合与编排。
整体实现并不复杂,关键在于理解并掌握其核心范式:
- 知识库侧:基于向量检索实现语义增强(RAG)
- 工具侧:基于 Prompt + Schema 驱动的动态工具调用(Tools / MCP)
- 调度层:由大模型完成意图识别与路径决策
因此,从工程实现角度来看,核心工作更多集中在应用层架构设计与调用链路编排,而非底层算法复杂度。
AI 加持下的开发效率
在引入大模型能力之后,系统构建的门槛进一步降低:
只要具备基础的系统设计能力,并结合 AI 辅助开发(如 Prompt 生成、工具描述优化、代码生成等),即可较快构建出适用于企业业务场景的知识库与智能问答系统。
换句话说,当前阶段的关键不再是“是否能实现”,而是“如何设计更合适的组合方式”。
关于上下文(Context)管理
在完整的 Agent 系统中,上下文管理是不可或缺的一环,它直接影响模型的连续推理能力与任务理解能力。
当前示例中暂未引入完整的 Context 管理机制,该部分通常包括:
- 对话历史的结构化存储
- Token 长度控制与压缩策略
- 关键信息摘要(Summarization)
- 长短期记忆分层管理
从实现难度上来看,上下文管理本身并不复杂,核心在于:
如何在 Token 限制与信息完整性之间取得平衡
其实现方式也较为灵活,可以基于:
- 简单轮次截断
- 滑动窗口机制
- 摘要压缩
- 向量化记忆存储(Memory RAG)
总结
整体而言,无论是知识库还是工具调用,本质上都属于大模型应用层能力的工程化落地。
在掌握基本的 RAG 与 Tools 调用逻辑之后:
已经具备快速构建企业级知识库与智能 Agent 系统的基础能力。
后续只需逐步完善上下文管理、任务编排与记忆机制,即可持续提升系统的智能化水平。
