LLM上下文窗口工程2026:超长文档处理的实战策略完全指南
128K甚至百万token的上下文窗口已经成为主流,但"装得下"不等于"用得好"。本文深度解析上下文窗口的工程化使用策略,从分块处理到混合检索,帮你真正驾驭超长文档。
128K甚至百万token的上下文窗口已经成为主流,但"装得下"不等于"用得好"。本文深度解析上下文窗口的工程化使用策略,从分块处理到混合检索,帮你真正驾驭超长文档。
pythonfrom openai import OpenAIclient = OpenAI()def analyze_full_document(doc_content: str, question: str) -> str: """全文档直接输入——适合需要全局理解的任务""" # 安全检查:确认token数在窗口内 estimated_tokens = len(doc_content) / 3.5 # 中文约3.5字/token if estimated_tokens > 100000: raise ValueError(f"文档过长({estimated_tokens:.0f}tokens),请考虑其他策略") response = client.chat.completions.create( model="gpt-4o", # 128K上下文 messages=[ { "role": "system", "content": "你是一个专业的文档分析助手。请基于提供的完整文档回答问题。" }, { "role": "user", "content": f"""文档内容:{doc_content}---问题:{question}请基于以上文档内容,提供准确、完整的回答。""" } ], temperature=0.1 ) return response.choices[0].message.content# 适合场景:# - 合同审查(需要理解全局条款关系)# - 代码库理解(需要追踪跨文件依赖)# - 财报分析(数据在全文分散)### 策略二:滑动窗口(适合顺序处理)pythonfrom typing import Generatordef sliding_window_process( document: str, question: str, window_size: int = 4000, # 每窗口token数(约字符数/2) overlap: int = 500, # 窗口重叠(保持上下文连续) model: str = "gpt-4o-mini") -> Generator[str, None, None]: """滑动窗口处理超长文档,逐段分析""" chars_per_window = window_size * 2 # 中文估算 chars_overlap = overlap * 2 chunks = [] start = 0 while start < len(document): end = start + chars_per_window chunks.append(document[start:end]) start = end - chars_overlap print(f"文档分为 {len(chunks)} 段处理") previous_context = "" for i, chunk in enumerate(chunks): prompt = f"""这是文档的第{i+1}/{len(chunks)}段。{'上一段的关键信息:\n' + previous_context if previous_context else ''}当前段落内容:{chunk}针对此段落回答:{question}(如果此段不包含相关信息,请说明并等待后续段落)""" response = client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], temperature=0.1 ) chunk_answer = response.choices[0].message.content previous_context = chunk_answer[-500:] # 保留摘要传递给下一段 yield f"[第{i+1}段] {chunk_answer}" # 最后汇总 yield "\n[汇总] 以上是基于全文的分析结果。"### 策略三:RAG检索增强(适合精确问答)pythonimport numpy as npfrom sklearn.metrics.pairwise import cosine_similarityfrom openai import OpenAIclient = OpenAI()class DocumentRAG: """基于RAG的超长文档问答系统""" def __init__(self, chunk_size: int = 800, chunk_overlap: int = 100): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.chunks = [] self.embeddings = [] def index_document(self, document: str, doc_id: str = "default"): """将文档切块并建立索引""" # 语义感知切块(按段落/句子边界) raw_chunks = self._semantic_chunk(document) # 批量生成嵌入 texts = [c["text"] for c in raw_chunks] batch_size = 100 all_embeddings = [] for i in range(0, len(texts), batch_size): batch = texts[i:i+batch_size] response = client.embeddings.create( model="text-embedding-3-small", input=batch ) batch_embeddings = [e.embedding for e in response.data] all_embeddings.extend(batch_embeddings) # 存储 for chunk, embedding in zip(raw_chunks, all_embeddings): chunk["doc_id"] = doc_id self.chunks.append(chunk) self.embeddings.append(embedding) print(f"索引完成:{len(raw_chunks)} 个块") def _semantic_chunk(self, text: str) -> list[dict]: """语义感知切块""" # 按段落分割 paragraphs = text.split('\n\n') chunks = [] current_chunk = "" current_start = 0 pos = 0 for para in paragraphs: if len(current_chunk) + len(para) > self.chunk_size * 2: # 字符数估算 if current_chunk: chunks.append({ "text": current_chunk.strip(), "start": current_start, "end": pos }) # 保留重叠部分 overlap_text = current_chunk[-self.chunk_overlap*2:] current_chunk = overlap_text + "\n\n" + para current_start = pos - len(overlap_text) else: current_chunk = para else: current_chunk += ("\n\n" if current_chunk else "") + para pos += len(para) + 2 # +2 for \n\n if current_chunk: chunks.append({ "text": current_chunk.strip(), "start": current_start, "end": pos }) return chunks def query( self, question: str, top_k: int = 5, rerank: bool = True ) -> str: """检索相关片段并回答""" # 问题嵌入 q_response = client.embeddings.create( model="text-embedding-3-small", input=question ) q_embedding = q_response.data[0].embedding # 向量检索 embeddings_array = np.array(self.embeddings) q_array = np.array(q_embedding).reshape(1, -1) similarities = cosine_similarity(q_array, embeddings_array)[0] top_indices = np.argsort(similarities)[::-1][:top_k * 2] # 取2倍用于重排 candidates = [ {**self.chunks[i], "score": similarities[i]} for i in top_indices ] # 可选:用LLM重排(提高相关性) if rerank: candidates = self._rerank(question, candidates, top_k) else: candidates = candidates[:top_k] # 组装上下文 context = "\n\n---\n\n".join([ f"[片段{i+1}(相关度: {c['score']:.2f})]\n{c['text']}" for i, c in enumerate(candidates) ]) # LLM生成答案 response = client.chat.completions.create( model="gpt-4o", messages=[ { "role": "system", "content": "基于提供的文档片段回答问题。如果片段中没有相关信息,请明确说明,不要猜测。" }, { "role": "user", "content": f"""相关文档片段:{context}问题:{question}""" } ], temperature=0.1 ) return response.choices[0].message.content def _rerank(self, question: str, candidates: list, top_k: int) -> list: """使用LLM重排候选片段""" candidate_texts = "\n\n".join([ f"[{i}] {c['text'][:300]}..." for i, c in enumerate(candidates) ]) response = client.chat.completions.create( model="gpt-4o-mini", messages=[{ "role": "user", "content": f"""问题:{question}候选片段:{candidate_texts}请选出最相关的{top_k}个片段的序号,用逗号分隔,只输出数字。""" }], max_tokens=50 ) try: indices = [int(x.strip()) for x in response.choices[0].message.content.split(",")] return [candidates[i] for i in indices[:top_k] if i < len(candidates)] except Exception: return candidates[:top_k]### 策略四:MapReduce(适合汇总任务)pythonimport asynciofrom typing import Listasync def map_reduce_summarize( document: str, task: str = "总结主要内容", chunk_size: int = 3000, model: str = "gpt-4o-mini") -> str: """Map-Reduce策略:先分段处理,再汇总""" # 分块 chars = chunk_size * 2 # 字符估算 chunks = [document[i:i+chars] for i in range(0, len(document), chars)] # Map阶段:并发处理每个块 async def map_chunk(chunk: str, idx: int) -> str: response = client.chat.completions.create( model=model, messages=[{ "role": "user", "content": f"对以下文本片段完成任务:{task}\n\n{chunk}" }], max_tokens=1000 ) return response.choices[0].message.content map_tasks = [map_chunk(chunk, i) for i, chunk in enumerate(chunks)] mapped_results = await asyncio.gather(*map_tasks) # Reduce阶段:汇总所有结果 combined = "\n\n---\n\n".join([ f"第{i+1}部分:{result}" for i, result in enumerate(mapped_results) ]) final_response = client.chat.completions.create( model="gpt-4o", # 最终汇总用强模型 messages=[{ "role": "user", "content": f"""以下是对文档各部分的分析结果,请综合汇总:{combined}任务:{task}请给出综合性的最终结果。""" }], max_tokens=2000 ) return final_response.choices[0].message.content### 策略五:混合策略(生产首选)pythonclass AdaptiveDocumentProcessor: """自适应文档处理器——根据文档特征选择最优策略""" def __init__(self): self.rag = DocumentRAG() def process(self, document: str, question: str, task_type: str = "qa") -> str: doc_tokens = len(document) / 3.5 # 粗估 strategy = self._select_strategy(doc_tokens, task_type) print(f"文档长度: ~{doc_tokens:.0f} tokens,选择策略: {strategy}") if strategy == "direct": return analyze_full_document(document, question) elif strategy == "rag": self.rag.index_document(document) return self.rag.query(question) elif strategy == "mapreduce": return asyncio.run(map_reduce_summarize(document, question)) elif strategy == "sliding": results = list(sliding_window_process(document, question)) return "\n".join(results) else: raise ValueError(f"Unknown strategy: {strategy}") def _select_strategy(self, doc_tokens: float, task_type: str) -> str: """策略选择矩阵""" # 小文档:直接输入 if doc_tokens < 20000: return "direct" # 问答类任务:RAG if task_type in ("qa", "search", "extract"): return "rag" # 摘要/分析类任务:MapReduce(并发快) if task_type in ("summarize", "analyze") and doc_tokens < 200000: return "mapreduce" # 超大文档的顺序任务:滑动窗口 if task_type in ("review", "translate"): return "sliding" # 默认:RAG return "rag"## 三、上下文优化技巧### 3.1 前缀缓存(Prompt Caching)python# OpenAI & Anthropic都支持前缀缓存# 将长文档放在系统提示的固定位置,多次请求只需处理一次long_doc_cache = {}def cached_doc_analysis(doc_id: str, doc_content: str, question: str) -> str: """利用前缀缓存分析文档——多次问同一文档时节省成本""" # 第一次请求时,文档内容会被缓存(自动,无需显式操作) # 后续相同前缀的请求会命中缓存,input token成本减少50-80% response = client.chat.completions.create( model="gpt-4o", messages=[ { "role": "system", # 这部分会被缓存(需稳定,不要动态修改) "content": f"你是文档分析专家。\n\n文档内容(ID: {doc_id}):\n{doc_content}" }, { "role": "user", # 这部分每次不同,不会被缓存 "content": question } ] ) # 检查缓存命中 cached = response.usage.prompt_tokens_details.cached_tokens total = response.usage.prompt_tokens print(f"缓存命中率: {cached/total:.1%} ({cached}/{total} tokens)") return response.choices[0].message.content### 3.2 动态上下文压缩pythondef compress_context( messages: list[dict], target_tokens: int = 60000, preserve_last_n: int = 10) -> list[dict]: """当上下文过长时,压缩中间的历史消息""" estimated_tokens = sum(len(m['content']) / 3 for m in messages) if estimated_tokens <= target_tokens: return messages # 保留系统提示和最近N条消息 system_msgs = [m for m in messages if m['role'] == 'system'] recent_msgs = messages[-preserve_last_n:] middle_msgs = messages[len(system_msgs):-preserve_last_n] if not middle_msgs: return messages # 压缩中间历史 history_text = "\n".join([ f"{m['role']}: {m['content'][:200]}..." for m in middle_msgs ]) compression_response = client.chat.completions.create( model="gpt-4o-mini", messages=[{ "role": "user", "content": f"请将以下对话历史压缩为500字以内的摘要,保留关键信息和决策点:\n\n{history_text}" }] ) compressed_summary = compression_response.choices[0].message.content return system_msgs + [ {"role": "system", "content": f"[历史对话摘要]\n{compressed_summary}"} ] + recent_msgs## 四、上下文窗口对比(2026主流模型)| 模型 | 上下文窗口 | 可靠使用区间 | 适合场景 ||------|-----------|------------|---------|| GPT-4o | 128K | ~80K | 长文档分析、代码库 || GPT-4o-mini | 128K | ~80K | 成本敏感的长文档 || Claude 3.5 Sonnet | 200K | ~150K | 超长合同、大型代码库 || Gemini 1.5 Pro | 1M | ~500K | 超长视频/音频转录 || Gemini 2.5 Pro | 1M | ~700K | 百页PDF分析 || Qwen2.5-72B | 128K | ~80K | 本地私有化部署 |## 五、总结超长上下文处理没有一刀切的答案,需要根据任务特征选择策略:-<20K tokens + 全局理解→ 直接输入->20K tokens + 精确问答→ RAG-任意长度 + 汇总/分析→ MapReduce-顺序处理(翻译/审查)→ 滑动窗口-生产环境→ 混合策略 + 前缀缓存上下文工程的核心原则:放入上下文的每一个token都应该服务于任务目标,而不是"越多越好"。