当前位置: 首页 > news >正文

前端处理大模型SSE格式数据

1. SSE 协议基础

  • MIME 类型text/event-stream

  • 消息边界:每个事件以两个换行符\n\n(或\r\n\r\n)结束。

  • 数据前缀:每行数据以field: value格式组成,常见字段:

    • data:消息内容(可有多行,最终合并为一个字符串)。

    • event:自定义事件类型(可选)。

    • id:事件 ID(用于断线重连)。

    • retry:重连时间(毫秒)。

  • 结束标记:通常用data: [DONE]表示流结束。

2. 数据块示例(OpenAI 兼容格式)

text

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":"你好"},"finish_reason":null}]}\n\n data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":"世界"},"finish_reason":null}]}\n\n data: [DONE]\n\n

3. 网络传输特性

  • 一次 HTTP 请求,服务器保持连接并分块发送数据。

  • 底层 TCP 可能将多个事件合并成一个数据块,也可能将一个大事件拆分成多个小块。

  • 浏览器通过ReadableStream接收原始字节,必须手动缓冲并按\n\n分割才能还原事件。

4. 手动解析核心步骤

  1. 使用fetch获取响应,得到ReadableStream

  2. 通过getReader()获得读取器,循环读取数据块。

  3. TextDecoder将字节解码为字符串(启用stream: true处理跨块字符)。

  4. 将字符串追加到缓冲区。

  5. 在缓冲区中查找\n\n,提取完整事件块,剩余部分继续缓冲。

  6. 对每个事件块按行解析字段(data:event:等),合并多行data

  7. data字符串尝试解析为 JSON(若需要)。

  8. 调用业务回调处理事件。

5. 第三方库简化开发

  • @microsoft/fetch-event-source:完整的浏览器 SSE 客户端,自动处理连接、重连、事件分发。

  • eventsource-parser:轻量解析器,只负责将流分割为事件,网络层由自己控制。

  • ai(Vercel AI SDK):React 框架级封装,专为 AI 对话设计,内置流式处理。


通用 SSE 数据处理代码(手动实现)

以下是一个可直接使用的 JavaScript 函数,它通过fetch接收 SSE 流,解析每个事件并通过回调返回。

javascript

/** * 通用 SSE 流处理函数 * @param {string} url - 请求地址 * @param {Object} options - fetch 选项(method, headers, body 等) * @param {Object} callbacks - 回调函数 * @param {Function} callbacks.onMessage - 收到事件时调用,参数为解析后的事件对象 { data, event, id, retry } * @param {Function} callbacks.onError - 发生错误时调用 * @param {Function} callbacks.onComplete - 流正常结束时调用 */ async function handleSSE(url, options = {}, callbacks = {}) { const { onMessage, onError, onComplete } = callbacks; try { const response = await fetch(url, { ...options, headers: { 'Accept': 'text/event-stream', ...options.headers } }); if (!response.ok) { throw new Error(`HTTP error! status: ${response.status}`); } if (!response.body) { throw new Error('ReadableStream not supported'); } const reader = response.body.getReader(); const decoder = new TextDecoder('utf-8'); let buffer = ''; // 解析一个完整的事件块(不含末尾的 \n\n) function parseEventBlock(block) { const lines = block.split('\n'); const eventObj = { data: [], event: null, id: null, retry: null }; for (const line of lines) { if (line.startsWith('data:')) { eventObj.data.push(line.slice(5).trim()); } else if (line.startsWith('event:')) { eventObj.event = line.slice(6).trim(); } else if (line.startsWith('id:')) { eventObj.id = line.slice(3).trim(); } else if (line.startsWith('retry:')) { eventObj.retry = parseInt(line.slice(6).trim(), 10); } // 忽略注释行和其他字段 } const dataStr = eventObj.data.join('\n'); // 尝试将 data 解析为 JSON let parsedData = dataStr; try { parsedData = JSON.parse(dataStr); } catch { // 保持原样 } return { data: parsedData, event: eventObj.event, id: eventObj.id, retry: eventObj.retry }; } // 循环读取数据块 while (true) { const { done, value } = await reader.read(); if (done) { // 流结束,缓冲区可能还有残留(如最后一个事件没有 \n\n) if (buffer.trim().length > 0) { console.warn('Stream ended with incomplete event:', buffer); } onComplete?.(); break; } // 解码并追加到缓冲区 buffer += decoder.decode(value, { stream: true }); // 查找事件分隔符 \n\n let boundaryIndex; while ((boundaryIndex = buffer.indexOf('\n\n')) !== -1) { const eventBlock = buffer.slice(0, boundaryIndex); buffer = buffer.slice(boundaryIndex + 2); // 跳过 \n\n if (eventBlock.trim().length > 0) { const event = parseEventBlock(eventBlock); // 忽略 [DONE] 消息,但可以通过回调自行处理 if (event.data !== '[DONE]') { onMessage?.(event); } else { // 可在这里触发结束前的特殊逻辑 } } } // 同时处理 \r\n\r\n 的兼容情况(可选) while ((boundaryIndex = buffer.indexOf('\r\n\r\n')) !== -1) { const eventBlock = buffer.slice(0, boundaryIndex); buffer = buffer.slice(boundaryIndex + 4); if (eventBlock.trim().length > 0) { const event = parseEventBlock(eventBlock); if (event.data !== '[DONE]') onMessage?.(event); } } } } catch (error) { onError?.(error); } } // ---------- 使用示例 ---------- handleSSE( 'https://api.example.com/stream', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ prompt: 'Hello' }) }, { onMessage: (event) => { console.log('收到事件:', event); // 例如处理 OpenAI 格式的增量内容 if (event.data?.choices?.[0]?.delta?.content) { const content = event.data.choices[0].delta.content; // 更新 UI... } }, onError: (err) => console.error('SSE错误:', err), onComplete: () => console.log('流式响应结束') } );

代码说明

  • 缓冲区管理:累积所有接收到的字符串,确保跨块字符正确合并。

  • 事件分割:循环查找\n\n,提取完整事件块。

  • 字段解析:按行解析data:event:等字段,多行data自动合并。

  • 数据转换:尝试将data字符串解析为 JSON,失败则保留原文本。

  • 错误处理:捕获网络错误和解析异常,通过回调通知。

使用第三方库的快速示例

使用@microsoft/fetch-event-source

javascript

import { fetchEventSource } from '@microsoft/fetch-event-source'; await fetchEventSource('/api/stream', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ prompt: 'Hello' }), onmessage(event) { const data = JSON.parse(event.data); console.log('增量:', data.choices[0].delta.content); } });
使用eventsource-parser结合fetch

javascript

import { createParser } from 'eventsource-parser'; const response = await fetch('/api/stream', { method: 'POST', body: '...' }); const parser = createParser({ onEvent(event) { const data = JSON.parse(event.data); console.log('事件:', data); } }); const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; parser.feed(decoder.decode(value, { stream: true })); }
http://www.jsqmd.com/news/466085/

相关文章:

  • 新能源电动汽车 VCU hil 与 BMS hil 硬件在环仿真探秘
  • 2026年靠谱的油烟管道清洗品牌推荐:学校油烟管道清洗/商用油烟管道清洗/食堂油烟管道清洗高评分品牌推荐(畅销) - 行业平台推荐
  • 深入探讨模板初阶:函数模板与类模板
  • 劳力士、欧米茄、积家专属指南:北京上海深圳杭州南京无锡腕表保值养护哪里好 - 时光修表匠
  • 【开题答辩全过程】以 高校新生数据可视化系统为例,包含答辩的问题和答案
  • 抓包工具——UI接口测试——fiddler
  • 拒绝 500 元智商税!AutoClaw 零门槛安装教程,手把手教你低成本“喂龙虾”
  • PAT-Rational Arithmetic (20)
  • SpringBoot 多环境配置报错全集|profile切换失败、配置不生效终极解决
  • 超节点算力革命(七)| 超节点综合评估体系
  • 数挖不是树蛙-数据挖掘-绪论(非科班必备,数据挖掘科班复习必备)
  • 基于 Java + SpringBoot + Vue + MySQL 的北部湾地区助农系统实战指南
  • @Autowired`和 @Resource区别
  • 商汤小浣熊为OpenClaw注入新技能:软硬一体安全部署,养出精通Excel的龙虾!
  • 2026最强小尺寸安卓平板来了?联想拯救者Y700第五代曝光
  • 破解青少年近视困扰,铭远光学益趣控PRO带来高效防控新选择
  • 2026年比较好的油烟机清洗厂家推荐:商用油烟机清洗/工厂油烟机清洗/餐饮店油烟机清洗厂家推荐及选购指南 - 行业平台推荐
  • 02计算机组成原理-存储器技术(上)
  • 探秘温州新石器无人车:未来出行体验,销售中心实地体验分享
  • 富文本编辑器模板1
  • 墨盒买哪家好?格之格提醒你一定要选靠谱大品牌 - yangyuan-shunfeng
  • Spring的Bean是线程安全的吗
  • Spring Bean 生命周期
  • 疑似口服美容假洋牌真相调查:国内最火8个口服美容品牌深度解析 - 资讯焦点
  • 计算机毕业设计源码:基于python与Flask的京东手机数据分析系统 pyecharts requests爬虫 电子产品 电商 商品 推荐系统 数据分析 可视化 大数据 大模型(建议收藏)✅
  • 注塑机数据采集如何实现与 MES 系统的双向数据闭环?
  • IACheck AI报告文档审核为新能源汽车高压安全检测报告审核提供支撑
  • 格之格硒鼓怎么样?品质硬核、选购省心,办公耗材优选之选(1) - yangyuan-shunfeng
  • 2026年硒鼓耐用品牌推荐:格之格为何成为大众的首选品牌?(1) - yangyuan-shunfeng
  • 数组随课笔记