工业级大模型学习之路021:LangChain零基础入门教程(第四篇):文档加载与文本分块技术
一、文档处理是 RAG 系统的基石
1.1 为什么文档处理决定了 RAG 系统的上限?
RAG 系统的核心逻辑是 **"检索相关文档片段 → 喂给大模型生成回答"**,整个流程的质量完全依赖于文档处理环节:
- 如果文档解析失败,再好的检索和生成模型也无法获取有效信息
- 如果分块不合理,要么检索到的片段上下文不完整,要么包含太多无关信息
- 工业级统计显示:文档处理环节的优化能带来 30%-50% 的 RAG 回答准确率提升,是投入产出比最高的优化点
1.2 文档处理的完整流水线
一个工业级文档处理流水线包含以下 5 个步骤:
原始文档 → 文档加载 → 文本清洗 → 文本分块 → 元数据增强 → 分块存储- 文档加载:将不同格式的文件(PDF、Word、Markdown 等)转换为纯文本
- 文本清洗:去除乱码、多余空格、页眉页脚、重复内容等噪声
- 文本分块:将长文本切割成适合大模型上下文窗口和检索的小块
- 元数据增强:为每个分块添加来源、页码、章节、作者等元信息
- 分块存储:将分块和元数据保存到文件或向量数据库
1.3 文档加载的核心挑战
不同格式的文档有不同的解析难点,没有万能的解析工具:
| 文档格式 | 主要挑战 | 常见问题 |
|---|---|---|
| 扫描件识别、表格提取、页眉页脚去除、跨页断句 | 乱码、表格变成纯文本乱序、图片无法提取 | |
| Word | 格式解析、嵌入式对象、修订痕迹 | 格式丢失、批注被当成正文 |
| Markdown | 标题层级解析、代码块处理、链接提取 | 标题层级混乱、代码块被拆分 |
| Excel/CSV | 表格结构解析、多 sheet 处理 | 表格行被拆分、数值格式丢失 |
| PPT | 幻灯片顺序、备注提取、图片文字 | 备注丢失、文本框顺序混乱 |
1.4 文本分块的核心原则
分块不是简单的按长度切割,必须遵循以下三大原则:
原则 1:语义完整性
一个分块应该包含一个完整的语义单元(如一个段落、一个知识点),避免将一句话或一个概念拆分到两个分块中。
错误分块:
分块1:RAG技术的核心思想是将外部知识库检索与大模型生成相结合,它可以有效解决大模型的 分块2:知识过时和幻觉问题。正确分块:
分块1:RAG技术的核心思想是将外部知识库检索与大模型生成相结合,它可以有效解决大模型的知识过时和幻觉问题。原则 2:上下文相关性
分块大小要与检索粒度和大模型上下文窗口匹配:
- 太小:丢失上下文信息,检索精度下降
- 太大:包含太多无关信息,稀释核心内容,增加大模型处理成本
工业级推荐分块大小:
| 场景 | 推荐分块大小 | 重叠大小 |
|---|---|---|
| 通用问答 | 512-1024 字符 | 50-100 字符 |
| 技术文档 | 1024-2048 字符 | 100-200 字符 |
| 法律合同 | 2048-4096 字符 | 200-400 字符 |
| 小说 / 长文本 | 4096-8192 字符 | 400-800 字符 |
原则 3:可追溯性
每个分块必须保留完整的元数据,能够追溯到原始文档的具体位置(如页码、章节、行号),这是实现引用标注和答案溯源的基础。
1.5 主流分块策略对比
| 分块策略 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 固定长度分块 | 按字符数或 Token 数切割 | 实现简单、速度快 | 容易破坏语义完整性 | 快速原型、简单文本 |
| 递归字符分块 | 按分隔符优先级递归切割(段落→句子→单词) | 尽量保留语义完整性 | 仍可能破坏长句子 | 通用场景(最常用) |
| 语义分块 | 基于嵌入向量的相似度切割 | 完美保留语义完整性 | 速度慢、依赖嵌入模型 | 高质量 RAG 系统 |
| 句子窗口分块 | 以句子为单位分块,检索时扩展上下文 | 检索精度高、上下文完整 | 分块数量多、检索慢 | 精准问答场景 |
| 父子分块 | 小分块用于检索,大分块用于生成 | 兼顾检索精度和上下文完整性 | 实现复杂 | 工业级 RAG 系统 |
二、LangChain 文档处理核心 API 详解
2.1 文档对象Document
LangChain 中所有文档都表示为Document对象,包含两个核心字段:
page_content:文档的文本内容metadata:字典类型,存储文档的元数据(来源、页码、作者等)
from langchain_core.documents import Document # 创建一个文档对象 doc = Document( page_content="这是文档的内容", metadata={ "source": "test.pdf", "page": 1, "author": "张三" } ) print(doc.page_content) print(doc.metadata)2.2 文档加载器DocumentLoader
LangChain 提供了 100 + 种文档加载器,支持几乎所有常见的文档格式。所有加载器都实现了统一的接口:
load():同步加载所有文档,返回list[Document]lazy_load():惰性加载,逐个返回文档,适合大文件
2.2.1 纯文本加载器TextLoader
from langchain_community.document_loaders import TextLoader # 加载纯文本文件 loader = TextLoader( file_path="data/test.txt", encoding="utf-8", autodetect_encoding=True # 自动检测编码 ) documents = loader.load() print(f"加载了{len(documents)}个文档") print(documents[0].page_content[:100])2.2.2 Markdown 加载器MarkdownLoader
支持解析 Markdown 的标题层级、代码块、列表等结构:
from langchain_community.document_loaders import MarkdownLoader # 加载Markdown文件 loader = MarkdownLoader( file_path="data/test.md", mode="elements" # 按元素解析,返回标题、段落、代码块等不同类型的文档 ) documents = loader.load() for doc in documents: print(f"类型:{doc.metadata['type']},内容:{doc.page_content[:50]}")2.2.3 PDF 加载器PyPDFLoader
最常用的 PDF 加载器,基于 PyPDF2 实现,支持页码提取:
from langchain_community.document_loaders import PyPDFLoader # 加载PDF文件 loader = PyPDFLoader(file_path="data/test.pdf") # 每页返回一个Document对象 documents = loader.load() for doc in documents: print(f"页码:{doc.metadata['page']},内容:{doc.page_content[:100]}")2.2.5 通用加载器UnstructuredLoader
支持所有常见格式的万能加载器,基于 Unstructured.IO 库,是工业级首选:
from langchain_community.document_loaders import UnstructuredLoader # 自动识别文件格式 loader = UnstructuredLoader( file_path="data/test.pdf", strategy="fast", # 解析策略:fast(快速)/ hi_res(高精度,支持OCR) include_page_breaks=True, # 包含分页符 extract_images_in_pdf=False # 是否提取PDF中的图片 ) documents = loader.load() print(f"加载了{len(documents)}个文档")2.3 文本分块器TextSplitter
LangChain 提供了多种文本分块器,所有分块器都实现了统一的接口:
split_documents(documents: list[Document]):将文档列表切割成分块列表split_text(text: str):将纯文本切割成字符串列表
2.3.1 递归字符分块器RecursiveCharacterTextSplitter
工业级最常用的分块器,按以下分隔符优先级递归切割:
["\n\n", "\n", " ", ""]优先按段落切割,段落太长按行切割,行太长按空格切割,最后按字符切割。
from langchain_text_splitters import RecursiveCharacterTextSplitter # 创建分块器 text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, # 分块大小(字符数) chunk_overlap=100, # 分块重叠大小 separators=["\n\n", "\n", "。", "!", "?", " ", ""], # 中文优化的分隔符 length_function=len, # 长度计算函数 is_separator_regex=False # 是否使用正则表达式作为分隔符 ) # 切割文档 documents = loader.load() chunks = text_splitter.split_documents(documents) print(f"原始文档数:{len(documents)}") print(f"分块数:{len(chunks)}") print(f"第一个分块:{chunks[0].page_content}") print(f"第一个分块元数据:{chunks[0].metadata}")2.3.2 语义分块器SemanticChunker
基于嵌入向量的相似度将语义相关的内容分到同一个块中:
from langchain_text_splitters import SemanticChunker from langchain_openai import OpenAIEmbeddings # 创建语义分块器 semantic_splitter = SemanticChunker( embeddings=OpenAIEmbeddings(), # 嵌入模型 breakpoint_threshold_type="percentile", # 断点阈值类型 breakpoint_threshold_amount=95, # 阈值百分比 chunk_size=1000 # 目标分块大小 ) # 切割文档 chunks = semantic_splitter.split_documents(documents)2.3.3 句子分块器SentenceSplitter
按句子切割文本,适合句子窗口检索:
from langchain_text_splitters import SentenceSplitter sentence_splitter = SentenceSplitter( chunk_size=100, chunk_overlap=20, language="zh" # 中文支持 ) chunks = sentence_splitter.split_documents(documents)三、工业级文档处理最佳实践
3.1 文档预处理:清洗与标准化
原始文档中存在大量噪声,必须进行预处理才能用于 RAG 系统:
import re from langchain_core.documents import Document def clean_text(text: str) -> str: """文本清洗函数""" # 去除多余的空行和空格 text = re.sub(r'\n{3,}', '\n\n', text) text = re.sub(r' +', ' ', text) text = re.sub(r'\t+', ' ', text) # 去除页眉页脚(示例:匹配"第X页 共Y页"格式) text = re.sub(r'第\d+页\s*共\d+页', '', text) # 去除特殊字符 text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text) # 去除首尾空白 text = text.strip() return text def clean_documents(documents: list[Document]) -> list[Document]: """清洗文档列表""" cleaned_docs = [] for doc in documents: cleaned_content = clean_text(doc.page_content) # 跳过空文档 if len(cleaned_content) > 10: cleaned_doc = Document( page_content=cleaned_content, metadata=doc.metadata ) cleaned_docs.append(cleaned_doc) return cleaned_docs3.2 元数据增强
为分块添加丰富的元数据,提升检索精度和可追溯性:
def enhance_metadata(documents: list[Document], source: str) -> list[Document]: """增强文档元数据""" enhanced_docs = [] for i, doc in enumerate(documents): metadata = { **doc.metadata, "source": source, "chunk_id": f"{source}_{i}", "chunk_index": i, "total_chunks": len(documents), "upload_time": time.time() } enhanced_doc = Document( page_content=doc.page_content, metadata=metadata ) enhanced_docs.append(enhanced_doc) return enhanced_docs3.3 批量处理与异常容错
工业级系统需要支持批量处理大量文档,并能处理单个文档解析失败的情况:
from pathlib import Path from utils.logger import logger from utils.exceptions import DocumentParseError def batch_load_documents(dir_path: str | Path) -> list[Document]: """批量加载目录下的所有文档""" dir_path = Path(dir_path) all_documents = [] # 支持的文件格式 supported_formats = [".txt", ".md", ".pdf", ".docx", ".doc"] for file_path in dir_path.iterdir(): if file_path.is_file() and file_path.suffix.lower() in supported_formats: try: logger.info(f"正在加载文档:{file_path.name}") documents = load_single_document(file_path) all_documents.extend(documents) logger.info(f"✅ 文档加载成功:{file_path.name},共{len(documents)}页") except Exception as e: logger.error(f"❌ 文档加载失败:{file_path.name},错误:{str(e)}") # 跳过失败的文档,继续处理其他文档 continue logger.info(f"批量加载完成,共加载{len(all_documents)}个文档") return all_documents3.4 分块参数调优指南
分块参数没有万能值,需要根据你的文档类型和业务场景进行调优:
- 先从默认值开始:
chunk_size=1000,chunk_overlap=100 - 测试不同的分块大小:512、1024、2048,对比检索准确率
- 调整重叠大小:通常为分块大小的 10%-20%
- 优化分隔符:针对中文添加
。、!、?等句子结束符 - 使用语义分块:如果对质量要求高,且能接受较慢的速度
四、项目整合:实现工业级文档处理模块
现在我们将今天所学的内容整合到前两天搭建的 LangChain 2026 框架中,实现一个完整的文档处理流水线。
4.1 第一步:新增依赖
在requirements.txt中添加文档处理相关依赖:
# 文档处理依赖
langchain-community
pypdf
python-docx
unstructured
markdown
python-magic-bin; sys_platform == "win32"
4.2 第二步:新增自定义异常
在utils/exceptions.py中添加文档处理相关异常:
# 在现有异常类后面添加 class DocumentParseError(FrameworkBaseException): """文档解析失败异常""" def __init__(self, file_path: str, details: str = ""): message = f"文档解析失败:{file_path}" if details: message += f",详细信息:{details}" super().__init__(message, error_code=1003) class ChunkError(FrameworkBaseException): """文本分块失败异常""" def __init__(self, details: str = ""): message = "文本分块失败" if details: message += f",详细信息:{details}" super().__init__(message, error_code=1004)4.3 第三步:实现文档处理核心模块
在core/目录下创建document_processor.py:
import time import re from pathlib import Path from typing import List from langchain_core.documents import Document from langchain_community.document_loaders import ( TextLoader, PyPDFLoader, Docx2txtLoader, UnstructuredFileLoader, UnstructuredExcelLoader ) # ✅ 只导入稳定版存在的分块器 from langchain_text_splitters import ( RecursiveCharacterTextSplitter, CharacterTextSplitter ) from config.settings import settings from utils.logger import logger from utils.exceptions import DocumentParseError, ChunkError class DocumentProcessor: """ 工业级文档处理器(最终稳定版) 所有实验性功能均做自动降级处理,确保无依赖也能正常运行 """ def __init__( self, chunk_size: int = 1000, chunk_overlap: int = 100, chunking_strategy: str = "recursive", use_semantic_chunking: bool = False ): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.chunking_strategy = chunking_strategy self.use_semantic_chunking = use_semantic_chunking # 初始化分块器(自动处理实验性功能依赖) self._init_text_splitter() logger.info( f"✅ 文档处理器初始化完成 | " f"分块大小:{chunk_size} | " f"重叠大小:{chunk_overlap} | " f"分块策略:{self.chunking_strategy}" ) def _init_text_splitter(self): """初始化文本分块器(自动降级版)""" if self.use_semantic_chunking: # ✅ 优先使用本地 BGE 模型 try: from langchain_experimental.text_splitter import SemanticChunker from langchain_huggingface import HuggingFaceEmbeddings logger.info("正在初始化语义分块器(使用本地BGE模型)...") logger.info(f"正在加载本地模型:{settings.embedding_model_path}") # 使用本地模型路径 embeddings = HuggingFaceEmbeddings( model_name=settings.embedding_model_path, model_kwargs={"device": "cpu"}, encode_kwargs={"normalize_embeddings": True} ) self.text_splitter = SemanticChunker( embeddings=embeddings, breakpoint_threshold_type="percentile", breakpoint_threshold_amount=95, min_chunk_size=100 ) self.chunking_strategy = "semantic" logger.info("✅ 语义分块器初始化成功(使用本地BGE模型)") except ImportError as e: logger.warning(f"⚠️ 缺少依赖:{str(e)},自动降级为递归字符分块器") logger.warning("提示:如需使用语义分块,请运行:pip install langchain-experimental langchain-huggingface") self.use_semantic_chunking = False self.chunking_strategy = "recursive" self._init_recursive_splitter() except Exception as e: logger.warning(f"⚠️ 语义分块器初始化失败:{str(e)},自动降级为递归字符分块器") self.use_semantic_chunking = False self.chunking_strategy = "recursive" self._init_recursive_splitter() else: self._init_recursive_splitter() def _init_recursive_splitter(self): """初始化工业级递归字符分块器(中文深度优化)""" self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, separators=[ "\n\n## ", "\n\n### ", "\n\n#### ", "\n\n##### ", # Markdown标题优先级最高 "\n\n", "\n", # 段落和行 "。", "!", "?", ";", ":", # 中文句子结束符 ",", "、", " ", # 中文标点和空格 "", # 最后按单个字符分割 ], length_function=len, is_separator_regex=False, keep_separator=True, # 保留分隔符,保证语义完整 strip_whitespace=True # 自动去除首尾空白 ) self.chunking_strategy = "recursive" def _clean_text(self, text: str) -> str: """工业级文本清洗函数""" if not text: return "" # 1. 统一换行符 text = text.replace("\r\n", "\n").replace("\r", "\n") # 2. 去除多余空白 text = re.sub(r'[ \t]+', ' ', text) text = re.sub(r'\n{3,}', '\n\n', text) # 3. 去除常见噪声 text = re.sub(r'第\d+页\s*共\d+页', '', text) text = re.sub(r'版权所有.*?保留所有权利', '', text, flags=re.DOTALL) text = re.sub(r'https?://\S+', '', text) text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '', text) text = re.sub(r'1[3-9]\d{9}', '', text) # 4. 去除控制字符 text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text) return text.strip() def _load_single_document(self, file_path: Path) -> List[Document]: """加载单个文档(国内网络优化版) 移除所有需要自动下载模型的加载器,全部使用原生无依赖实现 """ try: suffix = file_path.suffix.lower() logger.debug(f"加载文件:{file_path.name}") # ✅ 所有加载器均使用无外部依赖的原生实现 if suffix == ".txt" or suffix == ".md": # Markdown直接用TextLoader加载,效果完全满足RAG需求 loader = TextLoader( file_path, encoding="utf-8", autodetect_encoding=True ) elif suffix == ".pdf": loader = PyPDFLoader(file_path) elif suffix in [".docx", ".doc"]: loader = Docx2txtLoader(file_path) elif suffix in [".xlsx", ".xls"]: # Excel使用原生加载器(需要openpyxl依赖) try: from langchain_community.document_loaders import OpenpyxlLoader loader = OpenpyxlLoader(file_path, read_only=True, data_only=True) except ImportError: logger.warning("⚠️ 未安装openpyxl,跳过Excel文件") return [] else: # 未知格式尝试用TextLoader加载 logger.warning(f"⚠️ 不支持的格式{suffix},尝试用文本方式加载") loader = TextLoader(file_path, encoding="utf-8", autodetect_encoding=True) documents = loader.load() # 清洗和过滤 cleaned_docs = [] for doc in documents: cleaned_content = self._clean_text(doc.page_content) if len(cleaned_content) >= 20: # 为Markdown添加特殊元数据 if suffix == ".md": doc.metadata["file_type"] = "markdown" cleaned_doc = Document( page_content=cleaned_content, metadata=doc.metadata ) cleaned_docs.append(cleaned_doc) return cleaned_docs except Exception as e: raise DocumentParseError(str(file_path), str(e)) from e def _enhance_metadata(self, chunks: List[Document], file_name: str) -> List[Document]: """增强分块元数据""" enhanced_chunks = [] for i, chunk in enumerate(chunks): metadata = { **chunk.metadata, "source": file_name, "chunk_id": f"{file_name.replace('.', '_')}_{i}", "chunk_index": i, "total_chunks": len(chunks), "chunk_length": len(chunk.page_content), "process_time": int(time.time()), "chunking_strategy": self.chunking_strategy } enhanced_chunk = Document( page_content=chunk.page_content, metadata=metadata ) enhanced_chunks.append(enhanced_chunk) return enhanced_chunks def process_file( self, file_path: str | Path, enhance_metadata: bool = True ) -> List[Document]: """处理单个文件""" file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"文件不存在:{file_path}") logger.info(f"开始处理:{file_path.name}") # ✅ 检查 text_splitter 是否已初始化 if not hasattr(self, 'text_splitter') or self.text_splitter is None: logger.error("❌ text_splitter 未初始化,使用默认递归分块器") self.chunking_strategy = "recursive" self._init_recursive_splitter() try: documents = self._load_single_document(file_path) if not documents: logger.warning(f"文档{file_path.name}无有效内容") return [] chunks = self.text_splitter.split_documents(documents) logger.info(f"分块完成:{len(chunks)}个分块") if enhance_metadata: chunks = self._enhance_metadata(chunks, file_path.name) logger.info(f"✅ 处理完成:{file_path.name}") return chunks except Exception as e: logger.error(f"❌ 处理失败:{file_path.name},错误:{str(e)}", exc_info=True) raise ChunkError(f"处理文档{file_path.name}失败") from e def process_directory( self, dir_path: str | Path, recursive: bool = False, enhance_metadata: bool = True ) -> List[Document]: """批量处理目录""" dir_path = Path(dir_path) if not dir_path.exists(): raise FileNotFoundError(f"目录不存在:{dir_path}") logger.info(f"开始批量处理目录:{dir_path}") all_chunks = [] supported_formats = [".txt", ".md", ".pdf", ".docx", ".doc", ".xlsx", ".xls"] glob_pattern = "**/*" if recursive else "*" for file_path in dir_path.glob(glob_pattern): if file_path.is_file() and file_path.suffix.lower() in supported_formats: try: chunks = self.process_file(file_path, enhance_metadata) all_chunks.extend(chunks) except Exception: continue logger.info(f"✅ 批量处理完成,共生成{len(all_chunks)}个分块") return all_chunks def save_chunks_to_file(self, chunks: List[Document], output_path: str | Path): """保存分块到JSONL文件""" import json output_path = Path(output_path) output_path.parent.mkdir(exist_ok=True, parents=True) with open(output_path, "w", encoding="utf-8") as f: for chunk in chunks: chunk_data = { "page_content": chunk.page_content, "metadata": chunk.metadata } f.write(json.dumps(chunk_data, ensure_ascii=False) + "\n") logger.info(f"分块已保存到:{output_path.resolve()}") def load_chunks_from_file(self, input_path: str | Path) -> List[Document]: """从JSONL文件加载分块""" import json input_path = Path(input_path) if not input_path.exists(): raise FileNotFoundError(f"分块文件不存在:{input_path}") chunks = [] with open(input_path, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): try: chunk_data = json.loads(line.strip()) chunk = Document( page_content=chunk_data["page_content"], metadata=chunk_data["metadata"] ) chunks.append(chunk) except json.JSONDecodeError as e: logger.warning(f"第{line_num}行解析失败,跳过:{str(e)}") logger.info(f"从{input_path}加载了{len(chunks)}个分块") return chunks