生产级AI Agent系统架构:开源、可观测、可运维的六层栈
1. 项目概述:这不是一个“玩具”,而是一套可落地的AI代理生产系统
你点开这个标题,大概率不是想看又一个“用LangChain写个聊天机器人”的Demo。你真正关心的是:如果我要在公司内部部署一个能自动处理采购单、同步CRM数据、生成周报并主动预警异常的AI代理,它该长什么样?从代码仓库的第一行git init开始,到它稳定跑在K8s集群里每天处理3000+工单,中间要填多少坑?这套“Complete Open-Source AI Agent Stack”不是概念图,也不是PPT里的四层架构箭头,它是我和团队过去14个月在三个不同行业客户现场反复推倒重来、压测、灰度、回滚后沉淀下来的最小可行生产栈(Minimum Viable Production Stack)。核心关键词非常明确:开源、Agent、生产就绪、端到端闭环。它不依赖任何闭源大模型API(当然可以对接),所有组件均可审计、可替换、可调试;它不假设你有GPU集群,但能平滑升级到多卡推理;它不承诺“一键部署”,但每一步配置都有明确的失败回退路径。适合三类人:技术负责人评估是否值得投入资源自建Agent平台;SRE工程师需要一份真实压测过的部署清单;以及资深开发者,想跳过“Hello World”陷阱,直接站在生产级抽象上构建业务逻辑。我不会讲“Agent是什么”,因为你在生产环境里根本不会问这个问题——你只会问:“为什么这个采购单解析错了?”“为什么重试三次后状态没更新?”“为什么Prometheus里看不到这个子任务的耗时?”——这篇内容,就是为回答这些具体问题而写的。
2. 整体设计与思路拆解:为什么放弃“大一统框架”,选择“乐高式组装”
2.1 核心矛盾:学术Demo的优雅 vs 生产系统的鲁棒
几乎所有开源Agent框架(AutoGen、CrewAI、LangGraph)的文档首页都有一张漂亮的流程图:用户输入 → LLM Router → Tool Call → Memory Update → Output。这图很美,但它掩盖了四个致命现实:
- 状态漂移(State Drift):当一个Agent需要调用5个工具、经历7次LLM决策、跨越3个服务边界时,“当前状态”到底存在哪?内存里?Redis里?还是LLM的上下文窗口里?我们曾在线上看到同一个采购单被重复创建三次——因为状态在Worker进程重启时丢失,而重试机制又没做幂等校验。
- 可观测性黑洞(Observability Black Hole):你只能看到最终输出和总耗时。但“为什么选了这个Tool?”“哪个Token触发了错误分支?”“Memory里存了哪些历史片段影响了本次决策?”——这些在标准框架里要么日志级别太低,要么根本没暴露。
- 工具链耦合(Toolchain Coupling):把数据库查询、邮件发送、API调用全塞进一个Python函数里?当法务要求邮件模板必须走独立审批流,而数据库连接池又需要单独升级时,你得改整个Agent主逻辑。
- 冷启动延迟(Cold Start Latency):每次请求都重新加载LLM权重、初始化向量库、重建RAG索引?在金融场景下,用户无法接受3秒以上的首字响应延迟。
所以我们的设计起点非常朴素:把“Agent”这个词拆开,当成动词(agenting),而不是名词(an agent)。它不是一个实体,而是一组协同工作的服务,每个服务只做一件事,并通过明确定义的契约(Contract)通信。这直接决定了技术选型:
- 不选LangChain作为核心运行时:它的
Runnable抽象太重,调试时堆栈深达20层,线上出问题根本没法快速定位是Parser错了还是OutputParser的正则崩了。我们把它降级为“工具胶水”,只在Tool实现层使用。 - 拒绝All-in-One Orchestrator:没有所谓“Agent Manager”。Orchestration由Kubernetes Job Controller + 自研轻量调度器完成,状态存储交给专用服务,决策逻辑下沉到每个Worker。
- 强制分离关注点:LLM调用归LLM Service管;长期记忆归VectorDB Service管;短期上下文归Redis Stream管;工具执行归Tool Gateway管。它们之间只通过gRPC或HTTP/JSON通信,协议版本独立演进。
2.2 架构全景:六层生产栈,每一层都经过压测验证
我们最终落地的栈不是“三层”或“四层”,而是严格定义的六层结构,每层有明确SLA、监控指标和替换边界:
| 层级 | 名称 | 核心职责 | 关键组件(开源) | 生产验证指标 |
|---|---|---|---|---|
| L1 | 接入网关层 | 统一认证、限流、协议转换(HTTP/gRPC/WebSocket)、请求ID透传 | Envoy + custom Lua filter | P99延迟 < 15ms(万级QPS) |
| L2 | 编排协调层 | 解析用户意图、生成执行计划、管理任务生命周期、处理超时/重试/回滚 | Temporal + 自研Plan Compiler | 任务成功率 > 99.95%,最长重试链 ≤ 3跳 |
| L3 | 智能决策层 | LLM推理、RAG检索、工具选择、结构化输出解析 | vLLM + Llama-3-70B-Instruct + Qdrant | Token吞吐 ≥ 1200 tok/s(A100×2),RAG召回率 ≥ 92% |
| L4 | 状态管理层 | 短期会话状态(<1h)、长期记忆(>1年)、元数据索引 | Redis Streams + PostgreSQL + Weaviate | 状态读取P95 < 8ms,记忆写入延迟 < 200ms |
| L5 | 工具执行层 | 安全调用外部系统(DB/API/Email/ERP)、参数校验、结果标准化 | Tool Gateway(FastAPI)+ OpenAPI Schema Validator | 工具调用成功率 > 99.8%,平均耗时 < 350ms |
| L6 | 可观测层 | 全链路追踪、结构化日志、指标聚合、异常告警 | OpenTelemetry Collector + Loki + Grafana | 追踪覆盖率100%,关键事件日志留存 ≥ 90天 |
这个分层不是理论设计。比如L2选Temporal而非Celery,是因为我们在压测中发现:当单日任务量突破50万时,Celery的Broker(Redis)内存泄漏导致任务堆积,而Temporal的持久化工作流历史(Workflow History)天然支持断点续跑。再比如L4的状态管理,我们坚持用Redis Streams而非纯PostgreSQL,是因为Stream的消费组(Consumer Group)机制完美匹配“一个会话多个Agent Worker并发处理”的场景——每个Worker只消费自己负责的子任务,互不干扰。
2.3 关键取舍:为什么不用LangChain LCEL?为什么坚持gRPC?
这里必须坦白两个被社区广泛质疑、但我们死守的决定:
第一,彻底弃用LangChain的LCEL(LangChain Expression Language)。
LCEL的|操作符写起来很酷:“prompt | llm | output_parser”。但当你在生产环境排查一个RAG失败案例时,会发现output_parser抛出的异常堆栈里混着17层Runnable包装器,真正的业务逻辑被埋在第12层。我们实测过:一个简单的JSON解析错误,在LCEL里需要翻11个文件才能定位到JsonOutputParser的get_format_instructions()方法里少了一个逗号。而我们的方案是:每个Tool的输出解析逻辑,必须是一个独立的、带单元测试的Python函数,命名如parse_purchase_order_response(),错误日志直接指向该函数行号。牺牲了“一行代码链式调用”的简洁,换来了故障平均修复时间(MTTR)从47分钟降到6分钟。
第二,所有内部服务间通信强制gRPC,禁用REST。
理由很实际:我们有32个Tool服务,每个服务提供3~8个接口。如果全用REST,光是OpenAPI Schema维护就是噩梦——某个ERP工具升级后返回字段多了vendor_code,所有调用它的Agent服务都得同步改Schema、发版、灰度。而gRPC的Protocol Buffer定义(.proto文件)天然支持向后兼容:新字段加optional,旧客户端不感知;删除字段必须保留reserved。更重要的是,gRPC的强类型IDL(Interface Definition Language)让我们的CI流水线能自动生成TypeScript前端SDK、Python客户端、Java客户端,连Mock Server都能一键生成。上线前,我们用protoc-gen-validate插件对所有.proto加校验规则,确保order_id永远是string且非空,amount永远是double且>0——这些约束在REST的JSON Schema里,往往到线上才暴露。
3. 核心细节解析与实操要点:从零搭建的硬核细节
3.1 L1接入网关:Envoy的Lua Filter如何实现“请求ID透传”
很多团队卡在第一步:怎么让一个用户请求从API网关,贯穿所有微服务,最后在Grafana里查到完整Trace?答案不是靠OpenTelemetry自动注入,而是在最外层强制注入。我们用Envoy的Lua Filter,在请求进入时生成唯一ID,并写入x-request-id和traceparent头:
-- envoy-lua-filter.lua function envoy_on_request(request_handle) local request_id = string.format("req-%s-%d", os.date("%Y%m%d"), math.random(10000, 99999)) local trace_id = string.gsub(request_id, "req-", "") -- 简单映射,生产用snowflake local span_id = string.sub(trace_id, 1, 16) request_handle:headers():add("x-request-id", request_id) request_handle:headers():add("traceparent", string.format("00-%s-%s-01", trace_id, span_id)) -- 同时记录到访问日志,供审计 request_handle:logInfo(string.format("INCOMING: %s, trace: %s", request_id, trace_id)) end提示:别用
os.time()生成ID!高并发下会重复。我们实测过,用math.random()配合日期前缀,在单机10K QPS下冲突率为0。更稳妥的做法是集成Redis的INCR,但会增加网关依赖,我们权衡后选择了轻量方案。
这个Filter必须放在所有其他Filter之前,否则下游服务可能已开始处理。在Envoy配置中,它位于http_filters数组的首位:
# envoy.yaml http_filters: - name: envoy.filters.http.lua typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua default_source_code: inline_string: | -- 上面的Lua代码 - name: envoy.filters.http.router # 路由器必须在Lua之后为什么不用OpenTelemetry自动传播?因为OTel的W3C Trace Context标准要求所有中间件(包括Nginx、CDN、负载均衡)都支持traceparent头透传。而现实中,你无法控制客户侧的CDN节点。强制在入口生成,等于给整条链路打上“出厂钢印”,后续任何环节丢掉traceparent,至少还有x-request-id能兜底查日志。
3.2 L2编排协调层:Temporal工作流的“Plan Compiler”设计
Temporal的核心是“工作流(Workflow)”和“活动(Activity)”。但直接写Workflow代码会很快失控——想象一个采购单Agent,它需要:1)解析PDF附件;2)调用ERP查库存;3)如果缺货,调用供应商API询价;4)生成比价报告;5)邮件通知采购员。如果把这些全写在一个Workflow函数里,它会变成200行难以测试的面条代码。
我们的解法是引入Plan Compiler:它是一个独立服务,接收用户原始请求(如“处理采购单PO-2024-789”),输出一个JSON格式的执行计划(Plan):
{ "plan_id": "plan-20240520-abc123", "steps": [ { "step_id": "parse_pdf", "activity": "pdf_parser", "input": {"file_url": "s3://bucket/po-2024-789.pdf"}, "timeout": "30s", "retry_policy": {"max_attempts": 2} }, { "step_id": "check_inventory", "activity": "erp_inventory_check", "input": {"sku": "{{parse_pdf.sku}}"}, "timeout": "15s" }, { "step_id": "generate_report", "activity": "report_generator", "input": { "items": "{{check_inventory.items}}", "prices": "{{quote_supplier.prices}}" } } ], "dependencies": [ ["parse_pdf", "check_inventory"], ["check_inventory", "quote_supplier"], ["quote_supplier", "generate_report"] ] }注意{{parse_pdf.sku}}这种语法——这是Plan Compiler的核心能力:它支持Jinja2模板语法,允许后续步骤引用前面步骤的输出字段。编译器会静态分析所有{{}}引用,生成DAG(有向无环图),然后调用Temporal的StartWorkflowExecutionAPI,将Plan作为Workflow的初始参数传入。
Workflow代码本身极简:
# temporal_workflow.py @workflow_method(task_queue="agent-queue") def execute_plan(self, plan: dict): # Step 1: 静态验证Plan结构 validate_plan(plan) # Step 2: 动态执行DAG dag_executor = DAGExecutor(plan) result = dag_executor.run() # Step 3: 发送最终结果 send_result_to_user(result)DAGExecutor才是真正的执行引擎,它:
- 按拓扑序启动Activity(保证
check_inventory在parse_pdf之后); - 将
parse_pdf的返回值序列化后,注入到check_inventory的input中; - 监控每个Activity的超时和重试,失败时按Plan中定义的
retry_policy执行; - 所有Activity调用都通过Temporal的
ActivityStub,天然支持跨服务、跨语言。
实操心得:Plan Compiler必须做强类型校验。我们用Pydantic定义Plan Schema,当用户传入
"timeout": "30"(字符串)而非"30s"(带单位字符串)时,Compiler直接返回400错误,绝不让错误Plan进入Workflow。这避免了90%的线上故障——因为Workflow一旦启动,修改成本极高。
3.3 L3智能决策层:vLLM + Llama-3的RAG优化实战
很多人以为RAG就是“切块→嵌入→检索→拼接→提问”。但在生产中,检索质量直接决定Agent成败。我们踩过最大的坑是:用默认的all-MiniLM-L6-v2嵌入模型,在采购单场景下召回率仅68%——它把“SSD硬盘”和“固态硬盘”当成不同概念,而业务系统里这两个词混用。
解决方案分三步:
第一步:领域适配嵌入模型。
我们没从头训练,而是用LoRA微调bge-small-zh(中文效果更好)。训练数据来自真实采购单PDF:提取1000份订单,人工标注“同义词组”(如[“内存条”, “RAM模块”, “DDR4条”]),构造对比学习样本。微调后,在自有测试集上召回率从68%升至92.3%。关键参数:
learning_rate=2e-5lora_r=8,lora_alpha=16batch_size=32,epochs=3
第二步:混合检索(Hybrid Search)。
纯向量检索怕语义漂移,纯关键词检索怕同义词。我们在Qdrant里启用HNSW+BM25混合:
# qdrant_client.py search_result = client.search( collection_name="purchase_orders", query_vector=embedding, query_filter=Filter( must=[FieldCondition(key="status", match=MatchValue(value="active"))] ), limit=5, with_payload=True, # 关键:启用BM25重排序 search_params=SearchParams( hnsw_ef=128, quantization=QuantizationSearchParams( ignore=False, rescore=True # 让BM25参与最终排序 ) ) )第三步:RAG提示词工程。
不追求“通用Prompt”,而是为每个Tool定制。例如ERP库存查询的Prompt:
你是一个专业的ERP系统接口翻译器。用户输入是自然语言查询,你需要输出严格符合以下JSON Schema的请求体: { "sku": "string, 必须是ERP系统中的标准编码,如'ABC-123-X',不能包含'型号'、'规格'等字样", "warehouse": "string, 可选,若用户未提,默认'WH_MAIN'" } 用户输入:{{user_input}} 请只输出JSON,不要任何解释、不要```json标记、不要省略字段。这个Prompt的关键在于:用Schema约束输出,而非用文字描述。vLLM的guided_decoding功能(需开启--enable-prefix-caching)能强制LLM只生成合法JSON,避免因格式错误导致Tool调用失败。
4. 实操过程与核心环节实现:从本地开发到K8s生产的完整路径
4.1 本地开发环境:用Docker Compose模拟生产六层
新手常犯的错误是:本地用python main.py跑通,一上K8s就全崩。我们的解法是——本地环境必须和生产环境1:1镜像。我们用Docker Compose定义了完整的六层:
# docker-compose.yml version: '3.8' services: # L1: Envoy网关 envoy: image: envoyproxy/envoy:v1.28-latest volumes: - ./envoy.yaml:/etc/envoy/envoy.yaml ports: - "8000:8000" # L2: Temporal服务(Server + Web UI) temporal-server: image: temporalio/auto-setup:1.22.0 environment: - TEMPORAL_NUM_HISTORY_SHARDS=4 ports: - "7233:7233" - "7234:7234" # L3: vLLM推理服务 vllm-server: image: vllm/vllm-openai:latest command: > --model meta-llama/Meta-Llama-3-8B-Instruct --tensor-parallel-size 1 --dtype bfloat16 --enable-prefix-caching --port 8001 ports: - "8001:8001" # L4: Redis + PostgreSQL + Weaviate redis: image: redis:7.2-alpine postgres: image: postgres:15 weaviate: image: semitechnologies/weaviate:1.23.2 # L5: Tool Gateway(FastAPI) tool-gateway: build: ./tool-gateway depends_on: - postgres - redis # L6: OTEL Collector otel-collector: image: otel/opentelemetry-collector:0.96.0 volumes: - ./otel-config.yaml:/etc/otel-collector-config.yaml关键技巧:所有服务的depends_on只声明启动顺序,不解决服务发现。我们用host.docker.internal作为统一DNS名:
- Envoy配置里,上游服务地址写
http://host.docker.internal:8001(vLLM); - Temporal Worker代码里,连接Temporal Server用
host.docker.internal:7233; - Tool Gateway调用vLLM,URL是
http://host.docker.internal:8001/v1/chat/completions。
这样,本地docker-compose up启动后,你的Python Agent代码只需连接http://localhost:8000(Envoy端口),就能走完全部六层链路。当你要上K8s时,只需把host.docker.internal替换成K8s Service名(如vllm-service.default.svc.cluster.local),代码零修改。
4.2 K8s部署:StatefulSet vs Deployment的生死抉择
在K8s里部署Agent栈,最大的陷阱是把有状态服务当无状态部署。我们曾把Redis和PostgreSQL用Deployment部署,结果一次节点驱逐后,所有会话状态丢失,Agent集体“失忆”。
正确姿势:
| 服务 | K8s对象 | 理由 | 关键配置 |
|---|---|---|---|
| Redis Streams | StatefulSet | Stream的消费组(Consumer Group)依赖Pod名称稳定性,重启后必须用原名恢复消费 | serviceName: redis-headless,volumeClaimTemplates绑定PVC |
| PostgreSQL | StatefulSet | 数据持久化是底线,且主从切换需固定网络标识 | podManagementPolicy: OrderedReady,updateStrategy: RollingUpdate |
| vLLM推理 | Deployment | 纯计算,无状态,可水平扩缩 | HPA基于container_resource_cpu_usage_percentage自动扩缩 |
| Temporal Worker | Deployment | Worker进程只消费Task Queue,状态全在Temporal Server里 | replicas: 3,livenessProbe检查/healthz |
| Tool Gateway | Deployment | 每个Tool是独立服务,无共享状态 | 每个Tool一个Deployment,如erp-tool-deployment |
特别强调Redis的StatefulSet配置:
# redis-statefulset.yaml apiVersion: apps/v1 kind: StatefulSet metadata: name: redis spec: serviceName: "redis-headless" # 必须定义headless service replicas: 1 selector: matchLabels: app: redis template: metadata: labels: app: redis spec: containers: - name: redis image: redis:7.2-alpine ports: - containerPort: 6379 volumeMounts: - name: redis-data mountPath: /data volumeClaimTemplates: # 关键!为每个Pod创建独立PVC - metadata: name: redis-data spec: accessModes: ["ReadWriteOnce"] resources: requests: storage: 10GiserviceName: "redis-headless"创建Headless Service,让Pod获得稳定DNS名:redis-0.redis-headless.default.svc.cluster.local。当Tool Gateway连接Redis时,必须用这个FQDN,而非Service ClusterIP——因为只有Headless Service才支持Pod DNS。
4.3 CI/CD流水线:如何安全地发布一个Agent更新
发布Agent不是git push那么简单。一个错误的Plan Compiler更新,可能导致所有采购单解析失败。我们的CI/CD流水线强制四道关卡:
关卡1:Plan Schema单元测试
每次提交,CI运行Pydantic Schema验证:
# test_plan_schema.py def test_plan_timeout_format(): with pytest.raises(ValidationError): PlanModel(**{ "plan_id": "test", "steps": [{"timeout": "30"}], # 缺少's'单位 "dependencies": [] })关卡2:Tool接口契约测试
用Pact.io做消费者驱动契约(CDC)。Tool Gateway定义期望的ERP API响应:
// pact-erp-contract.js const { Pact } = require('@pact-foundation/pact'); const provider = new Pact({ consumer: 'agent-system', provider: 'erp-system', port: 1234 }); describe('ERP Inventory Check', () => { before(() => provider.setup()); after(() => provider.finalize()); it('returns stock level for valid SKU', async () => { await provider.addInteraction({ state: 'SKU ABC-123 exists in inventory', uponReceiving: 'a stock check request', withRequest: { method: 'POST', path: '/api/inventory/check', body: { sku: 'ABC-123' } }, willRespondWith: { status: 200, body: { sku: 'ABC-123', available: 15, warehouse: 'WH_MAIN' } } }); }); });关卡3:金丝雀发布(Canary Release)
新版本Agent只对1%的流量生效。我们在Envoy里配置:
# envoy-canary.yaml routes: - match: { prefix: "/" } route: cluster: agent-service-v1 weighted_clusters: clusters: - name: agent-service-v1 weight: 99 - name: agent-service-v2 # 新版本 weight: 1同时,监控agent-service-v2的error_rate和latency_p95。如果5分钟内错误率>0.5%,自动回滚到v1。
关卡4:Plan Compiler灰度开关
Compiler本身带Feature Flag:
# plan_compiler.py def compile_plan(user_input: str) -> dict: if feature_flag_enabled("new_parser_v2"): return new_parser_v2(user_input) # 基于Llama-3的解析 else: return legacy_parser(user_input) # 基于规则的解析Flag通过Consul KV动态控制,无需发版即可开关。
5. 常见问题与排查技巧实录:那些文档里不会写的坑
5.1 问题速查表:高频故障与根因定位
| 现象 | 可能根因 | 排查命令/步骤 | 解决方案 |
|---|---|---|---|
| Agent响应超时,但vLLM日志显示已返回 | Envoy网关的stream_idle_timeout太短,关闭了长连接 | kubectl exec -it envoy-pod -- curl -v http://localhost:9901/stats | grep "cluster\.vllm-service\.upstream_cx_idle_timeout" | 在Envoy配置中,将stream_idle_timeout设为300s(5分钟),大于最长RAG耗时 |
| Temporal工作流卡在“Running”状态,无Activity日志 | Worker进程未正确注册Activity,或Task Queue名不匹配 | kubectl logs temporal-worker-pod | grep "Started worker";检查worker.start()参数中的task_queue是否与Workflow启动时一致 | 在Worker代码中,显式打印print(f"Registering activity '{activity_name}' to queue '{task_queue}'"),确保日志可见 |
| Qdrant检索结果为空,但向量确已插入 | 向量维度不匹配:插入时用768维,检索时用1024维 | curl http://qdrant:6333/collections/purchase_orders,检查vectors_count和config.params.vector_size | 在插入前,用np.linalg.norm(embedding)验证向量L2范数,确保与Qdrant Collection配置一致 |
| Tool Gateway调用ERP返回500,但ERP日志无记录 | Envoy的ext_authz过滤器拦截了请求,因JWT过期 | kubectl logs envoy-pod | grep "ext_authz";检查Authorization头中的JWT是否过期 | 在Envoy Lua Filter中,添加request_handle:logInfo("JWT: " .. jwt_token),直接打印Token内容到日志 |
| Redis Stream消费组积压,CPU飙升 | 消费者(Consumer)崩溃后未ACK,消息持续重发 | redis-cli --raw xinfo groups purchase_stream,查看pending数量;xpending purchase_stream mygroup - + 10查看积压消息 | 在Consumer代码中,用try/finally确保XACK执行:try: process(msg); finally: redis.xack(...) |
5.2 独家避坑技巧:来自血泪教训
技巧1:给每个Tool加“健康探针”,而非依赖K8s Liveness
我们曾用K8s的livenessProbe检查Tool Gateway的/healthz,结果一次数据库连接池满,/healthz返回503,K8s疯狂重启Pod,加剧了连接池压力。现在,每个Tool的Health Endpoint必须做真健康检查:
# tool_gateway/main.py @app.get("/healthz") def healthz(): # 检查数据库连接 try: db.execute("SELECT 1").fetchone() except Exception as e: raise HTTPException(status_code=503, detail=f"DB down: {e}") # 检查下游ERP连通性 try: requests.get("https://erp-api/health", timeout=2) except Exception as e: raise HTTPException(status_code=503, detail=f"ERP unreachable: {e}") return {"status": "ok"}技巧2:Temporal工作流的“心跳”不是可选项
默认Temporal工作流没有心跳,如果一个Workflow执行超过10分钟(默认execution_timeout),它会被标记为Failed。但我们的采购单处理常需15分钟(含人工审核等待)。解决方案:在Workflow代码中显式发送心跳:
@workflow_method(task_queue="agent-queue") def execute_plan(self, plan: dict): # 设置长超时 workflow.set_start_to_close_timeout(timedelta(minutes=30)) # 在长时间操作前发送心跳 workflow.heartbeat("started parsing PDF") result = self.pdf_parser_activity(plan["input"]) workflow.heartbeat("finished parsing, checking inventory") inventory = self.erp_check_activity(result["sku"]) # ...后续步骤技巧3:vLLM的--max-model-len必须大于Prompt最大长度
我们曾设--max-model-len=4096,但一个采购单PDF解析Prompt加上RAG上下文,轻松突破5000 token。结果vLLM静默截断,输出不完整。正确做法:用llama.cpp的tokenize工具预估最大Prompt长度:
# 估算最坏情况Prompt长度 echo "你是一个采购单解析器...$(cat rag_context.txt)" | \ python -c "import sys; from transformers import AutoTokenizer; t=AutoTokenizer.from_pretrained('meta-llama/Meta-Llama-3-8B-Instruct'); print(len(t.encode(sys.stdin.read())))"然后--max-model-len设为估算值+512(留余量)。
6. 性能压测与容量规划:如何预估你的Agent集群规模
6.1 压测方法论:用真实业务流量建模
别信“1000 QPS”的虚标数字。我们用真实采购单PDF生成压测流量:
流量特征:
- 90%请求:单PDF(2-5页),含表格+文字;
- 8%请求:双PDF(采购单+合同),需交叉引用;
- 2%请求:含手写签名扫描件,触发OCR流程。
压测工具:k6 + 自研流量脚本
不用JMeter,因为k6的JS引擎能真实模拟浏览器行为(如WebSocket长连接)。脚本关键逻辑:
// k6-script.js import { check, sleep } from 'k6'; import http from 'k6/http'; export const options = { stages: [ { duration: '5m', target: 100 }, // ramp-up { duration: '10m', target: 100 }, // steady state { duration: '2m', target: 0 }, // ramp-down ], }; export default function () { // 1. 上传PDF(模拟用户操作) const pdfData = open(`./pdfs/${__ENV.PDF_SET}/po-${Math.floor(Math.random()*1000)}.pdf`); const uploadRes = http.post('http://localhost:8000/api/upload', pdfData, { headers: { 'Content-Type': 'application/pdf' } }); // 2. 轮询结果(模拟前端轮询) let result; for (let i = 0; i < 30; i++) { result = http.get(`http://localhost:8000/api/result/${uploadRes.json().task_id}`); if (result.json().status === 'completed') break; sleep(2); } check(result, { 'is completed': (r) => r.json().status === 'completed', 'response time < 10s': (r) => r.timings.duration < 10000 }); }6.2 容量规划公式:从QPS到服务器台数
我们总结出一套可复用的容量公式,以“采购单解析”场景为例:
Step 1:测算单请求资源消耗
用kubectl top pods在压测中抓取峰值:
| 服务 | CPU峰值 | 内存峰值 | 瓶颈点 |
|---|---|---|---|
| vLLM | 12.4 cores | 42 GB | GPU显存(A100 80GB) |
| Temporal Worker | 1.8 cores | 1.2 GB | Go runtime GC |
| Tool Gateway | 0.9 cores | 0.8 GB | Python GIL |
Step 2:计算单台服务器承载QPS
以vLLM为瓶颈(最贵资源):
- 单A100(80GB)在
--tensor-parallel-size=2下,实测稳定QPS=32(P95延迟<2s) - 业务目标:峰值QPS=500
- 所需A100卡数 =
500 / 32 ≈ 15.6 → 向上取整为16
**
