大模型学习之路004:RAG 零基础入门教程(第一篇):基础理论与文档处理流水线
一、RAG 基础理论
1.1为什么我们需要 RAG?—— 大模型的三大原生缺陷
在学习 RAG 之前,我们必须先搞清楚:RAG 是为了解决什么问题而诞生的?
所有的大语言模型(GPT、Claude、LLaMA、Qwen 等)都有三个无法通过自身优化彻底解决的原生缺陷:
缺陷 1:幻觉问题(Hallucination)
大模型本质上是 "下一个词预测器",它会根据统计规律生成看起来合理的文本,但这些内容可能完全是编造的。
- 例子:问 GPT-4o"2025 年诺贝尔物理学奖得主是谁",它可能会编造出一个不存在的名字和获奖理由
- 危害:在医疗、法律、金融等对准确性要求极高的场景,幻觉可能会造成严重后果
缺陷 2:知识截止问题
大模型的知识是固定在训练完成的那一刻的,无法获取训练之后的新知识。
- 例子:GPT-4o 的知识截止到 2024 年 7 月,它不知道 2024 年 8 月之后发生的任何事情
- 危害:无法回答实时性问题(如今天的天气、最新的政策、最近的新闻)
缺陷 3:私有数据隔离问题
大模型没有见过你的私有数据,无法回答与你的业务相关的问题。
- 例子:它不知道你们公司的员工手册、产品手册、内部规章制度
- 危害:无法直接应用于企业内部场景
1.2 什么是 RAG
检索增强生成(Retrieval-Augmented Generation, RAG)是目前解决大模型上述三大缺陷的最主流、成本最低的技术方案。
我们用一个非常形象的类比来理解 RAG:
- 普通大模型= 闭卷考试的学生:只能靠脑子里记住的知识答题,容易答错,也不知道最新的知识
- RAG 增强的大模型= 开卷考试的学生:允许先翻书(检索知识库)找到相关内容,再组织语言回答问题
RAG 的核心思想非常简单:先检索,后生成。它不修改大模型的任何参数,只是在生成答案之前,先从外部知识库中检索出与用户问题相关的信息,然后将这些信息和用户的问题一起交给大模型,让大模型基于检索到的信息生成答案。
1.3 RAG vs 微调 vs 提示工程:三者的区别与选择
这是初学者最容易混淆的三个概念,我们用一个详细的对比表来搞清楚它们的区别:
| 对比维度 | 提示工程 | RAG | 微调 |
|---|---|---|---|
| 核心原理 | 优化输入提示词,引导大模型输出 | 先检索外部知识,再生成答案 | 在已有模型基础上继续训练,注入新知识 |
| 成本 | 极低(几乎为 0) | 低(主要是向量数据库和推理成本) | 高(需要大量数据和 GPU 资源) |
| 周期 | 几小时到几天 | 几天到几周 | 几周到几个月 |
| 知识更新 | 无法更新知识 | 实时更新(只需更新知识库) | 困难(需要重新微调) |
| 可解释性 | 一般 | 好(可以看到引用来源) | 差(黑盒) |
| 幻觉抑制 | 一般 | 好 | 一般 |
| 私有数据支持 | 不支持 | 完美支持 | 支持但成本高 |
| 风格对齐 | 一般 | 一般 | 好 |
最佳实践策略(99% 的场景适用):
- 先用提示工程解决 80% 的通用问题
- 提示工程解决不了的,用RAG注入私有知识和实时知识
- RAG 也解决不了的(如需要深度风格对齐、特定领域的专业术语理解),再考虑微调
1.4 Naive RAG 的标准三阶段流程
Naive RAG(基础版 RAG)是所有高级 RAG 的基础,它的流程非常清晰,分为离线阶段和在线阶段两个部分。
离线阶段(数据准备阶段)
这是 RAG 系统的基础,也是影响 RAG 效果最重要的阶段。我们本节的学习内容就是这个阶段的前三个步骤:文档加载、清洗、分块。
- 文档收集:收集所有需要纳入知识库的文档
- 文档加载与解析:将不同格式的文档(PDF、Word 等)解析成纯文本
- 文档清洗:去除文档中的无关内容(页眉页脚、页码、空白字符等)
- 文档分块:将长文本分成适合检索和生成的小块
- 文本向量化:将文本块转化为高维向量
- 存入向量数据库:将向量和对应的文本块存入向量数据库
在线阶段(用户交互阶段)
当用户提问时,系统会执行以下步骤:
- 查询向量化:将用户的问题转化为向量
- 向量相似度检索:在向量数据库中找到与问题向量最相似的 Top-K 个文档块
- 拼接提示词:将检索到的文档块和用户的问题按照指定格式拼接成提示词
- 大模型生成答案:将提示词发送给大模型,生成答案
- 返回给用户:将答案返回给用户
1.5 RAG 系统的核心组件
一个完整的 RAG 系统由以下 6 个核心组件组成:
- 文档加载器:负责加载和解析不同格式的文档
- 文档分块器:负责将长文本分成合适大小的块
- 嵌入模型:负责将文本转化为向量
- 向量数据库:负责存储和检索向量
- 检索器:负责从向量数据库中检索相关文档
- 生成器:负责基于检索到的信息生成答案
二、文档加载与解析(将各种格式的文档变成纯文本)
文档加载与解析是 RAG 系统的第一步,也是最容易被忽视但却非常重要的一步。如果文档解析得不好,后面的所有步骤都会受到影响。
2.1 文档加载的基本概念
文档加载的任务是:将存储在磁盘上的各种格式的文档,转化为程序可以处理的纯文本格式,并保留必要的元数据。
元数据是指关于文档的描述信息,非常重要,后面我们会经常用到:
- 文件名
- 文件路径
- 文件类型
- 页码(对于 PDF 和 Word)
- 分块编号
- 创建时间
- 修改时间
2.2 不同格式文档的解析方法
我们将学习最常用的 4 种文档格式的解析方法:TXT、Markdown、PDF、Word。
2.2.1 TXT 文档解析
TXT 是最简单的文档格式,直接读取即可。
def load_txt(file_path): """加载TXT文档""" try: with open(file_path, 'r', encoding='utf-8') as f: text = f.read() # 返回文本内容和元数据 return { "text": text, "metadata": { "file_name": file_path.split("/")[-1], "file_path": file_path, "file_type": "txt" } } except Exception as e: print(f"加载TXT文件失败:{file_path},错误:{e}") return None2.2.2 Markdown 文档解析
Markdown 是结构化最好的纯文本格式,我们可以直接读取,也可以使用专门的库解析。
import markdown from bs4 import BeautifulSoup def load_markdown(file_path): """加载Markdown文档""" try: with open(file_path, 'r', encoding='utf-8') as f: md_text = f.read() # 方法1:直接返回原始Markdown文本(推荐,保留格式信息) return { "text": md_text, "metadata": { "file_name": file_path.split("/")[-1], "file_path": file_path, "file_type": "md" } } # 方法2:转换为纯文本(如果不需要Markdown格式) # html = markdown.markdown(md_text) # soup = BeautifulSoup(html, 'html.parser') # text = soup.get_text() # return {"text": text, "metadata": {...}} except Exception as e: print(f"加载Markdown文件失败:{file_path},错误:{e}") return None2.2.3 PDF 文档解析(重点)
PDF 是最常见也是最难解析的文档格式,有很多不同的库可以用,我们重点学习两个最常用的:PyPDF2 和 PyMuPDF。
PyPDF2:简单易用,适合纯文本 PDF
from PyPDF2 import PdfReader def load_pdf_with_pypdf2(file_path): """使用PyPDF2加载PDF文档""" try: reader = PdfReader(file_path) text = "" pages = [] for page_num, page in enumerate(reader.pages): page_text = page.extract_text() text += page_text pages.append({ "page_num": page_num + 1, "text": page_text }) return { "text": text, "pages": pages, "metadata": { "file_name": file_path.split("/")[-1], "file_path": file_path, "file_type": "pdf", "total_pages": len(reader.pages) } } except Exception as e: print(f"PyPDF2加载PDF失败:{file_path},错误:{e}") return NonePyMuPDF(fitz):功能强大,解析效果更好
PyMuPDF 是目前解析效果最好的开源 PDF 库,支持提取文本、图片、表格,速度也比 PyPDF2 快。推荐优先使用 PyMuPDF
import fitz # PyMuPDF def load_pdf_with_pymupdf(file_path): """使用PyMuPDF加载PDF文档(推荐)""" try: doc = fitz.open(file_path) text = "" pages = [] for page_num in range(doc.page_count): page = doc.load_page(page_num) page_text = page.get_text() text += page_text pages.append({ "page_num": page_num + 1, "text": page_text }) doc.close() return { "text": text, "pages": pages, "metadata": { "file_name": file_path.split("/")[-1], "file_path": file_path, "file_type": "pdf", "total_pages": len(pages) } } except Exception as e: print(f"PyMuPDF加载PDF失败:{file_path},错误:{e}") return NonePyPDF2 vs PyMuPDF 对比:
| 对比维度 | PyPDF2 | PyMuPDF |
|---|---|---|
| 解析效果 | 一般,可能会出现乱码和格式错乱 | 好,几乎能正确解析所有纯文本 PDF |
| 速度 | 一般 | 快 |
| 功能 | 只能提取文本 | 可以提取文本、图片、表格、注释 |
| 易用性 | 简单 | 简单 |
| 推荐度 | ⭐⭐ | ⭐⭐⭐⭐⭐ |
2.2.4 Word 文档解析(.docx 格式)
使用 python-docx 库解析.docx 格式的 Word 文档。
2.3 通用文档加载器
现在我们把上面的函数整合起来,写一个通用的文档加载器,能够自动识别文件格式并调用对应的解析函数。
import os def load_document(file_path): """通用文档加载器,自动识别文件格式""" if not os.path.exists(file_path): print(f"文件不存在:{file_path}") return None file_ext = os.path.splitext(file_path)[1].lower() if file_ext == ".txt": return load_txt(file_path) elif file_ext == ".md": return load_markdown(file_path) elif file_ext == ".pdf": return load_pdf_with_pymupdf(file_path) # 优先使用PyMuPDF elif file_ext == ".docx": return load_docx(file_path) else: print(f"不支持的文件格式:{file_ext}") return None # 测试通用文档加载器 if __name__ == "__main__": # 测试TXT文件 txt_doc = load_document("test_docs/test.txt") if txt_doc: print(f"TXT文档加载成功,文本长度:{len(txt_doc['text'])}") # 测试Markdown文件 md_doc = load_document("test_docs/test.md") if md_doc: print(f"Markdown文档加载成功,文本长度:{len(md_doc['text'])}") # 测试PDF文件 pdf_doc = load_document("test_docs/test.pdf") if pdf_doc: print(f"PDF文档加载成功,总页数:{pdf_doc['metadata']['total_pages']},文本长度:{len(pdf_doc['text'])}") # 测试Word文件 docx_doc = load_document("test_docs/test.docx") if docx_doc: print(f"Word文档加载成功,总段落数:{docx_doc['metadata']['total_paragraphs']},文本长度:{len(docx_doc['text'])}")2.4 文档清洗技术
原始文档中通常包含很多无关内容,这些内容会影响后续的分块和检索效果,所以我们需要对文档进行清洗。
2.4.1 常见的需要清洗的内容
- 空白字符:多余的空格、换行符、制表符
- 页眉页脚和页码
- 水印和广告
- 重复内容
- 乱码和特殊字符
- 无意义的短文本
2.4.2 文档清洗函数实现
import re def clean_text(text): """清洗文本内容""" if not text: return "" # 1. 替换多个换行符为单个换行符 text = re.sub(r'\n+', '\n', text) # 2. 替换多个空格为单个空格 text = re.sub(r' +', ' ', text) # 3. 替换制表符为空格 text = re.sub(r'\t', ' ', text) # 4. 去除首尾空白字符 text = text.strip() # 5. 去除特殊字符(保留中文、英文、数字、常用标点) text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9,。!?;:""''()【】《》、\n\.\,\!\?\;\:\'\"\(\)\[\]\<\>]', '', text) # 6. 去除过短的行(少于3个字符) lines = text.split('\n') lines = [line.strip() for line in lines if len(line.strip()) >= 3] text = '\n'.join(lines) return text # 测试文档清洗 if __name__ == "__main__": dirty_text = """ 这是一个 测试文本 ,包含很多 多余的空格。 还有很多多余的换行符。 第1页 页眉内容 这是正文内容。 页脚内容 页码:1 """ clean_text_result = clean_text(dirty_text) print("清洗前:") print(dirty_text) print("\n清洗后:") print(clean_text_result)三、文档分块技术(RAG 效果的基石)
文档分块是 RAG 系统中最关键、对效果影响最大的环节之一。很多人做的 RAG 效果不好,90% 的原因都是分块做得不好。
3.1 为什么我们需要分块?
我们不能把一整篇文档直接存入向量数据库并进行检索,主要有三个原因:
原因 1:大模型的上下文窗口限制
大模型的上下文窗口是有限的,即使是 GPT-4o 也只有 128K token(约 9 万字)。如果文档太长,我们无法将整篇文档都放入提示词中。
原因 2:检索精度问题
如果文档块太大,会包含很多无关信息,导致检索准确率下降;如果文档块太小,会丢失上下文信息,导致模型无法理解内容。
原因 3:生成质量问题
如果检索到的文档块包含太多无关信息,大模型会被干扰,生成的答案质量会下降,甚至会出现幻觉。
3.2 分块的核心原则
一个好的分块应该满足以下三个原则:
- 语义完整性:一个分块应该包含一个完整的语义单元(如一个段落、一个小节),不要把一个完整的意思拆分成两个块
- 上下文相关性:一个分块内的内容应该是相关的,不要把不相关的内容放在同一个块里
- 长度适中:分块大小应该适中,一般在 200-1000 token 之间,具体取决于场景和嵌入模型
3.3 常见的分块策略
我们将学习四种最常用的分块策略,从简单到复杂。
3.3.1 固定大小分块(最简单)
固定大小分块是最简单的分块策略,按照固定的字符数或 token 数将文本分成块。
按字符分块
def split_by_character(text, chunk_size=500, chunk_overlap=50): """按字符数分块""" chunks = [] start = 0 text_length = len(text) while start < text_length: end = start + chunk_size if end > text_length: end = text_length chunk = text[start:end] chunks.append(chunk) # 移动start指针,减去重叠部分 start = end - chunk_overlap return chunks按 token 分块(推荐)
按字符分块的问题是:不同的字符占用的 token 数不同(一个中文字符约等于 2 个 token,一个英文字符约等于 0.3 个 token)。按 token 分块更准确,因为嵌入模型和大模型都是按 token 处理的。
我们使用 OpenAI 的 tiktoken 库来计算 token 数:
import tiktoken def count_tokens(text, model="gpt-3.5-turbo"): """计算文本的token数""" encoding = tiktoken.encoding_for_model(model) return len(encoding.encode(text)) def split_by_token(text, chunk_size=500, chunk_overlap=50, model="gpt-3.5-turbo"): """按token数分块(推荐)""" encoding = tiktoken.encoding_for_model(model) tokens = encoding.encode(text) total_tokens = len(tokens) chunks = [] start = 0 while start < total_tokens: end = start + chunk_size if end > total_tokens: end = total_tokens chunk_tokens = tokens[start:end] chunk_text = encoding.decode(chunk_tokens) chunks.append(chunk_text) # 移动start指针,减去重叠部分 start = end - chunk_overlap return chunks # 测试按token分块 if __name__ == "__main__": text = "这是一个测试文本,用来测试按token分块的效果。" * 100 chunks = split_by_token(text, chunk_size=100, chunk_overlap=20) print(f"分块数量:{len(chunks)}") for i, chunk in enumerate(chunks): print(f"第{i+1}块,token数:{count_tokens(chunk)}") print(chunk[:50] + "...\n")固定大小分块的优缺点:
- 优点:简单、快速、容易实现
- 缺点:容易破坏语义完整性,把一个完整的意思拆分成两个块
3.3.2 按段落分块
按段落分块是根据换行符将文本分成段落,然后将短段落合并,将长段落拆分。
def split_by_paragraph(text, max_chunk_size=500, chunk_overlap=50): """按段落分块""" # 先按换行符分成段落 paragraphs = text.split('\n') paragraphs = [p.strip() for p in paragraphs if p.strip()] chunks = [] current_chunk = "" current_length = 0 for para in paragraphs: para_length = count_tokens(para) # 如果当前段落太长,单独分块 if para_length > max_chunk_size: # 如果当前块不为空,先加入chunks if current_chunk: chunks.append(current_chunk) current_chunk = "" current_length = 0 # 将长段落按token分块 para_chunks = split_by_token(para, max_chunk_size, chunk_overlap) chunks.extend(para_chunks) continue # 如果当前块加上这个段落超过最大长度,先保存当前块 if current_length + para_length > max_chunk_size: chunks.append(current_chunk) current_chunk = para current_length = para_length else: # 否则,将这个段落加入当前块 if current_chunk: current_chunk += "\n" + para else: current_chunk = para current_length += para_length # 加入最后一个块 if current_chunk: chunks.append(current_chunk) return chunks按段落分块的优缺点:
- 优点:能较好地保留语义完整性
- 缺点:实现稍微复杂一些,对于没有明显段落分隔的文本效果不好
3.3.3 按标题分块(适合结构化文档)
对于有明确标题层级的文档(如 Markdown、Word),按标题分块是最好的分块策略,因为标题天然就是语义单元的分隔符。
def split_by_markdown_heading(text, max_chunk_size=500): """按Markdown标题分块""" # 匹配Markdown标题(# 到 ######) heading_pattern = re.compile(r'^#{1,6} .+$', re.MULTILINE) # 找到所有标题的位置 headings = [] for match in heading_pattern.finditer(text): headings.append({ "start": match.start(), "end": match.end(), "heading": match.group() }) # 如果没有标题,退化为按段落分块 if not headings: return split_by_paragraph(text, max_chunk_size) chunks = [] # 处理第一个标题之前的内容 if headings[0]["start"] > 0: pre_heading_text = text[:headings[0]["start"]].strip() if pre_heading_text: chunks.append(pre_heading_text) # 处理每个标题和对应的内容 for i in range(len(headings)): start = headings[i]["end"] if i < len(headings) - 1: end = headings[i+1]["start"] else: end = len(text) heading_text = headings[i]["heading"] content_text = text[start:end].strip() # 将标题和内容合并 chunk = heading_text + "\n" + content_text # 如果块太大,按token分块 if count_tokens(chunk) > max_chunk_size: sub_chunks = split_by_token(chunk, max_chunk_size, 50) chunks.extend(sub_chunks) else: chunks.append(chunk) return chunks按标题分块的优缺点:
- 优点:完美保留语义完整性,分块效果最好
- 缺点:只适合有明确标题层级的结构化文档
3.3.4 递归分块(最常用,LangChain 默认)
递归分块是目前最常用、效果最好的通用分块策略,它的核心思想是:
- 先按最大的分隔符(如两个换行符)分块
- 如果分块还是太大,再按下一个分隔符(如一个换行符)分块
- 重复这个过程,直到所有分块都小于指定的大小
LangChain 的RecursiveCharacterTextSplitter就是实现了这个策略,我们可以直接使用:
from langchain.text_splitter import RecursiveCharacterTextSplitter def split_by_recursive(text, chunk_size=500, chunk_overlap=50): """递归分块(最常用,推荐)""" text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=count_tokens, # 使用token数计算长度 separators=["\n\n", "\n", "。", "!", "?", ",", " ", ""], # 中文分隔符优先级 ) chunks = text_splitter.split_text(text) return chunks # 测试递归分块 if __name__ == "__main__": text = """ # 第一章 什么是RAG RAG是检索增强生成的缩写,是一种大模型应用技术。 ## 1.1 RAG的核心原理 RAG的核心思想是先检索,后生成。它先从外部知识库中检索相关信息,然后将这些信息和用户的问题一起交给大模型,让大模型基于检索到的信息生成答案。 ## 1.2 RAG的优势 RAG可以解决大模型的幻觉问题、知识截止问题和私有数据隔离问题。 """ chunks = split_by_recursive(text, chunk_size=100, chunk_overlap=20) print(f"分块数量:{len(chunks)}") for i, chunk in enumerate(chunks): print(f"第{i+1}块:") print(chunk) print("---")归分块的优缺点:
- 优点:通用、效果好,能较好地保留语义完整性
- 缺点:实现复杂(但我们可以直接用 LangChain 的实现)
3.4 分块参数调优指南
分块大小和重叠大小是两个最重要的参数,没有万能的最优值,需要根据你的具体场景调整。
分块大小(chunk_size)
- 小分块(200-500 token):检索精度高,但上下文信息少,适合问答场景
- 中分块(500-1000 token):平衡检索精度和上下文信息,大多数场景的最优值
- 大分块(1000-2000 token):上下文信息多,但检索精度低,适合摘要和长文档理解场景
中文场景推荐:500-800 token
重叠大小(chunk_overlap)
重叠大小是指相邻两个分块之间重叠的部分,用来解决上下文断裂的问题。
- 一般设置为分块大小的10%-20%
- 例如:分块大小 500 token,重叠大小 50-100 token
3.5 分块效果评估方法
怎么判断你的分块效果好不好?可以用以下方法评估:
- 人工检查:随机抽取 10-20 个分块,检查是否语义完整,有没有上下文断裂
- 检索测试:准备一些问题,看检索到的分块是否包含答案
- 端到端测试:测试整个 RAG 系统的回答准确率
四、实战任务:实现通用文档处理流水线
import os import re import json import tiktoken import fitz from docx import Document from langchain.text_splitter import RecursiveCharacterTextSplitter # ==================== 工具函数 ==================== def count_tokens(text, model="gpt-3.5-turbo"): """计算文本的token数""" encoding = tiktoken.encoding_for_model(model) return len(encoding.encode(text)) # ==================== 文档加载函数 ==================== def load_txt(file_path): """加载TXT文档""" try: with open(file_path, 'r', encoding='utf-8') as f: text = f.read() return { "text": text, "metadata": { "file_name": os.path.basename(file_path), "file_path": file_path, "file_type": "txt" } } except Exception as e: print(f"加载TXT文件失败:{file_path},错误:{e}") return None def load_markdown(file_path): """加载Markdown文档""" try: with open(file_path, 'r', encoding='utf-8') as f: text = f.read() return { "text": text, "metadata": { "file_name": os.path.basename(file_path), "file_path": file_path, "file_type": "md" } } except Exception as e: print(f"加载Markdown文件失败:{file_path},错误:{e}") return None def load_pdf(file_path): """使用PyMuPDF加载PDF文档""" try: doc = fitz.open(file_path) text = "" pages = [] for page_num in range(doc.page_count): page = doc.load_page(page_num) page_text = page.get_text() text += page_text pages.append({ "page_num": page_num + 1, "text": page_text }) doc.close() return { "text": text, "pages": pages, "metadata": { "file_name": os.path.basename(file_path), "file_path": file_path, "file_type": "pdf", "total_pages": len(pages) } } except Exception as e: print(f"加载PDF文件失败:{file_path},错误:{e}") return None def load_docx(file_path): """加载Word文档(.docx格式)""" try: doc = Document(file_path) text = "" paragraphs = [] for para_num, para in enumerate(doc.paragraphs): if para.text.strip() != "": text += para.text + "\n" paragraphs.append({ "para_num": para_num + 1, "text": para.text }) return { "text": text, "paragraphs": paragraphs, "metadata": { "file_name": os.path.basename(file_path), "file_path": file_path, "file_type": "docx", "total_paragraphs": len(paragraphs) } } except Exception as e: print(f"加载Word文件失败:{file_path},错误:{e}") return None def load_document(file_path): """通用文档加载器""" if not os.path.exists(file_path): print(f"文件不存在:{file_path}") return None file_ext = os.path.splitext(file_path)[1].lower() if file_ext == ".txt": return load_txt(file_path) elif file_ext == ".md": return load_markdown(file_path) elif file_ext == ".pdf": return load_pdf(file_path) elif file_ext == ".docx": return load_docx(file_path) else: print(f"不支持的文件格式:{file_ext}") return None # ==================== 文档清洗函数 ==================== def clean_text(text): """清洗文本内容""" if not text: return "" # 替换多个换行符为单个换行符 text = re.sub(r'\n+', '\n', text) # 替换多个空格为单个空格 text = re.sub(r' +', ' ', text) # 替换制表符为空格 text = re.sub(r'\t', ' ', text) # 去除首尾空白字符 text = text.strip() # 去除特殊字符 text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9,。!?;:""''()【】《》、\n\.\,\!\?\;\:\'\"\(\)\[\]\<\>]', '', text) # 去除过短的行 lines = text.split('\n') lines = [line.strip() for line in lines if len(line.strip()) >= 3] text = '\n'.join(lines) return text # ==================== 文档分块函数 ==================== def split_document(doc, chunk_size=500, chunk_overlap=50): """将文档分块""" if not doc: return [] text = doc["text"] cleaned_text = clean_text(text) # 使用递归分块 text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=count_tokens, separators=["\n\n", "\n", "。", "!", "?", ",", " ", ""], ) chunk_texts = text_splitter.split_text(cleaned_text) # 为每个分块添加元数据 chunks = [] for i, chunk_text in enumerate(chunk_texts): chunk = { "id": f"{doc['metadata']['file_name']}_chunk_{i}", "text": chunk_text, "token_count": count_tokens(chunk_text), "metadata": { **doc["metadata"], # 继承文档的元数据 "chunk_id": i, "chunk_size": chunk_size, "chunk_overlap": chunk_overlap } } chunks.append(chunk) return chunks # ==================== 批量处理函数 ==================== def process_folder(folder_path, output_path="processed_chunks.jsonl", chunk_size=500, chunk_overlap=50): """批量处理文件夹中的所有文档""" all_chunks = [] # 遍历文件夹中的所有文件 for root, dirs, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) print(f"正在处理:{file_path}") # 加载文档 doc = load_document(file_path) if not doc: continue # 分块 chunks = split_document(doc, chunk_size, chunk_overlap) all_chunks.extend(chunks) print(f"处理完成,生成{len(chunks)}个分块\n") # 将分块结果保存到JSONL文件 with open(output_path, 'w', encoding='utf-8') as f: for chunk in all_chunks: f.write(json.dumps(chunk, ensure_ascii=False) + '\n') print(f"所有文档处理完成,共生成{len(all_chunks)}个分块,结果已保存到{output_path}") return all_chunks # ==================== 主函数 ==================== if __name__ == "__main__": # 处理test_docs文件夹中的所有文档 process_folder( folder_path="test_docs", output_path="processed_chunks.jsonl", chunk_size=500, chunk_overlap=50 )