当前位置: 首页 > news >正文

【RAG】【ingestion03】摄取管道与文档管理示例

1. 案例目标

本案例演示了如何在LlamaIndex的摄取管道中集成文档存储(docstore)以实现文档管理功能。主要目标包括:

  • 展示如何使用文档存储来跟踪和管理文档
  • 演示如何检测和处理重复文档
  • 展示如何识别文档变更并仅处理已更改的文档
  • 演示如何保存和加载摄取管道状态

通过这个案例,用户可以了解如何构建一个智能的文档处理系统,该系统能够自动识别文档变化,避免重复处理,提高处理效率。

2. 技术栈与核心依赖

LlamaIndex

Redis

MongoDB

HuggingFace

Python

Jupyter Notebook

核心依赖包:

llama-index-storage-docstore-redis llama-index-storage-docstore-mongodb llama-index-embeddings-huggingface

这些依赖提供了文档存储、嵌入模型和文档处理功能的支持。

3. 环境配置

步骤1: 安装必要的依赖

%pip install llama-index-storage-docstore-redis %pip install llama-index-storage-docstore-mongodb %pip install llama-index-embeddings-huggingface

步骤2: 创建测试数据

# Make some test data !mkdir -p data !echo "This is a test file: one!" > data/test1.txt !echo "This is a test file: two!" > data/test2.txt

步骤3: 加载文档

from llama_index.core import SimpleDirectoryReader # load documents with deterministic IDs documents = SimpleDirectoryReader("./data", filename_as_id=True).load_data()

注意:使用filename_as_id=True确保每个文档有确定的ID,这是文档管理的关键。

4. 案例实现

4.1 创建带文档存储的摄取管道

创建一个包含文档存储的摄取管道,用于跟踪和管理文档:

from llama_index.embeddings.huggingface import HuggingFaceEmbedding from llama_index.core.ingestion import IngestionPipeline from llama_index.core.storage.docstore import SimpleDocumentStore from llama_index.storage.docstore.redis import RedisDocumentStore from llama_index.storage.docstore.mongodb import MongoDocumentStore from llama_index.core.node_parser import SentenceSplitter pipeline = IngestionPipeline( transformations=[ SentenceSplitter(), HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"), ], docstore=SimpleDocumentStore(), )

4.2 运行管道处理初始文档

nodes = pipeline.run(documents=documents) print(f"Ingested {len(nodes)} Nodes")

4.3 保存和加载管道状态

保存管道状态以便后续使用:

pipeline.persist("./pipeline_storage")

加载已保存的管道状态:

pipeline = IngestionPipeline( transformations=[ SentenceSplitter(), HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"), ] ) # restore the pipeline pipeline.load("./pipeline_storage")

4.4 测试文档管理功能

创建新文档并修改现有文档:

!echo "This is a test file: three!" > data/test3.txt !echo "This is a NEW test file: one!" > data/test1.txt

重新加载所有文档并运行管道:

documents = SimpleDirectoryReader("./data", filename_as_id=True).load_data() nodes = pipeline.run(documents=documents) print(f"Ingested {len(nodes)} Nodes")

4.5 验证处理结果

检查哪些节点被处理:

for node in nodes: print(f"Node: {node.text}")

验证文档存储中的文档数量:

print(len(pipeline.docstore.docs))

5. 案例效果

运行此案例后,您将看到以下效果:

  • 初始运行时,所有文档都被处理并存储在文档存储中
  • 第二次运行时,只有新增和修改的文档被处理,未修改的文档被跳过
  • 文档存储中始终跟踪所有文档的唯一版本
  • 管道状态可以保存和恢复,便于持续使用
文档加载

读取所有文档

文档比较

检查文档哈希值

处理决策

跳过/处理文档

文档处理

分割和嵌入

状态更新

更新文档存储

新增文档

test3.txt

将被处理

修改文档

test1.txt

将被重新处理

未修改文档

test2.txt

将被跳过

文档管理工作流程

文档 → 计算哈希值 → 检查文档存储 → 比较哈希值 → 处理/跳过 → 更新存储

6. 案例实现思路

本案例的核心实现思路是通过文档存储来跟踪文档状态,避免重复处理未修改的文档:

6.1 文档标识与哈希

每个文档通过doc_id进行标识,并计算文档内容的哈希值。当文档内容发生变化时,其哈希值也会改变,这使得系统能够检测到文档变更。

6.2 文档状态管理

文档存储维护一个doc_id到document_hash的映射,用于跟踪每个文档的最新状态。当处理新文档时,系统会检查文档存储中是否已存在相同doc_id的记录。

6.3 处理策略

根据文档状态,系统采用不同的处理策略:

  • 新文档:直接处理并添加到文档存储
  • 修改文档:重新处理并更新文档存储中的哈希值
  • 未修改文档:跳过处理,直接使用已有结果

6.4 管道状态持久化

通过persist()和load()方法,可以将管道状态(包括文档存储和缓存)保存到磁盘并重新加载,这使得系统可以在重启后保持状态连续性。

7. 扩展建议

7.1 集成向量存储

将文档存储与向量存储结合,实现完整的文档管理:

  • 添加向量存储到摄取管道
  • 实现文档的upsert操作(更新或插入)
  • 支持基于向量的文档检索

7.2 使用分布式文档存储

使用分布式文档存储提高可扩展性:

  • Redis集群作为文档存储
  • MongoDB分片集群
  • 其他NoSQL数据库

7.3 添加文档版本控制

实现更精细的文档版本控制:

  • 保留文档历史版本
  • 支持版本回滚
  • 文档变更历史追踪

7.4 实现增量处理

优化增量处理机制:

  • 批量处理变更文档
  • 并行处理独立文档
  • 处理优先级管理

7.5 添加监控和报告

增强系统的可观测性:

  • 文档处理统计
  • 处理性能监控
  • 错误处理和报告
  • 处理进度可视化

8. 总结

本案例展示了如何在LlamaIndex的摄取管道中集成文档存储功能,实现智能的文档管理。通过文档哈希比较,系统能够自动识别文档变更,只处理新增或修改的文档,避免重复处理未修改的文档。

这个案例的核心价值在于:

  • 提供了一种高效的文档变更检测机制
  • 避免了不必要的重复处理,提高了系统效率
  • 支持管道状态的持久化和恢复
  • 为构建大规模文档处理系统提供了基础

这种文档管理机制特别适用于需要定期处理大量文档的场景,如文档索引更新、知识库维护等,可以显著提高处理效率,减少资源消耗。

http://www.jsqmd.com/news/740836/

相关文章:

  • 告别手忙脚乱:用这些Verdi快捷键和窗口操作技巧,让你的仿真效率翻倍
  • 紧急!医疗设备量产前最后72小时:C语言采集线程死锁自愈方案(含FreeRTOS优先级翻转熔断机制源码)
  • 如何快速突破百度网盘限速:Python直链解析工具完整指南
  • 算法训练营第19天|1047. 删除字符串中的所有相邻重复项
  • 【Python分布式机器学习训练配置黄金标准】:20年ML基础设施专家亲授——避坑指南+5大核心参数调优清单
  • 分布式大模型推理实战:TP/PP/EP并行策略深度解析与架构选型指南
  • 3种强大方案:将旧电视盒子变身高性能Linux服务器的终极指南
  • 全域数学·数术本源·高维代数卷(72分册)【乖乖数学】
  • 告别手动刷图!E7Helper如何让你在《第七史诗》中解放双手
  • [具身智能-539]:云端就是一个大市场,什么都可以拿来卖,基础设施、平台、软件、远程API RPC, 工具,模型,智能体,游戏,装备、算力、能力、数据,“智慧”都被打包成了标准化的商品进行买卖
  • 2026 降 AI 软件排行:99.26% 达标率的嘎嘎降AI 凭什么稳坐第一?
  • 体验Taotoken平台在高峰时段的API请求成功率与路由效果
  • Windows 11终极怀旧游戏复活指南:用IPXWrapper轻松启用IPX/SPX协议
  • HAGeo系统:启发式辅助构造提升几何定理自动证明效率
  • 类与面向对象
  • 4.28~4.30【Q】
  • 智能自动化抖音评论采集:革命性的双引擎数据提取方案
  • 阅读 Hyperf 的 Server 类,看它如何监听 Swoole 的 onRequest 事件。
  • 从‘人工智障’到‘智能助手’:手把手教你用Python实现一个会‘提问’的主动学习分类器
  • TTS多模态验证系统:语音安全与图像生成技术解析
  • Windows下C语言程序报错3221226356?别慌,手把手教你定位并修复这个内存访问错误
  • 扩散模型与S3-DiT架构:多模态生成式AI技术解析
  • 【RISC-V调试性能瓶颈诊断术】:从CSR读写延迟到调试模块DSCR状态机异常的逐层穿透解析
  • GRADE基准:跨学科图像编辑效果统一评估体系
  • 成本十分之一,性能追平激光雷达?我们拆了一颗国产4D毫米波雷达(含MMIC芯片实拍)
  • AI广告优化:是效率利器,还是隐藏陷阱?深度剖析其可靠性
  • AI/ML安全代码质量评估体系与防护实践
  • 开源机械臂OpenClaw-EcoBot:低成本高自由度机器人开发实践
  • 全域数学视角下N维广义数系的推广与本源恒等式构建【乖乖数学】
  • 2 分钟出稿到 30 分钟出稿,2026 降 AI 软件排行 7 款速度梯队大公开。