# AI应用成本工程:把LLM调用费用降低50%的完整指南

如果你的AI应用已经有了一定规模的用户,你一定被API账单惊到过。GPT-4o、Claude 3.5这些模型的能力固然强,但Token费用叠加起来,很快就成为显著的成本中心。
成本工程不是"用更便宜的模型降质量",而是在保持质量的前提下,智能地分配计算资源。本文分享一套系统性的成本优化方法。

## 先建立成本追踪基线

在优化之前,必须先知道你在哪里花了钱。

```python
from openai import AsyncOpenAI
from dataclasses import dataclass, field
from datetime import datetime
import asyncio

@dataclass
class LLMCallRecord:
    timestamp: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cost_usd: float
    endpoint: str  # 哪个功能/接口调用的
    latency_ms: int

    # GPT-4o定价(2026)
    PRICING = {
        "gpt-4o": {"input": 0.0000025, "output": 0.000010},
        "gpt-4o-mini": {"input": 0.00000015, "output": 0.0000006},
        "gpt-4o-mini-realtime": {"input": 0.0000006, "output": 0.0000024},
    }

    @classmethod
    def from_response(cls, response, endpoint: str, latency_ms: int) -> "LLMCallRecord":
        model = response.model
        usage = response.usage
        pricing = cls.PRICING.get(model, {"input": 0, "output": 0})
        cost = (
            usage.prompt_tokens * pricing["input"] +
            usage.completion_tokens * pricing["output"]
        )
        return cls(
            timestamp=datetime.utcnow().isoformat(),
            model=model,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            total_tokens=usage.total_tokens,
            cost_usd=cost,
            endpoint=endpoint,
            latency_ms=latency_ms
        )

class CostTracker:
    def __init__(self, db):
        self.db = db

    async def record(self, record: LLMCallRecord):
        await self.db.insert("llm_calls", record.__dict__)

    async def get_cost_breakdown(self, days: int = 7) -> dict:
        """按功能模块分析成本"""
        rows = await self.db.query(f"""
            SELECT
                endpoint,
                model,
                COUNT(*) as call_count,
                SUM(total_tokens) as total_tokens,
                SUM(cost_usd) as total_cost,
                AVG(latency_ms) as avg_latency
            FROM llm_calls
            WHERE timestamp > datetime('now', '-{days} days')
            GROUP BY endpoint, model
            ORDER BY total_cost DESC
        """)
        return {
            "breakdown": rows,
            "total_cost": sum(r["total_cost"] for r in rows),
            "top_cost_endpoints": rows[:5]
        }
```

有了这个追踪,你会清楚地看到哪些功能是成本大户,然后有针对性地优化。

## 策略一:模型路由

不是所有请求都需要最强的模型。根据任务复杂度,自动选择合适的模型:

```python
class SmartModelRouter:
    """根据任务复杂度自动选择模型"""

    # 成本从低到高
    MODEL_TIERS = {
        "nano": "gpt-4o-mini",  # 简单任务
        "standard": "gpt-4o",  # 中等任务
        "premium": "o1",  # 复杂推理
    }

    async def route(self, task: LLMTask) -> str:
        complexity = await self._assess_complexity(task)
        if complexity == "low":
            return self.MODEL_TIERS["nano"]
        elif complexity == "medium":
            return self.MODEL_TIERS["standard"]
        else:
            return self.MODEL_TIERS["premium"]

    async def _assess_complexity(self, task: LLMTask) -> str:
        # 规则判断(快速,无成本)

        # 分类任务 → 简单
        if task.type in ["classification", "sentiment", "entity_extraction"]:
            return "low"

        # 短文本处理 → 简单
        if len(task.input) < 500 and task.expected_output_length < 200:
            return "low"

        # 包含推理要求 → 复杂
        reasoning_keywords = ["分析", "推断", "原因", "为什么", "比较", "权衡"]
        if any(kw in task.input for kw in reasoning_keywords):
            return "medium"

        # 代码生成 → 中等(除非很简单)
        if task.type == "code_generation":
            return "medium" if len(task.input) < 300 else "high"

        # 默认中等
        return "medium"
```

### 模型路由的成本对比

GPT-4o-mini vs GPT-4o的成本差距约为17倍(按上文定价表,输入和输出单价均相差约16.7倍)。如果你有60%的请求可以用mini处理,总体成本可以降低50%以上(假设各请求的Token量相近:60% × (1 − 1/16.7) ≈ 56%)。

## 策略二:提示词压缩

更短的提示词意味着更低的输入Token成本。

```python
class PromptCompressor:
    def compress_context(self, messages: list[dict], target_token_budget: int) -> list[dict]:
        """压缩对话历史到目标Token数"""
        current_tokens = self._count_tokens(messages)
        if current_tokens <= target_token_budget:
            return messages

        # 保留:第一条系统消息 + 最近N条消息
        result = []
        if messages[0]["role"] == "system":
            result.append(messages[0])
            historical = messages[1:]
        else:
            historical = messages

        # 先保留最近的消息
        recent = []
        recent_tokens = 0
        for msg in reversed(historical):
            tokens = self._count_tokens([msg])
            if recent_tokens + tokens < target_token_budget * 0.7:
                recent.insert(0, msg)
                recent_tokens += tokens
            else:
                break

        # 中间部分压缩成摘要
        if len(recent) < len(historical):
            omitted = historical[:len(historical) - len(recent)]
            summary = self._summarize_messages(omitted)
            result.append({
                "role": "system",
                "content": f"[对话摘要] {summary}"
            })

        result.extend(recent)
        return result

    def remove_redundancy(self, prompt: str) -> str:
        """移除提示词中的冗余内容"""
        # 移除多余的空行
        prompt = re.sub(r'\n{3,}', '\n\n', prompt)

        # 移除重复的说明(用于测试,生产环境慎用)
        # prompt = self._deduplicate_instructions(prompt)

        # 压缩示例(few-shot中的冗长示例)
        prompt = self._compress_examples(prompt)

        return prompt.strip()

    def _compress_examples(self, prompt: str) -> str:
        """压缩few-shot示例,只保留关键部分"""
        # 找到示例块并压缩(具体实现依赖你的Prompt格式)
        return prompt  # 实际实现时根据你的格式处理
```

### 动态Few-shot选择

不要在所有请求里都附上所有示例。根据当前请求的特征,动态选择最相关的1-3个示例:

```python
class DynamicFewShot:
    def __init__(self, examples: list[dict], embedder):
        self.examples = examples
        self.embedder = embedder
        # 预计算示例的向量
        self.example_vectors = embedder.encode([e["input"] for e in examples])

    def select(self, query: str, n: int = 3) -> list[dict]:
        """选择最相关的n个示例"""
        query_vector = self.embedder.encode([query])[0]

        # 计算相似度
        similarities = np.dot(self.example_vectors, query_vector)
        top_indices = np.argsort(similarities)[-n:][::-1]

        return [self.examples[i] for i in top_indices]
```

## 策略三:缓存

缓存是最直接的成本节省方式:完全相同(或语义相近)的请求不重复调用LLM。

```python
import hashlib
import json

class SemanticCache:
    """语义缓存:相似的问题复用答案"""

    def __init__(self, vector_store, threshold: float = 0.95):
        self.store = vector_store
        self.threshold = threshold  # 相似度阈值

    async def get(self, query: str) -> str | None:
        """检查缓存"""
        # 计算查询向量
        query_vector = await self.embed(query)

        # 查找相似的缓存条目
        results = await self.store.search(query_vector, top_k=1)

        if results and results[0].score >= self.threshold:
            # 命中缓存
            return results[0].cached_response

        return None

    async def set(self, query: str, response: str, ttl: int = 3600):
        """写入缓存"""
        query_vector = await self.embed(query)
        await self.store.insert({
            "query": query,
            "query_vector": query_vector,
            "response": response,
            "created_at": datetime.utcnow().isoformat(),
            "ttl": ttl
        })

    async def embed(self, text: str) -> list[float]:
        """生成文本向量(用便宜的嵌入模型)"""
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",  # 最便宜的嵌入模型
            input=text
        )
        return response.data[0].embedding

class ExactCache:
    """精确缓存:完全相同的请求直接返回"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def _make_key(self, messages: list[dict], model: str, **params) -> str:
        content = json.dumps({
            "messages": messages,
            "model": model,
            **params
        }, sort_keys=True)
        return "llm:" + hashlib.md5(content.encode()).hexdigest()

    async def get(self, messages, model, **params) -> dict | None:
        key = self._make_key(messages, model, **params)
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    async def set(self, messages, model, response, ttl=3600, **params):
        key = self._make_key(messages, model, **params)
        await self.redis.setex(key, ttl, json.dumps(response))
```

## 策略四:批量处理

如果你有大量独立的LLM任务(比如批量分析文章、处理数据),用批处理API可以省50%费用:

```python
async def batch_classify_articles(articles: list[str]) -> list[str]:
    """批量分类文章(使用OpenAI Batch API,省50%费用)"""
    import json

    # 1. 创建批处理请求文件
    requests = []
    for i, article in enumerate(articles):
        requests.append({
            "custom_id": f"article_{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": "将文章分类为:技术/产品/市场/其他"},
                    {"role": "user", "content": article[:500]}
                ],
                "max_tokens": 20
            }
        })

    # 写入JSONL文件
    batch_file = "batch_input.jsonl"
    with open(batch_file, "w") as f:
        for req in requests:
            f.write(json.dumps(req, ensure_ascii=False) + "\n")

    # 2. 上传并创建批处理作业
    client = AsyncOpenAI()

    with open(batch_file, "rb") as f:
        uploaded = await client.files.create(file=f, purpose="batch")

    batch = await client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )

    # 3. 等待完成(批处理通常在几分钟到几小时内完成)
    while batch.status not in ["completed", "failed", "cancelled"]:
        await asyncio.sleep(60)
        batch = await client.batches.retrieve(batch.id)

    # 4. 获取结果
    if batch.status == "completed":
        results_file = await client.files.content(batch.output_file_id)
        results = {}

        for line in results_file.text.split("\n"):
            if line:
                result = json.loads(line)
                results[result["custom_id"]] = (
                    result["response"]["body"]["choices"][0]["message"]["content"]
                )

        return [results.get(f"article_{i}", "未知") for i in range(len(articles))]

    return ["分类失败"] * len(articles)
```

## 成本优化的预期收益

综合以上策略,一个典型AI应用的成本优化空间:

| 策略 | 预期节省 | 实施难度 |
|------|---------|---------|
| 模型路由 | 30-50% | 中 |
| 语义缓存 | 10-30% | 中 |
| 提示词压缩 | 10-20% | 低 |
| 批量处理 | 50%(批量任务) | 低 |
| 精确缓存 | 5-15% | 低 |

关键建议:先建基线,找出Top 3的成本大户,针对性优化,而不是全面铺开。通常20%的接口贡献了80%的成本,精准打击效果最好。

---

本文关键词:LLM成本优化、Token缓存、模型路由、批量处理、AI成本工程
