# AI应用的可观测性工程:用Tracing和Logging看清LLM黑盒
> “我的RAG系统回答了一个错误答案,但我不知道为什么。”
> “Agent跑了2分钟什么都没完成,我不知道它在做什么。”
> “用了新版本Prompt,感觉质量变了,但我说不清楚哪里变了。”
这些是AI工程师最常见的困境,根本原因是缺乏可观测性(Observability)。本文系统介绍如何为LLM应用构建完整的可观测性体系,让AI系统的行为从黑盒变白盒。

## 可观测性的三大支柱

借鉴传统软件可观测性的三大支柱,LLM应用的可观测性同样需要:

- **Metrics(指标)**:定量衡量系统健康的数值,如响应时延、Token消耗、成功率
- **Logs(日志)**:记录系统发生的事件,包括每次LLM调用的输入输出
- **Traces(追踪)**:记录一次请求的完整执行链路,特别是在Agent场景中追踪多步推理

在LLM应用中,还需要额外关注:

- **Prompt版本追踪**:哪个版本的Prompt被用于哪次请求
- **Token使用分析**:详细的Token消耗分布,找出成本热点
- **质量评估**:LLM生成质量的自动化指标

## LangSmith:LangChain生态的可观测性标配

如果你的应用基于LangChain/LangGraph,LangSmith是最省力的选择:

```python
import os

from langchain_openai import ChatOpenAI

# Configure LangSmith via environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_langsmith_api_key"
os.environ["LANGCHAIN_PROJECT"] = "my-rag-project"

# Every LangChain call from here on is traced automatically
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke("你好,世界")
# Input, output, token usage and latency of this call are all
# recorded in LangSmith automatically.
```

LangSmith的关键功能:

- 自动记录每次LLM调用(输入、输出、Token、延迟)
- 完整的Agent执行追踪(每个工具调用都有记录)
- Prompt版本管理(Hub)
- 数据集管理和自动化评估

## 自建可观测性:OpenTelemetry方案

不想依赖第三方服务?用OpenTelemetry构建自主可控的可观测性:

```python
import json
import time
from functools import wraps

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Initialize the tracer (exports to a collector such as Jaeger or Grafana Tempo)
resource = Resource(attributes={"service.name": "llm-application"})
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("llm-app-tracer")


def trace_llm_call(func):
    """Decorator: trace an LLM call as an OpenTelemetry span.

    Records model name, a preview of the input, latency and token usage
    as span attributes; marks the span as ERROR and re-raises on failure.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(f"llm.{func.__name__}") as span:
            start_time = time.time()
            # Record the input (first message only, to bound attribute size)
            if kwargs.get("messages"):
                span.set_attribute(
                    "llm.input.messages",
                    json.dumps(kwargs["messages"][:1], ensure_ascii=False),
                )
            if kwargs.get("model"):
                span.set_attribute("llm.model", kwargs["model"])
            try:
                result = func(*args, **kwargs)
                # Record the output
                duration = (time.time() - start_time) * 1000
                span.set_attribute("llm.latency_ms", duration)
                if hasattr(result, "usage"):
                    span.set_attribute("llm.tokens.prompt", result.usage.prompt_tokens)
                    span.set_attribute("llm.tokens.completion", result.usage.completion_tokens)
                    span.set_attribute("llm.tokens.total", result.usage.total_tokens)
                span.set_status(trace.StatusCode.OK)
                return result
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise
    return wrapper
```

## 结构化日志:LLM调用的标准格式

```python
import json
import logging
import time
from datetime import datetime

from openai import OpenAI

# Configure structured logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("llm_app")


class StructuredLLMLogger:
    """Structured logger for LLM calls: emits one JSON log line per
    request, response and error, correlated by ``call_id``."""

    def __init__(self, client: OpenAI, app_name: str = "llm-app"):
        self.client = client
        self.app_name = app_name

    def chat(self, messages: list[dict], model: str = "gpt-4o",
             trace_id: str = None, **kwargs) -> dict:
        """LLM call with full request/response/error logging.

        Re-raises any API exception after logging it.
        """
        call_id = trace_id or datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        start_time = time.time()

        # Log the request. Guard against an empty message list before
        # peeking at messages[0] (the original only guarded messages[-1]).
        # NOTE(review): hash() is randomized per process (PYTHONHASHSEED);
        # use hashlib if prompt hashes must be comparable across runs.
        logger.info(json.dumps({
            "event": "llm_request",
            "call_id": call_id,
            "app": self.app_name,
            "model": model,
            "message_count": len(messages),
            "system_prompt_hash": hash(messages[0]["content"])
                if messages and messages[0]["role"] == "system" else None,
            "last_user_message": messages[-1]["content"][:200] if messages else "",
            "timestamp": datetime.now().isoformat(),
        }, ensure_ascii=False))

        try:
            response = self.client.chat.completions.create(
                model=model, messages=messages, **kwargs
            )
            duration_ms = (time.time() - start_time) * 1000

            # Log the response
            logger.info(json.dumps({
                "event": "llm_response",
                "call_id": call_id,
                "app": self.app_name,
                "model": model,
                "latency_ms": round(duration_ms, 2),
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
                "finish_reason": response.choices[0].finish_reason,
                "response_preview": response.choices[0].message.content[:200],
                "estimated_cost_usd": self._estimate_cost(model, response.usage),
                "timestamp": datetime.now().isoformat(),
            }, ensure_ascii=False))
            return response
        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000
            logger.error(json.dumps({
                "event": "llm_error",
                "call_id": call_id,
                "model": model,
                "latency_ms": round(duration_ms, 2),
                "error_type": type(e).__name__,
                "error_message": str(e),
                "timestamp": datetime.now().isoformat(),
            }, ensure_ascii=False))
            raise

    def _estimate_cost(self, model: str, usage) -> float:
        """Estimate the USD cost of one call (per-token prices;
        unknown models fall back to gpt-4o pricing)."""
        pricing = {
            "gpt-4o": {"input": 0.000005, "output": 0.000015},
            "gpt-4o-mini": {"input": 0.00000015, "output": 0.0000006},
        }
        model_price = pricing.get(model, {"input": 0.000005, "output": 0.000015})
        return (usage.prompt_tokens * model_price["input"]
                + usage.completion_tokens * model_price["output"])
```

## Agent执行追踪

Agent场景的追踪更复杂,需要记录整个推理链路:

```python
import time
import uuid
from dataclasses import dataclass, field


@dataclass
class AgentTraceSpan:
    """One node in an agent execution trace tree."""
    span_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    parent_id: str = None
    name: str = ""
    start_time: float = field(default_factory=time.time)
    end_time: float = None
    inputs: dict = field(default_factory=dict)
    outputs: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    error: str = None
    children: list = field(default_factory=list)

    def end(self, outputs: dict = None, error: str = None):
        """Close the span, optionally attaching outputs / an error."""
        self.end_time = time.time()
        if outputs:
            self.outputs = outputs
        if error:
            self.error = error

    @property
    def duration_ms(self) -> float:
        # Open spans report elapsed time so far
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return (time.time() - self.start_time) * 1000


class AgentTracer:
    """Records the nested spans of an agent run as a tree.

    start_span/end_span must be called in matched (stack) order.
    """

    def __init__(self):
        self.traces = []            # root spans, one per run
        self.current_span_stack = []  # open spans, innermost last

    def start_span(self, name: str, inputs: dict = None,
                   metadata: dict = None) -> AgentTraceSpan:
        """Open a span as a child of the currently open span (or as a root)."""
        parent_id = self.current_span_stack[-1].span_id if self.current_span_stack else None
        span = AgentTraceSpan(
            name=name,
            parent_id=parent_id,
            inputs=inputs or {},
            metadata=metadata or {},
        )
        if self.current_span_stack:
            self.current_span_stack[-1].children.append(span)
        else:
            self.traces.append(span)
        self.current_span_stack.append(span)
        return span

    def end_span(self, outputs: dict = None, error: str = None):
        """Close the innermost open span and return it."""
        if self.current_span_stack:
            span = self.current_span_stack.pop()
            span.end(outputs=outputs, error=error)
            return span

    def print_trace(self, span: AgentTraceSpan = None, indent: int = 0):
        """Print the trace tree (all roots when called without a span)."""
        if span is None:
            for trace in self.traces:
                self.print_trace(trace)
            return
        status = "✓" if not span.error else "✗"
        print(f"{'  ' * indent}{status} [{span.duration_ms:.0f}ms] {span.name}")
        if span.error:
            print(f"{'  ' * (indent + 1)}ERROR: {span.error}")
        for child in span.children:
            self.print_trace(child, indent + 1)


# Usage example
tracer = AgentTracer()


async def traced_agent_run(task: str):
    tracer.start_span("agent_run", inputs={"task": task})
    try:
        # Planning phase
        tracer.start_span("planning", inputs={"task": task})
        plan = await generate_plan(task)
        tracer.end_span(outputs={"plan": plan})

        # Execution phase
        for i, step in enumerate(plan):
            tracer.start_span(f"execute_step_{i}", inputs={"step": step})
            # Tool call
            tracer.start_span(
                f"tool_{step['tool']}",
                inputs={"args": step.get("args", {})},
            )
            result = await call_tool(step["tool"], step.get("args", {}))
            tracer.end_span(outputs={"result": str(result)[:500]})
            tracer.end_span(outputs={"status": "completed"})
        tracer.end_span(outputs={"status": "success"})
    except Exception as e:
        tracer.end_span(error=str(e))
        raise
    finally:
        # Always print the trace, even when the run failed.
        # (In the original snippet this call sat after `raise` and
        # was unreachable on the error path.)
        tracer.print_trace()
```

## Prometheus指标监控

将LLM调用指标暴露给Prometheus,与现有监控基础设施集成:

```python
from prometheus_client import Counter, Histogram, start_http_server

# Metric definitions
llm_requests_total = Counter(
    'llm_requests_total', 'Total LLM API calls',
    ['model', 'app', 'status'])
llm_latency_histogram = Histogram(
    'llm_latency_seconds', 'LLM API call latency',
    ['model', 'app'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0])
llm_tokens_counter = Counter(
    'llm_tokens_total', 'Total tokens consumed',
    ['model', 'app', 'token_type'])
llm_cost_counter = Counter(
    'llm_cost_usd_total', 'Estimated USD cost of LLM calls',
    ['model', 'app'])


def record_llm_metrics(model: str, app: str, duration: float,
                       usage, cost: float, status: str):
    """Metrics middleware: record one LLM call into every metric.

    ``duration`` is in seconds (Histogram buckets are in seconds);
    ``usage`` is an OpenAI-style usage object with token counts.
    """
    llm_requests_total.labels(model=model, app=app, status=status).inc()
    llm_latency_histogram.labels(model=model, app=app).observe(duration)
    llm_tokens_counter.labels(model=model, app=app,
                              token_type="prompt").inc(usage.prompt_tokens)
    llm_tokens_counter.labels(model=model, app=app,
                              token_type="completion").inc(usage.completion_tokens)
    llm_cost_counter.labels(model=model, app=app).inc(cost)


# Start the Prometheus HTTP server (exposes the metrics endpoint)
start_http_server(8080)  # curl http://localhost:8080/metrics
```

## 可视化看板设计

用Grafana构建LLM监控看板,关键面板:

1. **成本看板**:按模型/应用的每日/月度费用趋势
2. **性能看板**:P50/P95/P99延迟,不同模型对比
3. **质量看板**:自动化质量评分趋势,问题率
4. **Token分布看板**:Prompt vs Completion比例,长尾请求分析

## 小结

构建LLM可观测性系统的最简路径:

1. **第一步**:添加结构化日志,记录每次LLM调用的关键信息
2. **第二步**:接入LangSmith(如果用LangChain)或OpenTelemetry
3. **第三步**:暴露Prometheus指标,建立成本和性能告警
4. **第四步**:建立自动化质量评估,定期跑评测集

可观测性不是锦上添花,而是生产级AI应用的地基。没有可观测性的AI系统,出了问题只能靠猜。
