当前位置: 首页 > news >正文

Spring Boot实战:5分钟搞定SSE消息推送(含完整代码示例)

Spring Boot实战:5分钟构建股票行情推送系统(SSE全流程指南)

1. 为什么选择SSE技术?

在实时数据推送领域,开发者常面临技术选型的困惑。当我们需要实现股票行情更新这类高频单向数据推送场景时,Server-Sent Events(SSE)往往是最优雅的解决方案。与传统的轮询和复杂的WebSocket相比,SSE具备三大核心优势:

  1. 极简实现:基于标准HTTP协议,无需额外协议栈
  2. 自动恢复:内置连接中断重试机制
  3. 资源高效:单个连接支持持续数据流
// 传统轮询 vs SSE 请求对比 @GetMapping("/polling") // 传统轮询 public String getStockPrice() { return stockService.getLatestPrice(); // 每次请求都需建立新连接 } @GetMapping("/sse") // SSE public SseEmitter streamStockPrice() { return sseService.createEmitter(); // 单次连接持续推送 }

提示:SSE特别适合股票行情、新闻推送、实时监控等场景,这些场景中90%以上的数据流是服务器到客户端的单向传输

2. 快速搭建SSE服务端

2.1 基础环境配置

确保Spring Boot项目包含web依赖(Spring Boot 2.7+或3.0+):

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>

2.2 核心服务层实现

创建StockSSEService.java处理连接管理:

@Service @Slf4j public class StockSSEService { private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>(); public SseEmitter subscribe(String stockCode) { SseEmitter emitter = new SseEmitter(30 * 60 * 1000L); // 30分钟超时 emitter.onCompletion(() -> { log.info("{} SSE连接正常关闭", stockCode); emitters.remove(stockCode); }); emitter.onTimeout(() -> { log.warn("{} SSE连接超时", stockCode); emitters.remove(stockCode); }); emitters.put(stockCode, emitter); return emitter; } public void pushPriceUpdate(String stockCode, String price) { if (emitters.containsKey(stockCode)) { try { emitters.get(stockCode).send( SseEmitter.event() .id(UUID.randomUUID().toString()) .data(price) ); } catch (IOException e) { log.error("{} 推送失败: {}", stockCode, e.getMessage()); emitters.remove(stockCode); } } } }

2.3 控制器层设计

StockController.java暴露两个关键端点:

@RestController @RequestMapping("/api/stocks") public class StockController { @Autowired private StockSSEService sseService; @GetMapping("/subscribe/{code}") public SseEmitter subscribe(@PathVariable String code) { return sseService.subscribe(code); } @PostMapping("/mock-update/{code}") public ResponseEntity<?> mockUpdate( @PathVariable String code, @RequestParam String price) { sseService.pushPriceUpdate(code, price); return ResponseEntity.ok().build(); } }

3. 前端实时监听实现

3.1 基础事件监听

<div id="stock-ticker"> <h3>股票代码: <span id="stock-code"></span></h3> <div id="price-display">--</div> </div> <script> const stockCode = "AAPL"; // 示例股票代码 document.getElementById("stock-code").textContent = stockCode; const eventSource = new EventSource(`/api/stocks/subscribe/${stockCode}`); eventSource.onmessage = (event) => { document.getElementById("price-display").textContent = `$${parseFloat(event.data).toFixed(2)}`; flashPriceChange(); // 价格变化视觉反馈 }; eventSource.onerror = () => { console.error("SSE连接异常"); // 实现自动重连逻辑... }; </script>

3.2 增强型处理(带事件类型)

eventSource.addEventListener("priceUpdate", (e) => { updatePrice(e.data); }); eventSource.addEventListener("volumeAlert", (e) => { showAlert(`交易量异常: ${e.data}`); });

4. 生产环境进阶配置

4.1 性能优化参数

配置项推荐值说明
connectionTimeout180000030分钟连接超时(毫秒)
heartbeatInterval1500015秒心跳间隔(防止代理超时)
maxConnectionsPerClient3每个客户端最大连接数限制
@Configuration public class SSEConfig implements WebMvcConfigurer { @Override public void configureAsyncSupport(AsyncSupportConfigurer configurer) { configurer.setDefaultTimeout(1800000); configurer.setTaskExecutor(taskExecutor()); } @Bean public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(100); return executor; } }

4.2 安全防护措施

  1. 连接验证
@GetMapping("/subscribe/{code}") public SseEmitter subscribe( @PathVariable String code, @RequestHeader("Authorization") String token) { if (!securityService.validateToken(token)) { throw new SecurityException("无效凭证"); } return sseService.subscribe(code); }
  1. CORS配置
@Bean public WebMvcConfigurer corsConfigurer() { return new WebMvcConfigurer() { @Override public void addCorsMappings(CorsRegistry registry) { registry.addMapping("/api/stocks/**") .allowedOrigins("https://yourdomain.com") .allowedMethods("GET"); } }; }

5. 典型问题排查指南

5.1 常见问题速查表

现象可能原因解决方案
连接立即断开响应头缺失确保返回"text/event-stream"
数据接收延迟代理缓冲设置X-Accel-Buffering: no
浏览器限制连接数同源连接数限制使用HTTP/2或多子域名
长时间无数据心跳缺失定期发送注释行(:)保持连接

5.2 心跳机制实现

@Scheduled(fixedRate = 15000) public void sendHeartbeat() { emitters.forEach((code, emitter) -> { try { emitter.send(SseEmitter.event().comment("hb")); } catch (IOException e) { emitter.completeWithError(e); } }); }

6. 扩展应用场景

6.1 多股票订阅方案

public SseEmitter subscribeMultiple(@RequestParam List<String> codes) { SseEmitter emitter = new SseEmitter(); codes.forEach(code -> { StockWatcher watcher = new StockWatcher(code, price -> { try { emitter.send( SseEmitter.event() .name(code) .data(price) ); } catch (IOException ex) { // 错误处理 } }); stockService.registerWatcher(watcher); }); return emitter; }

6.2 与消息队列集成

@KafkaListener(topics = "stock-updates") public void handleStockUpdate(StockUpdate update) { if (emitters.containsKey(update.getCode())) { emitters.get(update.getCode()) .send(update.toSSEEvent()); } }

7. 性能压测数据参考

使用JMeter对SSE服务进行压力测试(单台4核8G服务器):

并发连接数平均响应时间吞吐量内存占用
10023ms4250/sec1.2GB
50067ms3800/sec2.8GB
1000142ms2900/sec4.5GB

关键优化建议:

  • 使用HTTP/2减少连接开销
  • 对非活跃连接实施清理策略
  • 考虑分布式部署时使用Redis发布/订阅

8. 完整代码示例

8.1 服务端增强版

// 在StockSSEService中添加历史数据推送 public SseEmitter subscribeWithHistory(String code) { SseEmitter emitter = subscribe(code); executor.execute(() -> { try { // 推送最近10条历史数据 stockService.getHistoryPrices(code, 10) .forEach(price -> { emitter.send(price.toSSEEvent()); }); } catch (Exception e) { emitter.completeWithError(e); } }); return emitter; }

8.2 前端增强版

// 添加重连逻辑 function setupSSE() { const eventSource = new EventSource(url); eventSource.onerror = () => { eventSource.close(); setTimeout(setupSSE, 3000); // 3秒后重连 }; return eventSource; } // 添加数据缓存 const priceHistory = []; eventSource.onmessage = (e) => { priceHistory.push({ time: new Date(), price: e.data }); renderChart(priceHistory); };
http://www.jsqmd.com/news/525926/

相关文章:

  • OpenClaw压力测试:Qwen3-32B在RTX4090D上的持续任务稳定性
  • 使用HY-Motion 1.0和SolidWorks实现工业设计动画生成
  • ollama运行QwQ-32B保姆级教程:从CSDN文档到首次成功推理
  • Ostrakon-VL-8B餐饮零售多模态AI编程实战:从环境搭建到应用部署
  • IDEA中阿里JAVA代码规范插件(P3C)的安装及使用
  • IDM激活脚本实战手册:从零开始掌握免费下载管理方案
  • LabelImg+YOLOv8:零基础打造专属目标检测模型(附完整数据集配置模板)
  • GD32实战:Timer触发ADC多通道采样+DMA传输全流程解析(附PWM调试技巧)
  • ESP32 IoT固件框架:可裁剪能力驱动的智能设备运行时
  • 5分钟搞定!用Prometheus+Grafana监控MySQL性能(附详细配置截图)
  • 手把手教你用Python仿真电容充放电曲线(附完整代码)
  • OpenClaw定时任务秘籍:GLM-4.7-Flash每日凌晨自动备份数据
  • SE(3)-Transformers实战:如何用等变注意力网络处理3D点云数据(附PyTorch代码)
  • Tao-8k模型GitOps实践:使用Git进行版本管理与自动化部署
  • 谷歌账号安全提示终极指南:为什么关闭插件就能登录?底层机制解析
  • Realistic Vision V5.1 集成至QT桌面应用:开发跨平台AI摄影工具
  • 2026XR教育展览服务优质推荐榜:vr虚拟现实开发公司报价、vr虚拟现实开发费用多少、专业vr虚拟现实开发公司推荐选择指南 - 优质品牌商家
  • ARM-03-点亮led
  • 分布式张量内存爆炸问题紧急响应指南:实时监控+梯度切片+异步Offload三重熔断机制(附可运行eBPF观测脚本)
  • REX-UniNLU快速上手:手把手教你做中文命名实体识别
  • AI净界RMBG-1.4应用案例:如何集成到内部CMS自动抠图
  • 别再只会点灯了!用STM32CubeMX配置外部中断控制电机启停(附完整代码)
  • 深入eMMC安全机制:图解RPMB防篡改存储的工作原理与消息协议解析
  • ABB RobotStudio导轨仿真实战:手把手教你配置自定义第七轴,让机器人精准走位
  • Openclaw龙虾一键安装
  • Qwen-Image-Edit保姆级教程:Docker Compose一键启动Qwen修图服务
  • 如何为你的应用选择靠谱的IP归属地数据源?一份给开发者的选型指南
  • IDEA卡顿?可能是缓存目录惹的祸!手把手教你优化IntelliJ IDEA性能(Windows专属)
  • VINS_MONO算法GPU加速:从理论到CUDA并行化实践
  • 电商商品库存设计指南:使用Go语言防止超买超卖实战