# AI Application Observability Engineering in 2026: Turning LLM Systems from Black Boxes into White Boxes
## Why Are AI Applications So Hard to Debug?
When a traditional software system misbehaves, you have a mature debugging toolkit: read the logs, inspect the stack trace, step through with a debugger, fire metric alerts. These tools work because traditional systems are deterministic: the same input always produces the same output, and every error follows an explicit code path.

LLM applications break all of these assumptions. The same prompt can produce different outputs at different times (whenever temperature > 0). A user reports "the result is wrong," but you cannot tell whether the fault lies in the prompt, the model, or the RAG retrieval. Accuracy suddenly drops in production with no error logs at all, because the LLM always "successfully" returns some text; the text is simply wrong.

**LLM observability** is the engineering discipline that emerged to solve exactly these problems. It is not ordinary log monitoring, but a tracing, evaluation, and analysis stack designed specifically for LLM systems.

## The Three Layers of Observability

Observability for an AI application splits into three layers. From the top down:

- Layer 3 (business observability): user satisfaction, task completion rate, business metrics
- Layer 2 (LLM observability): trace spans, prompt logs, token statistics, latency distributions
- Layer 1 (infrastructure observability): CPU/GPU, memory, API error rates, network latency

Most teams only build layer 1, which is nowhere near enough. This article focuses on layers 2 and 3.

## LLM Tracing in Practice

### Why Traces Beat Logs

A traditional log records a single point-in-time event. A single user request to an LLM application, by contrast, can involve a whole chain: user input → intent understanding → RAG retrieval → prompt construction → LLM call → post-processing → output. A failure at any step in this chain degrades the final answer, but logs will never tell you that "in this response, step 3 (RAG retrieval) recalled only 2 relevant documents, so the answer lacked grounding." A distributed trace gives you the complete view of the request path, and every step records its inputs, outputs, latency, token usage, and metadata.

### Automatic Tracing with LangSmith

```python
import os

from langsmith import traceable
from langchain_openai import ChatOpenAI

# Configure LangSmith: tracing is enabled via environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_langsmith_api_key"
os.environ["LANGCHAIN_PROJECT"] = "my_ai_app_prod"

# LangChain calls are traced automatically once the variables above are set
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)


@traceable(name="retrieve_docs", run_type="retriever")
def retrieve_relevant_docs(query: str) -> list:
    """Document retrieval, traced as its own span."""
    # Actual retrieval logic; vector_store is assumed to be initialized elsewhere
    return vector_store.similarity_search(query, k=5)


@traceable(name="rag_pipeline", run_type="chain")
def rag_pipeline(user_question: str) -> dict:
    """Full RAG chain, automatically traced by LangSmith."""
    # Step 1: retrieve
    docs = retrieve_relevant_docs(user_question)

    # Step 2: build the prompt
    context = "\n\n".join(d.page_content for d in docs)
    prompt = f"Answer the question based on the following context:\n\n{context}\n\nQuestion: {user_question}"

    # Step 3: call the LLM (prompt and response are recorded automatically)
    response = llm.invoke(prompt)

    return {
        "answer": response.content,
        "source_docs": len(docs),
        "context_length": len(context),
    }
```

### Building Your Own Lightweight Trace System

```python
import json
import time
import uuid
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Span:
    """A single trace span."""
    span_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    trace_id: str = ""
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    # LLM-specific fields
    inputs: Dict[str, Any] = field(default_factory=dict)
    outputs: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Token and cost accounting
    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0.0

    def finish(self, outputs: dict = None, error: str = None):
        self.end_time = time.time()
        if outputs:
            self.outputs = outputs
        if error:
            self.error = error


class LLMTracer:
    """Lightweight LLM tracer."""

    def __init__(self, backend="console"):
        self.backend = backend
        self._active_spans: Dict[str, Span] = {}
        self.spans: List[Span] = []

    @contextmanager
    def trace(self, name: str, trace_id: str = None, parent_id: str = None, **metadata):
        """Context manager that opens, times, and emits a span."""
        span = Span(
            name=name,
            trace_id=trace_id or str(uuid.uuid4()),
            parent_span_id=parent_id,
            metadata=metadata,
        )
        self._active_spans[span.span_id] = span
        try:
            yield span
            span.finish()
        except Exception as e:
            span.finish(error=str(e))
            raise
        finally:
            self._active_spans.pop(span.span_id, None)
            self.spans.append(span)
            self._emit(span)

    def _emit(self, span: Span):
        """Ship the span to the configured backend."""
        if self.backend == "console":
            status = "ERROR" if span.error else "OK"
            print(
                f"[TRACE] {span.name} | {status} | {span.duration_ms:.0f}ms | "
                f"tokens={span.input_tokens + span.output_tokens} | "
                f"cost=${span.cost_usd:.4f}"
            )
        elif self.backend == "file":
            with open("traces.jsonl", "a", encoding="utf-8") as f:
                f.write(json.dumps(asdict(span), ensure_ascii=False) + "\n")


# Global tracer instance
tracer = LLMTracer(backend="file")


class TracedOpenAIClient:
    """OpenAI client wrapper with automatic tracing."""

    # Approximate USD prices per token; verify against current pricing
    COST_PER_TOKEN = {
        "gpt-4o": {"input": 2.5e-6, "output": 10e-6},
        "gpt-4o-mini": {"input": 0.15e-6, "output": 0.6e-6},
        "o1": {"input": 15e-6, "output": 60e-6},
    }

    def __init__(self, api_key: str = None):
        import openai
        self.client = openai.OpenAI(api_key=api_key)

    def chat_completion(
        self,
        messages: list,
        model: str = "gpt-4o",
        trace_id: str = None,
        span_name: str = "llm_call",
        **kwargs,
    ) -> dict:
        """Chat completion call wrapped in a trace span."""
        with tracer.trace(span_name, trace_id=trace_id, model=model) as span:
            span.inputs = {"messages": messages, "model": model, **kwargs}

            response = self.client.chat.completions.create(
                model=model, messages=messages, **kwargs
            )

            # Record token usage
            span.input_tokens = response.usage.prompt_tokens
            span.output_tokens = response.usage.completion_tokens

            # Compute cost from the per-model price table
            costs = self.COST_PER_TOKEN.get(model, {"input": 0, "output": 0})
            span.cost_usd = (
                span.input_tokens * costs["input"]
                + span.output_tokens * costs["output"]
            )

            output = response.choices[0].message.content
            span.outputs = {"content": output[:500]}  # store only the first 500 chars

            return {
                "content": output,
                "usage": {
                    "input_tokens": span.input_tokens,
                    "output_tokens": span.output_tokens,
                    "cost_usd": span.cost_usd,
                },
            }
```
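To see how this tracer captures a whole request chain rather than isolated events, here is a minimal usage sketch under stated assumptions: it reuses the `tracer` and `TracedOpenAIClient` defined above, and `search_documents` is a hypothetical retrieval function standing in for your own retriever.

```python
import uuid

client = TracedOpenAIClient()


def answer_question(user_question: str) -> str:
    """One user request = one trace_id shared by every span in the chain."""
    trace_id = str(uuid.uuid4())

    # Span 1: retrieval (search_documents is a hypothetical stand-in)
    with tracer.trace("rag_retrieval", trace_id=trace_id) as span:
        docs = search_documents(user_question, top_k=5)
        span.outputs = {"num_docs": len(docs)}  # lets you spot "only 2 docs recalled"

    # Span 2: generation, traced by the wrapper under the same trace_id
    context = "\n\n".join(docs)
    result = client.chat_completion(
        messages=[{"role": "user",
                   "content": f"Context:\n{context}\n\nQuestion: {user_question}"}],
        trace_id=trace_id,
        span_name="rag_generation",
    )
    return result["content"]
```

Grouping the spans in `traces.jsonl` by `trace_id` then reconstructs the full chain for any single request, which is exactly the view plain logs cannot give you.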
## An LLM Evaluation Framework

Tracing only records what happened; evaluation is what tells you whether it was any good.

### Automated Evaluation Metrics

```python
import json
from enum import Enum
from typing import List

import openai


class EvalMetric(Enum):
    FAITHFULNESS = "faithfulness"    # Is the answer grounded in the context (RAG anti-hallucination)?
    RELEVANCE = "relevance"          # Does the answer address the question?
    COMPLETENESS = "completeness"    # Is the answer complete?
    SAFETY = "safety"                # Is the answer safe (no harmful content)?
    COHERENCE = "coherence"          # Is the answer semantically coherent?


class LLMEvaluator:
    """
    Automated evaluation via the LLM-as-Judge approach:
    a strong model (e.g. GPT-4o) grades another model's output.
    """

    def __init__(self, judge_model: str = "gpt-4o"):
        self.client = openai.OpenAI()
        self.judge_model = judge_model

    def evaluate_faithfulness(self, context: str, answer: str) -> dict:
        """
        Judge whether a RAG answer is faithful to the retrieved context.
        The core RAG quality metric: hallucination prevention.
        """
        prompt = f"""Evaluation task: decide whether the "answer" is based entirely on the "context", without introducing information the context does not contain.

Context:
{context[:2000]}

Answer:
{answer}

Scoring rubric:
- 5: fully grounded in the context, no hallucination at all
- 4: mostly grounded, with minimal (reasonable) inference
- 3: partially grounded, with clear inference or additions
- 2: mixes context content with outside knowledge
- 1: largely contains information absent from the context (severe hallucination)

Output JSON only: {{"score": 1-5, "reason": "one-sentence explanation", "has_hallucination": true/false}}"""

        response = self.client.chat.completions.create(
            model=self.judge_model,
            response_format={"type": "json_object"},
            messages=[{"role": "user", "content": prompt}],
        )
        result = json.loads(response.choices[0].message.content)
        return {
            "metric": "faithfulness",
            "score": result["score"] / 5.0,  # normalize to 0-1
            "raw_score": result["score"],
            "reason": result.get("reason", ""),
            "has_hallucination": result.get("has_hallucination", False),
        }

    def evaluate_answer_relevance(self, question: str, answer: str) -> dict:
        """Judge whether the answer actually addresses the question."""
        prompt = f"""Question: {question}

Answer: {answer}

Evaluate whether the answer addresses the question directly and completely.
Output JSON only: {{"score": 1-5, "addressed_question": true/false, "reason": "..."}}"""

        response = self.client.chat.completions.create(
            model=self.judge_model,
            response_format={"type": "json_object"},
            messages=[{"role": "user", "content": prompt}],
        )
        result = json.loads(response.choices[0].message.content)
        return {
            "metric": "relevance",
            "score": result["score"] / 5.0,
            "addressed_question": result.get("addressed_question", True),
        }

    def batch_evaluate(self, samples: List[dict]) -> dict:
        """
        Batch evaluation for offline quality analysis.

        samples format: [{"question": ..., "context": ..., "answer": ..., "trace_id": ...}]
        """
        results = []
        for sample in samples:
            eval_result = {
                "trace_id": sample.get("trace_id"),
                "question": sample["question"][:100],
            }
            if sample.get("context"):
                eval_result["faithfulness"] = self.evaluate_faithfulness(
                    sample["context"], sample["answer"]
                )
            eval_result["relevance"] = self.evaluate_answer_relevance(
                sample["question"], sample["answer"]
            )
            results.append(eval_result)

        # Aggregate statistics
        faithfulness_scores = [r["faithfulness"]["score"] for r in results if "faithfulness" in r]
        relevance_scores = [r["relevance"]["score"] for r in results]
        return {
            "total_samples": len(results),
            "avg_faithfulness": (sum(faithfulness_scores) / len(faithfulness_scores)
                                 if faithfulness_scores else None),
            "avg_relevance": sum(relevance_scores) / len(relevance_scores),
            "hallucination_rate": sum(
                1 for r in results
                if r.get("faithfulness", {}).get("has_hallucination")
            ) / len(results),
            "details": results,
        }
```
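Tracing and evaluation plug together naturally: the `traces.jsonl` file the tracer writes is a ready-made evaluation dataset. Here is a minimal sketch of an offline quality check; the field mapping assumes spans populated as in `TracedOpenAIClient` above (`nightly_eval` and the 5% alert threshold are illustrative names and values, not part of the original article), so adjust it to your own span schema.

```python
import json


def nightly_eval(trace_file: str = "traces.jsonl", max_samples: int = 50) -> dict:
    """Offline quality check over recently traced LLM calls."""
    samples = []
    with open(trace_file, encoding="utf-8") as f:
        for line in f:
            span = json.loads(line)
            if span.get("error") or not span.get("outputs"):
                continue  # skip failed calls; they are counted elsewhere
            # Assumed schema: last user message as question, truncated output as answer
            messages = span.get("inputs", {}).get("messages", [])
            if not messages:
                continue
            samples.append({
                "trace_id": span["trace_id"],
                "question": messages[-1]["content"],
                "answer": span["outputs"].get("content", ""),
                "context": span.get("metadata", {}).get("context"),
            })
    samples = samples[-max_samples:]

    report = LLMEvaluator().batch_evaluate(samples)
    # Example threshold; tune it on your own data
    if report["hallucination_rate"] > 0.05:
        print(f"WARNING: hallucination rate {report['hallucination_rate']:.1%} exceeds 5%")
    return report
```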
## Prometheus + Grafana Monitoring Integration

```python
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# LLM-specific Prometheus metrics
LLM_REQUESTS_TOTAL = Counter(
    'llm_requests_total',
    'Total number of LLM API calls',
    ['model', 'endpoint', 'status'],
)
LLM_LATENCY_SECONDS = Histogram(
    'llm_latency_seconds',
    'LLM API latency distribution',
    ['model'],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
)
LLM_TOKENS_TOTAL = Counter(
    'llm_tokens_total',
    'Token usage',
    ['model', 'token_type'],  # token_type: input/output
)
LLM_COST_USD_TOTAL = Counter(
    'llm_cost_usd_total',
    'Cumulative API cost in USD',
    ['model'],
)
EVAL_SCORE = Gauge(
    'llm_eval_score',
    'Average score over the last N evaluations',
    ['metric_name'],  # faithfulness/relevance/etc.
)


class PrometheusLLMMonitor:
    """Prometheus integration for LLM monitoring."""

    def record_call(self, model: str, status: str, latency: float,
                    input_tokens: int, output_tokens: int, cost: float):
        LLM_REQUESTS_TOTAL.labels(model=model, endpoint="chat", status=status).inc()
        LLM_LATENCY_SECONDS.labels(model=model).observe(latency)
        LLM_TOKENS_TOTAL.labels(model=model, token_type="input").inc(input_tokens)
        LLM_TOKENS_TOTAL.labels(model=model, token_type="output").inc(output_tokens)
        LLM_COST_USD_TOTAL.labels(model=model).inc(cost)

    def update_eval_scores(self, eval_results: dict):
        for metric, score in eval_results.items():
            if isinstance(score, (int, float)):
                EVAL_SCORE.labels(metric_name=metric).set(score)


# Expose the metrics endpoint at application startup:
# start_http_server(8000)  # Prometheus scrapes http://localhost:8000/metrics
```
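One way to wire this monitor into the tracer from earlier is to mirror every finished span into Prometheus inside the emit path. A minimal sketch, assuming `LLMTracer`, `PrometheusLLMMonitor`, and the span fields defined above; `MonitoredLLMTracer` is a name introduced here for illustration.

```python
class MonitoredLLMTracer(LLMTracer):
    """Tracer that also mirrors every finished span into Prometheus."""

    def __init__(self, backend="file"):
        super().__init__(backend=backend)
        self.monitor = PrometheusLLMMonitor()

    def _emit(self, span):
        super()._emit(span)  # keep the file/console trace output
        self.monitor.record_call(
            model=span.metadata.get("model", "unknown"),
            status="error" if span.error else "ok",
            latency=span.duration_ms / 1000.0,  # Histogram buckets are in seconds
            input_tokens=span.input_tokens,
            output_tokens=span.output_tokens,
            cost=span.cost_usd,
        )


# Swap in the monitored tracer; if TracedOpenAIClient lives in the same
# module, its calls pick up the new global automatically.
tracer = MonitoredLLMTracer(backend="file")
start_http_server(8000)  # expose /metrics for Prometheus to scrape
```

With this in place, latency, token, and cost dashboards in Grafana come for free from the metrics above, with no extra instrumentation in the application code.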
## Choosing an Observability Platform

| Tool | Best For | Strengths | Limitations |
|------|----------|-----------|-------------|
| LangSmith | LangChain projects | Works out of the box, deep integration | LangChain ecosystem only |
| Langfuse | Open source, self-hosted | Flexible, can run on your own infrastructure | You operate it yourself |
| Weights & Biases | ML experiment tracking | Strong experiment management | LLM features are newer |
| Helicone | Lightweight proxy | Zero-intrusion integration | Relatively limited feature set |
| Self-built (this article's approach) | Custom requirements | Full control | High development cost |

## Summary

The core practices of LLM observability:

1. Trace every LLM call: record the full input and output, token usage, latency, and cost.
2. Chain-level traces, not just point logs: follow each user request through its entire pipeline to find the bottleneck step.
3. Automated quality evaluation: use LLM-as-Judge to measure hallucination rate and relevance instead of relying on manual spot checks alone.
4. Monitor cost and latency together: production quality problems usually surface in these two metrics first.
5. Prompt version management: track how different prompt versions perform and support A/B tests (a minimal sketch follows this list).

Going from black box to white box does not happen in one step. Get tracing working first, then add evaluation, and finally build out the full monitoring stack. Each step is far better than having nothing.
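The code in this article does not include prompt version tracking, so here is a minimal sketch of the idea behind point 5: stamp every span with a prompt version, then compare evaluation scores per version. `PROMPT_VERSIONS`, the version names, and `ask_with_version` are all hypothetical; it reuses the `client` instance from the tracing sketch earlier.

```python
# Hypothetical prompt registry: version -> template
PROMPT_VERSIONS = {
    "v1": "Answer the question based on the context:\n{context}\n\nQuestion: {question}",
    "v2": ("Using ONLY the context below, answer concisely. If the context is "
           "insufficient, say so.\n\nContext:\n{context}\n\nQuestion: {question}"),
}


def ask_with_version(question: str, context: str, version: str = "v2") -> dict:
    """Call the traced client, tagging the span with the prompt version."""
    prompt = PROMPT_VERSIONS[version].format(context=context, question=question)
    # The version is embedded in the span name because chat_completion forwards
    # extra kwargs to the OpenAI API, not to the span metadata.
    return client.chat_completion(
        messages=[{"role": "user", "content": prompt}],
        span_name=f"rag_generation_{version}",
    )
```

Grouping `batch_evaluate` scores by the version embedded in the span name then shows which prompt actually wins, turning prompt tweaks from guesswork into an A/B test.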