当前位置: 首页 > news >正文

rabbitmq+websocket实时通知

一、业务背景

在网约车场景中,乘客下单后会进入「等待司机接单」页面。

如果只靠 HTTP 轮询(每隔几秒查一次订单状态),会有这些问题:

  • 延迟高:轮询间隔越大,用户感知越慢
  • 浪费资源:大量无效请求,服务器压力大
  • 体验差:页面需要不断刷新或定时请求

更好的方案是:服务端主动推送。

乘客打开等待页时建立 WebSocket 长连接;司机抢单成功后,后端立刻把通知推送到乘客浏览器。

但这里有个架构问题:

  • 抢单逻辑在 rpc-order 服务
  • WebSocket 连接在 api-gateway 服务

两个服务不在同一进程,不能直接调用hub.Push()

中间就需要一个消息队列做解耦——本项目用的是 RabbitMQ。


二、整体架构

完整链路如下:

司机抢单成功

Redis Stream(异步落库)

rpc-order 更新 MySQL

发布 RabbitMQ 消息(OrderGrabbedEvent)

api-gateway 消费消息

WebSocket Hub 按 userId 精准推送

乘客浏览器收到「司机已接单」通知

三个角色分工:

rpc-order

  • 抢单、落库、发布 MQ 事件

RabbitMQ

  • 跨服务传递「接单通知」

api-gateway

  • 维护 WebSocket 连接,消费 MQ 并推送

三、RabbitMQ:Direct 路由模式

3.1 为什么选 Direct 路由模式

RabbitMQ 有多种交换机类型(fanout、topic、direct 等)。

本项目用的是 Direct 路由模式:

  • 生产者发消息时带上 routing key
  • 交换机按 key 精确匹配,把消息路由到对应队列
  • 适合「一种事件类型 → 一个消费队列」的场景

3.2 拓扑结构

生产者(rpc-order)

──Publish──► Exchange: order.notify.exchange

│ routing key: order.grabbed

Queue: order.notify.queue

消费者(api-gateway) ◄──Consume──┘

对应常量:

  • 交换机:order.notify.exchange
  • 路由键:order.grabbed
  • 队列:order.notify.queue

3.3 事件结构

消息体是一个 JSON 对象,主要字段:

  • event:固定为driver_accepted,前端用来判断消息类型
  • user_id:乘客 ID,Gateway 用它查找 WebSocket 连接
  • order_no:订单号
  • driver_name:司机姓名
  • car_number:车牌号
  • car_type:车型
  • rating:司机评分

前端拿到这些字段可以直接渲染,不需要再调接口。

3.4 发布时机

发布放在 MySQL 落库成功之后,保证「通知出去时,数据库已经是已接单状态」。

调用链:

GrabOrder(Lua 抢单)

→ Redis Stream

→ OrderGrabbedConsumer 消费

→ updateOrderGrabbed(MySQL)

→ publishOrderGrabbedNotify()

→ rmq.PublishOrderGrabbed()

设计要点:

  • 从 MySQL 查订单和司机信息,不依赖 Redis 缓存(抢单后缓存已被删除)
  • 发布失败只打日志,不影响 Stream 消费 ACK(通知与落库解耦)

3.5 Confirm 模式

发送端开启了 Publisher Confirm:

  • 消息发出后等待 Broker 的 ACK
  • 确保 RabbitMQ 确实收到了
  • 避免「以为发出去了,实际丢了」

四、WebSocket:Hub 连接管理

4.1 为什么用 Hub 模式

WebSocket 连接散落在各个 HTTP 请求里,需要一个中心来统一管理。

本项目用的是经典的 Hub 模式:

  • 数据结构:map[userId] → WebSocket 连接
  • 线程安全:RWMutex保护 map,单连接写操作用Mutex保护
  • 职责:注册连接、移除连接、按 userId 推送消息

4.2 连接建立流程

路由:GET /ws/order?token=xxx

流程:

  1. 从 URL 参数或 Header 取 JWT token
  2. 解析 token 得到userId
  3. HTTP Upgrade 为 WebSocket
  4. hub.Register(userId, conn)注册连接
  5. 阻塞ReadMessage保持连接,断开时hub.Unregister

为什么 token 放 query 参数?

浏览器 WebSocket 无法自定义 Header,所以 token 只能放 URL 里。

4.3 心跳保活

  • 服务端每 30 秒发 Ping 帧
  • 60 秒无响应则断开连接
  • 防止「僵尸连接」占用 Hub 内存

4.4 重复连接处理

同一userId重复连接时,关闭旧连接,只保留最新一条。

避免多端登录导致重复推送。

4.5 Unregister 的坑

移除连接时,要校验 conn 指针是否匹配,避免误删新连接:

旧连接断开 → 触发 Unregister

但此时用户可能已经建立了新连接

如果不校验指针,会把新连接也删掉


五、MQ 消费者 → WebSocket 推送

api-gateway 启动时,在 HTTP 服务之前启动 RabbitMQ 消费者:

  1. 连接 RabbitMQ,绑定order.notify.queue
  2. 收到消息后解析OrderGrabbedEvent
  3. 调用hub.Push(userId, body)推送给在线乘客
  4. 手动 Ack 确认消费

推送逻辑:

  • 乘客在线 → 写入 WebSocket → 返回 true
  • 乘客离线 → 返回 false,MVP 方案直接 Ack,不做离线补偿

当前是 单实例 Gateway MVP 方案:

Hub 存在内存里,多实例部署需要额外方案(如 Redis Pub/Sub 广播到各实例 Hub)。


六、完整时序

乘客 api-gateway RabbitMQ rpc-order

│ │ │ │

│── GET /ws/order ────────►│ │ │

│◄── WebSocket 建立 ───────│ │ │

│ │ hub.Register(userId) │ │

│ │ │ │

│ │ │ │◄── 司机抢单

│ │ │ │── MySQL 落库

│ │ │◄── Publish ───────│

│ │◄── Consume ──────────│ │

│ │ hub.Push(userId) │ │

│◄── JSON 推送 ────────────│ │ │

│ {event: driver_accepted}│ │ │


七、关键设计决策

7.1 为什么不用 HTTP 轮询

WebSocket 是全双工长连接,服务端可以主动推。

对于「等司机接单」这种实时场景,比轮询更合适。

7.2 为什么 rpc-order 不直接推 WebSocket

  • 微服务职责分离:order 服务管订单,gateway 管连接
  • order 服务不知道乘客连在哪台 gateway 上
  • MQ 解耦:order 只管发事件,gateway 只管推连接

7.3 离线不补偿

乘客没开等待页或已离开,MQ 消息仍然 Ack。

后续可以扩展:

  • 离线消息存 Redis / DB
  • 乘客下次登录时拉取
  • 或配合 App Push 通知

7.4 手动 Ack vs 自动 Ack

消费者用手动 Ack:

  • 业务处理成功才确认
  • 失败可以 Nack 重新入队
  • JSON 格式错误等不可恢复的错误,直接 Ack 丢弃,避免无限重试

八、今天学到的核心知识点

  1. RabbitMQ Direct 路由模式:Exchange + Routing Key + Queue 三件套
  2. Publisher Confirm:发送端确认 Broker 收到消息
  3. WebSocket Hub 模式:集中管理连接,按 userId 精准推送
  4. 跨服务实时通知:MQ 做桥梁,解耦生产者和推送端
  5. JWT + WebSocket:token 放 query 参数,因为浏览器 WS 不能自定义 Header
  6. 心跳保活:Ping/Pong + ReadDeadline,防止僵尸连接
  7. 并发安全:Hub 用 RWMutex,单连接写用 Mutex

九、后续可优化方向

  • Gateway 多实例:Hub 改 Redis Pub/Sub 或专用推送服务
  • 离线消息补偿:MQ 消费时落库,乘客上线后补推
  • RabbitMQ 连接池:当前每次发布建独立连接,生产环境应复用
  • 生产环境 CheckOrigin:当前开发环境允许所有跨域,上线需校验 Origin

十、总结

这套方案的本质是:用 RabbitMQ 打通微服务之间的实时通知链路,用 WebSocket 打通服务端到浏览器的最后一跳。

乘客体验上,从「每隔几秒刷新看看有没有司机接单」,变成「司机一点接单,页面立刻弹出通知」——这就是 RabbitMQ + WebSocket 在这个项目里的价值。

http://www.jsqmd.com/news/1093231/

相关文章:

  • dotnet 10 run file 支持多文件
  • JavaScript--错误处理
  • OpenClaw(龙虾)2026 最新安装部署终极指南
  • xref_data_to_array
  • CSDN博客-第1天-单神经元反向传播
  • 计算机二级基础知识-计算机体系结构
  • 中小微企业建站首选!PageAdmin CMS,零代码搞定官网运维
  • chunk重叠overlap设多少:切断上下文的坑
  • 支持多端生成的AI开发软件怎么选?功能对比指南
  • AI编程新范式:Skills技能库如何提升Claude、Cursor代码生成质量
  • AI Agent开发实战:从零构建一个能自主规划任务的智能体
  • Python学习笔记·第24天:Pandas数据清洗——缺失值、重复值与透视表实战
  • 使用visual studio和ai制作ppt
  • AI 学习助手:基于 HarmonyOS ArkTS 的智能学习伴侣开发实践
  • 第一批被龙虾气到的人出现了
  • Vue3 项目从开发到上线:环境变量、打包优化与 Nginx 部署全流程
  • 相处的艺术:尊重与边界
  • 企业知识图谱的拐点: 当本体工程遇上 LLM 与 MCP
  • Spring Boot 自定义 Starter 机制
  • GPT-5.6 Sol预览解读:max推理、ultra多Agent与分层安全栈
  • 剑指offer-79、最⻓不含重复字符的
  • Codex Linux 教程:从安装配置到卸载清理全流程指南
  • 基于Anthropic-Cybersecurity-Skills构建网络安全AI智能体实战指南
  • FontForge字体设计完全指南:从入门到精通掌握专业字体编辑
  • GPT-5.6系列模型发布遇阻:OpenAI面临多国监管审批,Claude Fable 5重返引发全球讨论
  • Vibe Coding 实战复盘:一个人 + AI,从零打造会聊天的个人主页
  • 关于多线程归并排序的性能瓶颈与优化方案的技术7
  • HFSS求解设置实战解析:从驱动求解到本征模求解的核心配置
  • 数据中心电力模块的发展趋势对数据中心建设有哪些影响?
  • 目前自动评价系统问题---------会卡在一些异常的地方