让代码知识库“活”起来:给 Ollama + RAG 代码仓库加上增量更新与自动同步
本文承接上一篇《问代码仓库任何问题:用 Ollama Embedding + RAG 搭一个本地代码知识库》,默认你已经完成基础 RAG 环境与初始索引构建。代码基于本地 Ollama + Qwen3.5 实测整理。
一、为什么你的代码知识库需要“增量更新”?
上一篇我们实现了一个完整的本地代码知识库:
- 使用Ollama 的 embedding 模型向量化代码片段;
- 使用ChromaDB存储向量;
- 使用Qwen3.5做最终问答;
- 可以在命令行中,对整个仓库问任何问题。
但有一个现实问题:
代码每天都在变,知识库如果不跟着更新,很快就“过期”了。
典型痛点:
- 仓库一大(几万到几十万行),重建一次索引要好几分钟;
- 每次改两行代码,都去
python build_index.py全量重建,非常浪费时间; - 新人入职 / 项目交接时,如果知识库没更新,会给出过时答案。
这一篇的目标就是:
在不推翻上一篇架构的前提下,给代码知识库加上**“增量索引 + 自动同步”**能力:
- 只处理变更文件;
- 提交代码时自动更新索引;
- 整个过程尽可能“无感”。
我们来看下前端效果:
二、整体设计思路:用文件哈希做“变更感知”
为了实现“只索引变更文件”,我们需要解决三个问题:
- 怎么判断一个文件“变了”?
- 怎么只删除/更新这些文件对应的向量?
- 怎么在开发流程中自动触发更新?
这里我们用一套相对简单、但非常实用的方案:
- 对每个已索引文件,记录一份内容哈希(SHA-256);
- 每次准备更新索引时:
- 遍历一遍仓库里的目标文件(如
*.py, *.md, *.ts); - 计算当前哈希,与上次记录的哈希做对比;
- 哈希变了 → 说明内容变了 → 需要重建该文件的所有 chunk;
- 文件在记录里有,但磁盘上已经不存在 → 属于被删除文件,需要从向量库里移除。
- 遍历一遍仓库里的目标文件(如
- 把这份“文件哈希表”和“最后索引时间”写入一个 JSON 状态文件(比如
index_state.json),下次增量更新时继续用。
好处:
- 实现简单,不依赖 git,可以用于任何代码目录(甚至不在 git 仓库里的文档目录);
- 只要文件内容变了一点点,就会被识别出来;
- 删除文件也会被正确处理,不会在知识库里留下“僵尸文档”。
三、在原有 CodeRAG 上加一层增量索引能力
下面这段代码可以看作是原先CodeRAG的“进阶版”:
在保持原有ingest()/ask()接口风格的前提下,补充了:
- 状态管理:
index_state.json incremental_ingest():只索引变更文件full_ingest():全量重建时顺便刷新状态
你可以新建一个文件code_rag_incremental.py(或直接把原来的code_rag.py替换掉),内容如下:
import os import glob import json import hashlib import datetime from typing import List, Tuple, Dict import chromadb import ollama from tqdm import tqdm class IncrementalCodeRAG: """ 支持增量更新的代码知识库 RAG: - full_ingest(): 全量构建索引(初次或重建用) - incremental_ingest(): 增量更新(只处理变更文件) - ask(): 检索 + 调 Qwen3.5 回答 """ def __init__(self, repo_path: str, db_path: str = "./code_rag_db", state_file: str = "./index_state.json"): self.repo_path = os.path.abspath(repo_path) self.db_path = db_path self.state_file = state_file self.embed_model = "nomic-embed-text" self.chat_model = "qwen3.5:7b-instruct-q4_0" # 初始化向量库 self.client = chromadb.PersistentClient(path=db_path) self.collection = self.client.get_or_create_collection( name="code_collection", metadata={"hnsw:space": "cosine"} ) # 加载/初始化状态 self._load_state() # ========= 状态管理相关 ========= def _load_state(self): if os.path.exists(self.state_file): with open(self.state_file, "r", encoding="utf-8") as f: self.state = json.load(f) else: self.state = { "last_build_time": datetime.datetime.now().isoformat(), "file_hashes": {} # {rel_path: sha256} } self._save_state() def _save_state(self): with open(self.state_file, "w", encoding="utf-8") as f: json.dump(self.state, f, ensure_ascii=False, indent=2) def _file_sha256(self, path: str) -> str: try: h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): h.update(chunk) return h.hexdigest() except Exception: return "" # ========= 基础 IO 与切分 ========= def _read_file(self, file_path: str) -> str: try: with open(file_path, "r", encoding="utf-8") as f: return f.read() except Exception as e: print(f"✗ 读取失败: {file_path} - {e}") return "" def _split_chunks(self, text: str, max_len: int = 800) -> List[str]: lines = text.split("\n") chunks, cur, cur_len = [], [], 0 for line in lines: l = len(line) if cur_len + l > max_len and cur: chunks.append("\n".join(cur)) cur, cur_len = [], 0 cur.append(line) cur_len += l if cur: chunks.append("\n".join(cur)) return chunks def _iter_files(self, patterns: List[str] = None) -> List[str]: if patterns is None: patterns = ["*.py", "*.md", "*.js", "*.ts", "*.java", "*.go"] all_files = [] for pat in patterns: all_files.extend( glob.glob(os.path.join(self.repo_path, "**", pat), recursive=True) ) return all_files # ========= 全量索引 ========= def full_ingest(self, patterns: List[str] = None, batch_size: int = 64): print(f"🚀 开始全量索引:{self.repo_path}") # 清空旧数据 existing = self.collection.get() if existing.get("ids"): self.collection.delete(ids=existing["ids"]) files = self._iter_files(patterns) print(f"✓ 发现 {len(files)} 个文件,开始切分 + 向量化") all_ids, all_docs, all_metas = [], [], [] file_hashes: Dict[str, str] = {} for file_path in files: rel = os.path.relpath(file_path, self.repo_path) content = self._read_file(file_path) if not content.strip(): continue chunks = self._split_chunks(content) file_hashes[rel] = self._file_sha256(file_path) for idx, chunk in enumerate(chunks): doc_id = f"{rel}::{idx}" all_ids.append(doc_id) all_docs.append(chunk) all_metas.append({"file_path": rel, "chunk_index": idx}) for i in tqdm(range(0, len(all_docs), batch_size), desc="向量化"): batch_docs = all_docs[i:i + batch_size] resp = ollama.embeddings(model=self.embed_model, prompt=batch_docs) embeddings = resp["embeddings"] if "embeddings" in resp else resp self.collection.add( ids=all_ids[i:i + batch_size], documents=batch_docs, metadatas=all_metas[i:i + batch_size], embeddings=embeddings, ) # 更新状态 self.state["last_build_time"] = datetime.datetime.now().isoformat() self.state["file_hashes"] = file_hashes self._save_state() print("✅ 全量索引完成") # ========= 增量索引 ========= def _scan_changes(self, patterns: List[str] = None) -> Tuple[List[str], List[str]]: """ 返回 (需要更新的绝对路径列表, 被删除的相对路径列表) """ files = self._iter_files(patterns) old_hashes: Dict[str, str] = self.state.get("file_hashes", {}) current_hashes: Dict[str, str] = {} changed_abs: List[str] = [] deleted_rel: List[str] = [] # 现有文件:计算哈希并比对 for file_path in files: rel = os.path.relpath(file_path, self.repo_path) h = self._file_sha256(file_path) current_hashes[rel] = h if (rel not in old_hashes) or (old_hashes[rel] != h): changed_abs.append(file_path) # 找出“状态里有但磁盘上没有”的(已删除文件) for rel in list(old_hashes.keys()): abs_path = os.path.join(self.repo_path, rel) if not os.path.exists(abs_path): deleted_rel.append(rel) return changed_abs, deleted_rel, current_hashes def incremental_ingest(self, patterns: List[str] = None, batch_size: int = 64): print("🚀 开始增量索引...") changed_abs, deleted_rel, current_hashes = self._scan_changes(patterns) print(f"✓ 变更文件:{len(changed_abs)} 个,已删除文件:{len(deleted_rel)} 个") # 1. 删除已删除文件对应的向量 if deleted_rel: to_delete_ids = [] for rel in deleted_rel: # 粗暴一点:直接根据前缀匹配 id # 实际上可以用 where={"file_path": rel} 更优雅 res = self.collection.get() for _id, meta in zip(res["ids"], res["metadatas"]): if meta.get("file_path") == rel: to_delete_ids.append(_id) if to_delete_ids: self.collection.delete(ids=to_delete_ids) print(f"✓ 已删除 {len(to_delete_ids)} 个向量块") # 2. 重新索引变更文件 all_ids, all_docs, all_metas = [], [], [] for file_path in changed_abs: rel = os.path.relpath(file_path, self.repo_path) content = self._read_file(file_path) if not content.strip(): continue chunks = self._split_chunks(content) # 先删除该文件旧的所有块 existing = self.collection.get() del_ids = [ _id for _id, meta in zip(existing["ids"], existing["metadatas"]) if meta.get("file_path") == rel ] if del_ids: self.collection.delete(ids=del_ids) for idx, chunk in enumerate(chunks): doc_id = f"{rel}::{idx}" all_ids.append(doc_id) all_docs.append(chunk) all_metas.append({"file_path": rel, "chunk_index": idx}) if all_docs: for i in tqdm(range(0, len(all_docs), batch_size), desc="向量化变更文件"): batch_docs = all_docs[i:i + batch_size] resp = ollama.embeddings(model=self.embed_model, prompt=batch_docs) embeddings = resp["embeddings"] if "embeddings" in resp else resp self.collection.add( ids=all_ids[i:i + batch_size], documents=batch_docs, metadatas=all_metas[i:i + batch_size], embeddings=embeddings, ) print(f"✓ 已更新 {len(all_docs)} 个向量块") # 3. 刷新状态 self.state["last_build_time"] = datetime.datetime.now().isoformat() # 更新/合并哈希表 for rel, h in current_hashes.items(): self.state["file_hashes"][rel] = h for rel in deleted_rel: self.state["file_hashes"].pop(rel, None) self._save_state() print("✅ 增量索引完成") # ========= 检索 + 问答 ========= def query(self, question: str, top_k: int = 5): q_emb = ollama.embeddings(model=self.embed_model, prompt=question) q_vec = q_emb["embedding"] if "embedding" in q_emb else q_emb["embeddings"][0] res = self.collection.query( query_embeddings=[q_vec], n_results=top_k, ) docs = res["documents"][0] metas = res["metadatas"][0] scores = res["distances"][0] return [ {"text": d, "meta": m, "score": s} for d, m, s in zip(docs, metas, scores) ] def ask(self, question: str, top_k: int = 5, max_context_chars: int = 3000) -> str: candidates = self.query(question, top_k=top_k) context_parts = [] for item in candidates: fp = item["meta"]["file_path"] idx = item["meta"]["chunk_index"] sc = item["score"] context_parts.append( f"--- 文件: {fp} 片段#{idx} (相似度: {sc:.3f}) ---\n{item['text']}" ) context = "\n\n".join(context_parts) if len(context) > max_context_chars: context = context[:max_context_chars] + "\n...\n[上下文已截断]" prompt = f""" 你是一个资深软件工程师,现在有一个代码仓库的部分片段如下: {context} 用户问题是: {question} 请你基于这些代码片段,给出尽量准确的回答: 1. 用中文说明相关实现大致逻辑; 2. 如果涉及具体函数/类,请指明所在文件及大致位置; 3. 如有不确定的地方,要说明“不确定”,不要瞎编; 4. 可以适当给出重构或优化建议。 """ resp = ollama.chat( model=self.chat_model, messages=[{"role": "user", "content": prompt}], temperature=0.2, ) return resp["message"]["content"].strip()四、实际使用流程:从“第一次全量”到“之后每次增量”
1. 第一次:全量索引
# build_full_index.py from code_rag_incremental import IncrementalCodeRAG if __name__ == "__main__": rag = IncrementalCodeRAG(repo_path="./your_repo") rag.full_ingest()运行:
python build_full_index.py
完成后会多出:
code_rag_db/:Chroma 向量库数据index_state.json:索引状态 + 每个文件的哈希
2. 之后:只做增量索引
# build_incremental.py from code_rag_incremental import IncrementalCodeRAG if __name__ == "__main__": rag = IncrementalCodeRAG(repo_path="./your_repo") rag.incremental_ingest()每次修改代码后执行:
python build_incremental.py
就会:
- 自动识别变更文件与删除文件;
- 更新对应的向量块;
- 刷新状态。
3. 问答保持不变
# ask_code_incremental.py from code_rag_incremental import IncrementalCodeRAG if __name__ == "__main__": rag = IncrementalCodeRAG(repo_path="./your_repo") q = "用户注册逻辑在哪些文件?主要步骤是什么?" print("问题:", q) print(rag.ask(q))你可以直接把上一篇的ask_code.py替换为这个版本,整体用法不变。
五、把增量索引挂到 Git 提交流程里(pre-commit Hook)
如果你想彻底“无感”,可以在 git 提交前自动跑一遍增量索引。
1)新建.git/hooks/pre-commit(或编辑已有文件):
#!/bin/bash REPO_PATH=$(git rev-parse --show-toplevel) cd "$REPO_PATH" || exit 0 # 只在存在增量脚本时执行 if [ -f "code_rag_incremental.py" ]; then echo "运行 CodeRAG 增量索引..." python - <<EOF from code_rag_incremental import IncrementalCodeRAG rag = IncrementalCodeRAG(repo_path="$REPO_PATH") rag.incremental_ingest() EOF echo "CodeRAG 索引已更新" fi exit 02)赋予可执行权限:
chmod +x .git/hooks/pre-commit
之后你每次执行:
git commit -m "feat: xxx"
在真正提交前都会自动触发一次增量索引。你的“本地代码知识库”就始终和主干代码保持同步。
六、一些实战小建议
何时需要全量重建?
- 改了 chunk 切分策略
- 调整了向量模型(比如从
nomic-embed-text换成别的) - 清理了大量历史文件,想“从头整理”一遍
→ 这时候用full_ingest()跑一次即可。
索引频率建议
- 小团队开发:
- 每次提交前自动增量,足够;
- 不必每次保存代码就重建。
- 大团队 / 大仓库:
- 可以做一个“夜间全量 + 日间多次增量”的组合:
- 白天:pre-commit 跑增量
- 凌晨:CI 定时全量重建一次,防止状态漂移
- 可以做一个“夜间全量 + 日间多次增量”的组合:
- 小团队开发:
文件类型选择
- 必选:
.py / .ts / .js / .java / .go / .md - 可选:
.yaml / .yml / .toml / .ini(配置相关) - 建议排除:
node_modules / dist / build / .venv / .git等目录
- 必选:
