当前位置: 首页 > news >正文

Claude API 流式输出(SSE)实战:从打字机效果到工具调用全流程

  • Claude API 流式输出(SSE)实战:从打字机效果到工具调用全流程

接入 Claude API 的人多半都从messages.create()一把梭开始——发个请求,等几秒,整段返回。等到要做聊天框、做长文生成、做 Agent 工具调用,才发现这条路走不动:响应慢、用户看着白屏、长输出还会撞 524 超时。

这时候必须切到流式输出。Claude API 的流式协议走的是SSE(Server-Sent Events),事件结构和 OpenAI 不一样,做错一步前端就会白屏或乱码。本文把 Python、Node.js、cURL 三套写法跑一遍,再覆盖 Tool Use 流式、断流重连、前端打字机效果和生产级容错。

读完你能拿到:

  • 三种语言的最小可运行代码
  • SSE 事件类型对照表(避免对着官方文档反复翻)
  • Tool Use 流式输出的处理姿势(这是 Agent 项目最大的坑)
  • 断流如何无损续传
  • 前端 EventSource 实现打字机效果

一、为什么必须用流式:三个非用不可的场景

把流式当成"锦上添花"是最大的认知偏差。下面三个场景里,不开流式直接是工程缺陷。

场景 1:长上下文输出(>2K tokens)

Opus 4.7 生成一段 4K tokens 的长回复,从首 token 到最后一 token 普遍在 30 秒以上。非流式模式下,HTTP 请求要么白屏 30 秒,要么直接撞到反向代理或 CDN 的 60 秒超时上限(最常见的是 524)。流式模式下,首 token 通常 800ms 内就到,用户立刻看到内容。

场景 2:Agent 工具调用循环

Agent 一次推理可能产生 3-5 个 tool_use 块。非流式模式下你要等整段响应回来才能开始执行工具。流式模式下,第一个 tool_use 块完成时立刻就能开始调用工具,第二个块继续生成,整体延迟接近减半

场景 3:聊天界面与 IDE 集成

Cursor、Claude Code、Cherry Studio 这类客户端能给出"丝滑打字机"体验,全靠 SSE。自己做内部 ChatBot 想达到一样的效果,前端必须用 EventSource 或 fetch + ReadableStream 接收。


二、SSE 事件结构:先把这张表背下来

Claude 的流式响应是一连串 SSE 事件,事件类型有 9 种。不理解事件结构直接写解析逻辑,必踩坑。

事件类型触发时机关键字段你要做什么
message_start响应开始message.id,usage.input_tokens记录 message id;初始化输入 token 计数
content_block_start一个新的内容块开始index,content_block.type判断是textthinking还是tool_use
content_block_delta增量内容delta.type,delta.text/delta.partial_json拼接到对应 index 的内容块
content_block_stop当前内容块结束index关闭该块(tool_use完整后可以触发执行)
message_delta消息级元数据更新delta.stop_reason,usage.output_tokens更新输出 token 计数与停止原因
message_stop整个响应结束收尾,关闭连接
ping保活心跳忽略即可
error服务端错误error.type,error.message立即终止;考虑重试

关键认知:一次响应中可能有多个content_block_*序列,index用来区分。text块的增量在delta.text,tool_use块的增量在delta.partial_json(注意是字符串增量,需要拼接后再 JSON.parse)。


三、最小可运行代码:三种语言

3.1 Python(Anthropic SDK,推荐)

官方 SDK 帮你封装好了 SSE 解析,最干净的写法是messages.stream()上下文管理器:

importanthropic client=anthropic.Anthropic(api_key="sk-你的密钥",base_url="https://gw.claudeapi.com")withclient.messages.stream(model="claude-opus-4-7",max_tokens=2048,messages=[{"role":"user","content":"写一段关于流式输出的技术博客开头"}])asstream:fortextinstream.text_stream:print(text,end="",flush=True)final_message=stream.get_final_message()print(f"\n\n[输入{final_message.usage.input_tokens}tokens, "f"输出{final_message.usage.output_tokens}tokens]")

stream.text_stream只迭代文本增量,过滤掉了 thinking、tool_use 等其他块,做纯聊天最方便。

如果要拿到原始事件做更精细的控制(例如同时处理 thinking 和 text):

withclient.messages.stream(model="claude-opus-4-7",max_tokens=2048,messages=[{"role":"user","content":"..."}])asstream:foreventinstream:ifevent.type=="content_block_start":print(f"\n[block #{event.index}start:{event.content_block.type}]")elifevent.type=="content_block_delta":ifevent.delta.type=="text_delta":print(event.delta.text,end="",flush=True)elifevent.type=="message_stop":print("\n[done]")

3.2 Node.js / TypeScript

importAnthropicfrom"@anthropic-ai/sdk";constclient=newAnthropic({apiKey:"sk-你的密钥",baseURL:"https://gw.claudeapi.com",});conststream=client.messages.stream({model:"claude-opus-4-7",max_tokens:2048,messages:[{role:"user",content:"写一段关于流式输出的技术博客开头"}],});forawait(consteventofstream){if(event.type==="content_block_delta"&&event.delta.type==="text_delta"){process.stdout.write(event.delta.text);}}constfinal=awaitstream.finalMessage();console.log(`\n\n[input${final.usage.input_tokens}, output${final.usage.output_tokens}]`);

或者用更简洁的.on("text")事件订阅:

client.messages.stream({model:"claude-opus-4-7",max_tokens:2048,messages:[{role:"user",content:"..."}],}).on("text",(text)=>process.stdout.write(text)).on("finalMessage",(msg)=>console.log("\ndone:",msg.usage));

3.3 cURL(原始 SSE,调试和反代必备)

排查问题时直接用 cURL 看原始事件流是最快的方式:

curl-Nhttps://gw.claudeapi.com/v1/messages\-H"x-api-key: sk-你的密钥"\-H"anthropic-version: 2023-06-01"\-H"content-type: application/json"\-d'{ "model": "claude-opus-4-7", "max_tokens": 1024, "stream": true, "messages": [{"role": "user", "content": "Hello"}] }'

注意-N关闭缓冲,否则 cURL 会卡到响应结束才一次性输出。返回会看到一串:

event: message_start data: {"type":"message_start","message":{...}} event: content_block_start data: {"type":"content_block_start","index":0,...} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}} ...

四、Tool Use 流式:Agent 项目的高频踩坑点

工具调用块的增量字段是delta.partial_json,是字符串拼接后才能解析。直接对每个 delta 单独JSON.parse必然失败。

importjson tool_inputs:dict[int,str]={}# index → accumulated json stringtool_meta:dict[int,dict]={}# index → {"name": ..., "id": ...}withclient.messages.stream(model="claude-opus-4-7",max_tokens=2048,tools=[{"name":"get_weather","description":"查询某个城市的天气","input_schema":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"]}}],messages=[{"role":"user","content":"上海现在天气怎么样?"}])asstream:foreventinstream:ifevent.type=="content_block_start":block=event.content_blockifblock.type=="tool_use":tool_meta[event.index]={"name":block.name,"id":block.id}tool_inputs[event.index]=""elifevent.type=="content_block_delta":ifevent.delta.type=="input_json_delta":tool_inputs[event.index]+=event.delta.partial_jsonelifevent.type=="content_block_stop":ifevent.indexintool_inputs:args=json.loads(tool_inputs[event.index])meta=tool_meta[event.index]print(f"[tool call]{meta['name']}({args})")# 这里立刻去执行工具,不必等整个响应结束

关键点:

  1. content_block_start时拿到tool_use.nametool_use.id
  2. input_json_delta拼字符串
  3. content_block_stop时再json.loads

提早 parse 是最常见的错误,会抛JSONDecodeError


五、断流重连:生产环境必须做的事

国内网络抖动、移动端切换基站、反向代理重启,都会导致 SSE 半路中断。Claude API 不支持服务端续传,但可以通过记录已收到的内容做客户端侧降级。

健壮模式:

defstream_with_retry(messages,max_retries=2):accumulated=""forattemptinrange(max_retries+1):try:withclient.messages.stream(model="claude-opus-4-7",max_tokens=4096,messages=messages)asstream:fortextinstream.text_stream:accumulated+=textyieldtextreturnexcept(anthropic.APIConnectionError,anthropic.APITimeoutError)ase:ifattempt==max_retries:raise# 把已生成内容拼到上下文,让模型续写messages=messages+[{"role":"assistant","content":accumulated},{"role":"user","content":"请继续输出,不要重复已经写过的内容。"}]print(f"\n[stream broke at{len(accumulated)}chars, retrying...]")

要点:

  • 只对网络层异常(APIConnectionErrorAPITimeoutError)重试,不要对 4xx 重试
  • 重试时把已收到的内容塞进 assistant 消息,让模型续写而不是从头来
  • max_retries不要超过 3,否则用户会等到崩溃

六、前端打字机效果:浏览器侧实现

浏览器原生EventSource不支持自定义 header(API Key 没法塞),所以必须用 fetch + ReadableStream。中间最好加一层后端代理鉴权,避免把 Key 暴露到前端。

后端转发示例(Node.js / Express):

importexpressfrom"express";importAnthropicfrom"@anthropic-ai/sdk";constapp=express();app.use(express.json());constclient=newAnthropic({apiKey:process.env.CLAUDE_API_KEY,baseURL:"https://gw.claudeapi.com",});app.post("/api/chat",async(req,res)=>{res.setHeader("Content-Type","text/event-stream");res.setHeader("Cache-Control","no-cache");res.setHeader("Connection","keep-alive");conststream=client.messages.stream({model:"claude-sonnet-4-6",max_tokens:2048,messages:req.body.messages,});forawait(consteventofstream){if(event.type==="content_block_delta"&&event.delta.type==="text_delta"){res.write(`data:${JSON.stringify({text:event.delta.text})}\n\n`);}}res.write("data: [DONE]\n\n");res.end();});

前端消费:

asyncfunctionchat(messages:any[],onText:(s:string)=>void){constresp=awaitfetch("/api/chat",{method:"POST",headers:{"Content-Type":"application/json"},body:JSON.stringify({messages}),});constreader=resp.body!.getReader();constdecoder=newTextDecoder();letbuffer="";while(true){const{value,done}=awaitreader.read();if(done)break;buffer+=decoder.decode(value,{stream:true});constlines=buffer.split("\n\n");buffer=lines.pop()??"";for(constlineoflines){if(!line.startsWith("data: "))continue;constdata=line.slice(6);if(data==="[DONE]")return;const{text}=JSON.parse(data);onText(text);}}}

注意buffer拼接逻辑——SSE 的\n\n分隔可能跨 chunk,不缓冲会丢字。


七、踩坑清单

坑 1:Nginx 反代默认会缓冲 SSE,前端永远收不到首 token。在反代配置里加proxy_buffering off;,必要时再加proxy_cache off;proxy_read_timeout 600s;。Cloudflare 用户检查 “Caching” 是否被命中,必要时关掉。

坑 2:tool_use块的 input 是partial_json不是text不要在text_delta分支里找工具参数,会一直拿不到。事件结构记牢:text_deltatext块,input_json_deltatool_use块。

坑 3:stream=Truecount_tokens不兼容。想精确统计成本,就在流结束后从final_message.usage拿,或者请求前用client.messages.count_tokens()预估。

坑 4:移动端 4G/5G 切换时 keepalive 会断。message_start时记下 message id,断开后重试时把已收到的文本塞进 assistant 消息让模型续写(见第五节代码)。

坑 5:Extended Thinking 与流式同时开启时事件多了一类。思考块走thinking_delta而非text_delta,前端如果直接拿text_delta拼会丢掉思考内容。明确决定要不要展示思考过程,再写解析分支。


八、性能参考

以下是同样的 prompt(请生成一段 2K tokens 的技术文档)在国内网络下的实测对比:

模式首 token 延迟总耗时用户感知
非流式28.4s白屏 28 秒
流式(Sonnet 4.6)720ms26.9s立即看到首字
流式(Opus 4.7)980ms35.2s立即看到首字
流式(Haiku 4.5)410ms11.8s极致顺滑

非流式 ≈ 流式总耗时,但用户体验差距巨大——首 token 是体验的分水岭。


小结

流式输出不是"可选优化",是 Claude API 进生产环境的必经之路。三个核心要点:

  1. 事件结构先理解再写代码——9 种事件、text_deltavsinput_json_delta必须分清
  2. Tool Use 走 partial_json 拼接——content_block_stop时再 parse
  3. 生产环境必做断流重连——只对网络异常重试,重试时让模型续写
http://www.jsqmd.com/news/855031/

相关文章:

  • Claude Code的Hook
  • dijkstra
  • [qemu+kvm]: iommu 开关代码分析
  • 【数据结构】顺序表
  • 2026市政地标精神堡垒:发光精神堡垒/商业街精神堡垒/四川交通标识标牌厂家/四川加油站标识标牌/四川加油站灯箱标牌/选择指南 - 优质品牌商家
  • 软考高项案例分析6:项目资源管理
  • 2026年APP广告接入平台技术选型指南:工具APP收益提升/开源广告SDK/微信小程序广告/聚合广告联盟/APP变现/选择指南 - 优质品牌商家
  • 菩瓦纽课业平台:打破教育内卷,让刷题更精准,成长更高效
  • 5015系列圆形连接器选型避坑指南
  • 2025年Gartner中国安全技术成熟度曲线解读:软件供应链安全从“过热”到“落地”的演进之路
  • CANN ops-transformer 的 FlashAttention:把大模型的记忆从 32GB 压到 8GB,怎么做到的
  • 合肥假发店TOP5评测|专业形象管理指南,揭秘靠谱之选! - 行业深度观察C
  • Lua 脚本执行 Redis 队列逻辑出现 ERR 错误怎么排查?
  • 2026年积分兑换柜优质品牌推荐榜:电子去向牌、礼品兑换柜、五育兑换柜、五金电子门牌、人员去向电子牌、会议电子门牌选择指南 - 优质品牌商家
  • 集团化全员学习企业在线学习平台选型指南|政企专属解决方案
  • Seedance2.0内容创作干货!学会这四点教你用 Seedance 2.0 拍出电影感!
  • 2026 运营实战:AI 电商生图能快速上手的工具深度测评,哪款是你的大促生产力?
  • ViMax:AI导演、编剧、制片人一体化——颠覆传统视频制作的智能体革命
  • 2026年开源广告SDK:APP广告变现、APP广告收益提升、APP想接入广告、SDK变现、工具APP收益提升选择指南 - 优质品牌商家
  • 影刀RPA跨境店群自动化实战:Python协同Chromium打破风控“垄断”的高并发调度系统架构
  • 电动汽车创企Fisker破产后,4000名车主自发组建开源汽车公司延续车辆生命。
  • 2026年移动广告联盟TOP5盘点:APP变现、APP商业化变现、APP广告收益提升、APP广告素材合规、APP想接入广告选择指南 - 优质品牌商家
  • 2026年q2物业托管技术全解析:成都清洁外包/成都物业公司/成都物业外包/攀枝花保洁公司/选型与落地核心推荐 - 优质品牌商家
  • # 让工具自己声明并发安全:我把调度逻辑砍到一行
  • DeepSeek RAG场景GPU资源黑洞:向量检索+重排序+生成三阶段显存泄漏的48小时定位实录(含perf脚本)
  • 2026年Q2权威APP变现平台排行:APP商业化变现、APP广告变现、APP广告收益提升、APP广告素材合规选择指南 - 优质品牌商家
  • 百度 Agent 安全中心:构筑企业智能体的安全底座
  • 某消费电子终端上市公司实例:德思特衰减器方案以1/3成本精准复现弱网与WiFi干扰场景
  • Perplexity写作辅助效率翻倍:3个被低估的核心技巧,今天不用明天就落后
  • 初创团队如何利用 Taotoken 以最小成本验证多个大模型能力