当前位置: 首页 > news >正文

让代码知识库“活”起来:给 Ollama + RAG 代码仓库加上增量更新与自动同步

本文承接上一篇《问代码仓库任何问题:用 Ollama Embedding + RAG 搭一个本地代码知识库》,默认你已经完成基础 RAG 环境与初始索引构建。代码基于本地 Ollama + Qwen3.5 实测整理。


一、为什么你的代码知识库需要“增量更新”?

上一篇我们实现了一个完整的本地代码知识库:

  • 使用Ollama 的 embedding 模型向量化代码片段;
  • 使用ChromaDB存储向量;
  • 使用Qwen3.5做最终问答;
  • 可以在命令行中,对整个仓库问任何问题。

但有一个现实问题:

代码每天都在变,知识库如果不跟着更新,很快就“过期”了。

典型痛点:

  • 仓库一大(几万到几十万行),重建一次索引要好几分钟
  • 每次改两行代码,都去python build_index.py全量重建,非常浪费时间;
  • 新人入职 / 项目交接时,如果知识库没更新,会给出过时答案

这一篇的目标就是:

在不推翻上一篇架构的前提下,给代码知识库加上**“增量索引 + 自动同步”**能力:

  • 只处理变更文件
  • 提交代码时自动更新索引;
  • 整个过程尽可能“无感”。

我们来看下前端效果:


二、整体设计思路:用文件哈希做“变更感知”

为了实现“只索引变更文件”,我们需要解决三个问题:

  1. 怎么判断一个文件“变了”?​
  2. 怎么只删除/更新这些文件对应的向量?​
  3. 怎么在开发流程中自动触发更新?​

这里我们用一套相对简单、但非常实用的方案:

  1. 对每个已索引文件,记录一份内容哈希(SHA-256)​
  2. 每次准备更新索引时:
    • 遍历一遍仓库里的目标文件(如*.py, *.md, *.ts);
    • 计算当前哈希,与上次记录的哈希做对比;
    • 哈希变了 → 说明内容变了 → 需要重建该文件的所有 chunk;
    • 文件在记录里有,但磁盘上已经不存在 → 属于被删除文件,需要从向量库里移除。
  3. 把这份“文件哈希表”和“最后索引时间”写入一个 JSON 状态文件(比如index_state.json),下次增量更新时继续用。

好处:

  • 实现简单,不依赖 git,可以用于任何代码目录(甚至不在 git 仓库里的文档目录);
  • 只要文件内容变了一点点,就会被识别出来;
  • 删除文件也会被正确处理,不会在知识库里留下“僵尸文档”。

三、在原有 CodeRAG 上加一层增量索引能力

下面这段代码可以看作是原先CodeRAG的“进阶版”
在保持原有ingest()/ask()接口风格的前提下,补充了:

  • 状态管理:index_state.json
  • incremental_ingest():只索引变更文件
  • full_ingest():全量重建时顺便刷新状态

你可以新建一个文件code_rag_incremental.py(或直接把原来的code_rag.py替换掉),内容如下:

import os import glob import json import hashlib import datetime from typing import List, Tuple, Dict import chromadb import ollama from tqdm import tqdm class IncrementalCodeRAG: """ 支持增量更新的代码知识库 RAG: - full_ingest(): 全量构建索引(初次或重建用) - incremental_ingest(): 增量更新(只处理变更文件) - ask(): 检索 + 调 Qwen3.5 回答 """ def __init__(self, repo_path: str, db_path: str = "./code_rag_db", state_file: str = "./index_state.json"): self.repo_path = os.path.abspath(repo_path) self.db_path = db_path self.state_file = state_file self.embed_model = "nomic-embed-text" self.chat_model = "qwen3.5:7b-instruct-q4_0" # 初始化向量库 self.client = chromadb.PersistentClient(path=db_path) self.collection = self.client.get_or_create_collection( name="code_collection", metadata={"hnsw:space": "cosine"} ) # 加载/初始化状态 self._load_state() # ========= 状态管理相关 ========= def _load_state(self): if os.path.exists(self.state_file): with open(self.state_file, "r", encoding="utf-8") as f: self.state = json.load(f) else: self.state = { "last_build_time": datetime.datetime.now().isoformat(), "file_hashes": {} # {rel_path: sha256} } self._save_state() def _save_state(self): with open(self.state_file, "w", encoding="utf-8") as f: json.dump(self.state, f, ensure_ascii=False, indent=2) def _file_sha256(self, path: str) -> str: try: h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): h.update(chunk) return h.hexdigest() except Exception: return "" # ========= 基础 IO 与切分 ========= def _read_file(self, file_path: str) -> str: try: with open(file_path, "r", encoding="utf-8") as f: return f.read() except Exception as e: print(f"✗ 读取失败: {file_path} - {e}") return "" def _split_chunks(self, text: str, max_len: int = 800) -> List[str]: lines = text.split("\n") chunks, cur, cur_len = [], [], 0 for line in lines: l = len(line) if cur_len + l > max_len and cur: chunks.append("\n".join(cur)) cur, cur_len = [], 0 cur.append(line) cur_len += l if cur: chunks.append("\n".join(cur)) return chunks def _iter_files(self, patterns: List[str] = None) -> List[str]: if patterns is None: patterns = ["*.py", "*.md", "*.js", "*.ts", "*.java", "*.go"] all_files = [] for pat in patterns: all_files.extend( glob.glob(os.path.join(self.repo_path, "**​", pat), recursive=True) ) return all_files # ========= 全量索引 ========= def full_ingest(self, patterns: List[str] = None, batch_size: int = 64): print(f"🚀 开始全量索引:{self.repo_path}") # 清空旧数据 existing = self.collection.get() if existing.get("ids"): self.collection.delete(ids=existing["ids"]) files = self._iter_files(patterns) print(f"✓ 发现 {len(files)} 个文件,开始切分 + 向量化") all_ids, all_docs, all_metas = [], [], [] file_hashes: Dict[str, str] = {} for file_path in files: rel = os.path.relpath(file_path, self.repo_path) content = self._read_file(file_path) if not content.strip(): continue chunks = self._split_chunks(content) file_hashes[rel] = self._file_sha256(file_path) for idx, chunk in enumerate(chunks): doc_id = f"{rel}::{idx}" all_ids.append(doc_id) all_docs.append(chunk) all_metas.append({"file_path": rel, "chunk_index": idx}) for i in tqdm(range(0, len(all_docs), batch_size), desc="向量化"): batch_docs = all_docs[i:i + batch_size] resp = ollama.embeddings(model=self.embed_model, prompt=batch_docs) embeddings = resp["embeddings"] if "embeddings" in resp else resp self.collection.add( ids=all_ids[i:i + batch_size], documents=batch_docs, metadatas=all_metas[i:i + batch_size], embeddings=embeddings, ) # 更新状态 self.state["last_build_time"] = datetime.datetime.now().isoformat() self.state["file_hashes"] = file_hashes self._save_state() print("✅ 全量索引完成") # ========= 增量索引 ========= def _scan_changes(self, patterns: List[str] = None) -> Tuple[List[str], List[str]]: """ 返回 (需要更新的绝对路径列表, 被删除的相对路径列表) """ files = self._iter_files(patterns) old_hashes: Dict[str, str] = self.state.get("file_hashes", {}) current_hashes: Dict[str, str] = {} changed_abs: List[str] = [] deleted_rel: List[str] = [] # 现有文件:计算哈希并比对 for file_path in files: rel = os.path.relpath(file_path, self.repo_path) h = self._file_sha256(file_path) current_hashes[rel] = h if (rel not in old_hashes) or (old_hashes[rel] != h): changed_abs.append(file_path) # 找出“状态里有但磁盘上没有”的(已删除文件) for rel in list(old_hashes.keys()): abs_path = os.path.join(self.repo_path, rel) if not os.path.exists(abs_path): deleted_rel.append(rel) return changed_abs, deleted_rel, current_hashes def incremental_ingest(self, patterns: List[str] = None, batch_size: int = 64): print("🚀 开始增量索引...") changed_abs, deleted_rel, current_hashes = self._scan_changes(patterns) print(f"✓ 变更文件:{len(changed_abs)} 个,已删除文件:{len(deleted_rel)} 个") # 1. 删除已删除文件对应的向量 if deleted_rel: to_delete_ids = [] for rel in deleted_rel: # 粗暴一点:直接根据前缀匹配 id # 实际上可以用 where={"file_path": rel} 更优雅 res = self.collection.get() for _id, meta in zip(res["ids"], res["metadatas"]): if meta.get("file_path") == rel: to_delete_ids.append(_id) if to_delete_ids: self.collection.delete(ids=to_delete_ids) print(f"✓ 已删除 {len(to_delete_ids)} 个向量块") # 2. 重新索引变更文件 all_ids, all_docs, all_metas = [], [], [] for file_path in changed_abs: rel = os.path.relpath(file_path, self.repo_path) content = self._read_file(file_path) if not content.strip(): continue chunks = self._split_chunks(content) # 先删除该文件旧的所有块 existing = self.collection.get() del_ids = [ _id for _id, meta in zip(existing["ids"], existing["metadatas"]) if meta.get("file_path") == rel ] if del_ids: self.collection.delete(ids=del_ids) for idx, chunk in enumerate(chunks): doc_id = f"{rel}::{idx}" all_ids.append(doc_id) all_docs.append(chunk) all_metas.append({"file_path": rel, "chunk_index": idx}) if all_docs: for i in tqdm(range(0, len(all_docs), batch_size), desc="向量化变更文件"): batch_docs = all_docs[i:i + batch_size] resp = ollama.embeddings(model=self.embed_model, prompt=batch_docs) embeddings = resp["embeddings"] if "embeddings" in resp else resp self.collection.add( ids=all_ids[i:i + batch_size], documents=batch_docs, metadatas=all_metas[i:i + batch_size], embeddings=embeddings, ) print(f"✓ 已更新 {len(all_docs)} 个向量块") # 3. 刷新状态 self.state["last_build_time"] = datetime.datetime.now().isoformat() # 更新/合并哈希表 for rel, h in current_hashes.items(): self.state["file_hashes"][rel] = h for rel in deleted_rel: self.state["file_hashes"].pop(rel, None) self._save_state() print("✅ 增量索引完成") # ========= 检索 + 问答 ========= def query(self, question: str, top_k: int = 5): q_emb = ollama.embeddings(model=self.embed_model, prompt=question) q_vec = q_emb["embedding"] if "embedding" in q_emb else q_emb["embeddings"][0] res = self.collection.query( query_embeddings=[q_vec], n_results=top_k, ) docs = res["documents"][0] metas = res["metadatas"][0] scores = res["distances"][0] return [ {"text": d, "meta": m, "score": s} for d, m, s in zip(docs, metas, scores) ] def ask(self, question: str, top_k: int = 5, max_context_chars: int = 3000) -> str: candidates = self.query(question, top_k=top_k) context_parts = [] for item in candidates: fp = item["meta"]["file_path"] idx = item["meta"]["chunk_index"] sc = item["score"] context_parts.append( f"--- 文件: {fp} 片段#{idx} (相似度: {sc:.3f}) ---\n{item['text']}" ) context = "\n\n".join(context_parts) if len(context) > max_context_chars: context = context[:max_context_chars] + "\n...\n[上下文已截断]" prompt = f""" 你是一个资深软件工程师,现在有一个代码仓库的部分片段如下: {context} 用户问题是: {question} 请你基于这些代码片段,给出尽量准确的回答: 1. 用中文说明相关实现大致逻辑; 2. 如果涉及具体函数/类,请指明所在文件及大致位置; 3. 如有不确定的地方,要说明“不确定”,不要瞎编; 4. 可以适当给出重构或优化建议。 """ resp = ollama.chat( model=self.chat_model, messages=[{"role": "user", "content": prompt}], temperature=0.2, ) return resp["message"]["content"].strip()

四、实际使用流程:从“第一次全量”到“之后每次增量”

1. 第一次:全量索引

# build_full_index.py from code_rag_incremental import IncrementalCodeRAG if __name__ == "__main__": rag = IncrementalCodeRAG(repo_path="./your_repo") rag.full_ingest()

运行:

python build_full_index.py

完成后会多出:

  • code_rag_db/:Chroma 向量库数据
  • index_state.json:索引状态 + 每个文件的哈希

2. 之后:只做增量索引

# build_incremental.py from code_rag_incremental import IncrementalCodeRAG if __name__ == "__main__": rag = IncrementalCodeRAG(repo_path="./your_repo") rag.incremental_ingest()

每次修改代码后执行:

python build_incremental.py

就会:

  • 自动识别变更文件与删除文件;
  • 更新对应的向量块;
  • 刷新状态。

3. 问答保持不变

# ask_code_incremental.py from code_rag_incremental import IncrementalCodeRAG if __name__ == "__main__": rag = IncrementalCodeRAG(repo_path="./your_repo") q = "用户注册逻辑在哪些文件?主要步骤是什么?" print("问题:", q) print(rag.ask(q))

你可以直接把上一篇的ask_code.py替换为这个版本,整体用法不变。


五、把增量索引挂到 Git 提交流程里(pre-commit Hook)

如果你想彻底“无感”,可以在 git 提交前自动跑一遍增量索引。

1)新建.git/hooks/pre-commit(或编辑已有文件):

#!/bin/bash REPO_PATH=$(git rev-parse --show-toplevel) cd "$REPO_PATH" || exit 0 # 只在存在增量脚本时执行 if [ -f "code_rag_incremental.py" ]; then echo "运行 CodeRAG 增量索引..." python - <<EOF from code_rag_incremental import IncrementalCodeRAG rag = IncrementalCodeRAG(repo_path="$REPO_PATH") rag.incremental_ingest() EOF echo "CodeRAG 索引已更新" fi exit 0

2)赋予可执行权限:

chmod +x .git/hooks/pre-commit

之后你每次执行:

git commit -m "feat: xxx"

在真正提交前都会自动触发一次增量索引。你的“本地代码知识库”就始终和主干代码保持同步。


六、一些实战小建议

  • 何时需要全量重建?

    • 改了 chunk 切分策略
    • 调整了向量模型(比如从nomic-embed-text换成别的)
    • 清理了大量历史文件,想“从头整理”一遍
      → 这时候用full_ingest()跑一次即可。
  • 索引频率建议

    • 小团队开发:
      • 每次提交前自动增量,足够;
      • 不必每次保存代码就重建。
    • 大团队 / 大仓库:
      • 可以做一个“夜间全量 + 日间多次增量”的组合:
        • 白天:pre-commit 跑增量
        • 凌晨:CI 定时全量重建一次,防止状态漂移
  • 文件类型选择

    • 必选:.py / .ts / .js / .java / .go / .md
    • 可选:.yaml / .yml / .toml / .ini(配置相关)
    • 建议排除:node_modules / dist / build / .venv / .git等目录
http://www.jsqmd.com/news/456947/

相关文章:

  • 织梦dede后台登陆成功后返回登陆界面的解决办法
  • 织梦DedeCMS如何去掉首页域名后面的index.html
  • 敲降细胞裂解液如何优化用于分子机制研究的蛋白样本?
  • 上位机1000条/秒数据不丢不卡:SQLite持久化队列最优方案实战
  • OpenClaw和八大国产 “龙虾“智能体工具深度对比
  • 靠谱CNC自动编程厂家推荐|三轴编程效率拉满
  • 加密jar,防止反编译泄露
  • 嘉年华旅行社电话查询:如何有效联系与背景了解 - 品牌推荐
  • 驭“数”前行,智“惠”矿山——神东车辆安全技术管控平台引领煤炭行业安全管理新变革
  • 2026年一体化泵站厂家推荐:河北妍博环保设备有限公司,全系一体化泵站及截流井解决方案 - 品牌推荐官
  • 2026年被动元器件优质厂家推荐:东莞普利特科技,工业/车规/国产/进口全品类覆盖 - 品牌推荐官
  • Highcharts曲线图(Spline Chart)使用指南|连续自然平滑趋势的可视化艺术图表
  • 高性价比液压传动实验台厂家精选排行:教学实验台/教学陈列柜厂家/模型静态无语音解说陈列柜/模型静态陈列柜/气动PLC控制实验台/选择指南 - 优质品牌商家
  • 小龙虾openclaw的竞品,KimiClaw深度解析
  • 2026武汉财税服务优选推荐:慧援实业有限公司,专注财税筹划/代理/规划/咨询全周期服务 - 品牌推荐官
  • 织梦后台修改文件:DedeCMS:CSRF Token Check Failed提示
  • 2026年工业水处理树脂回收推荐:廊坊乾纳环保科技,专业回收超纯水/阴阳离子交换等废旧树脂 - 品牌推荐官
  • 小龙虾openclaw的竞品,QClaw / WorkBuddy深度解析
  • 织梦dedecms出现Fatal error:Call to undefined function ParCv()的解决
  • 小龙虾openclaw的竞品,ArkClaw深度解析
  • 2026企业股权争议处理推荐:上海市浩信律师事务所,专业处理合作企业投资股权争议等案件 - 品牌推荐官
  • 小龙虾openclaw的竞品,CoPaw深度解析
  • 山东苗木种植厂家高性价比精选推荐:黑心菊/鼠尾草/万寿菊/三色堇/二月兰/四季海棠/四季美女樱/天竺葵/太阳花/选择指南 - 优质品牌商家
  • 2026家用咖啡机推荐:美的小家电半自动/全自动/便携/入门款全解析,科技引领品质生活 - 品牌推荐官
  • 《空性词哲学研究——概念的空灵与思想的实存》深度研究报告
  • 2026年粉体输送系统厂家推荐:东莞星瑞机械设备有限公司,正压/负压/粒料输送设备全系供应 - 品牌推荐官
  • 小龙虾openclaw的竞品,Xiaomi miclaw深度解析
  • 小龙虾openclaw的竞品,ZeroClaw深度解析
  • 2025年环保胶黏剂实力推荐:山东绿康建材集团,云石胶/硅酮胶/美缝剂等全系产品供应 - 品牌推荐官
  • 小龙虾openclaw的竞品,PicoClaw深度解析