当前位置: 首页 > news >正文

基于WebScoket与RabbtiMQ实现的用户对话与信息持久化策略学习

基于WebScoket与RabbtiMQ实现的用户对话与信息持久化策略学习

前置基础知识

WebSocket通信流程
客户端发送http请求,请求头携带upgrade:websocket
服务器收到请求后若支持则返回101 switiching protocols
然后建立连接
核心处理:
onOpen:连接建立,记录连接的session
onMessage:收到消息,解析消息并绝对能够给谁会话
onClose:连接关闭,将在线连接名单中的对应用户剔除
onError:发生错误,处理异常

spring boot 整合webScoketHandler

第0步:编写拦截器:实现将用户信息从http域塞进拦截域

public class ChatInterceptor extends HttpSessionHandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {if (request instanceof ServletServerHttpRequest) {ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;// 假设你登录后把用户信息存放在了 Session 里User loginUser = (User) servletRequest.getServletRequest().getSession().getAttribute("user");if (loginUser != null) {// 存入 WebSocket 的 attributes,后续 Handler 可以直接拿到attributes.put("userId", loginUser.getId());}}return true;}
}

1,编写处理器Handler,将用户建立连接后的消息处理方式进行实现
利用ConcurrentHashMap 存储<userId,Session>
建立连接后,通过对应的session/http请求将用户id读取然后put到HashMap 即得到<userId,Session>
2,收到消息后,遍历Map实现广播
遍历hashmap中的session,将消息从发送者session中读取出来并判断其是否在线然后遍历剩余的map,发送消息
3 关闭连接,即将map中过期的session移除出去
包含了(open,message,close,error),open负责判断是否建立连接,message负责处理信息,close负责关闭连接,error负责处理异常

@Component
public class MemoryChatHandler extends TextWebSocketHandler {// 内存中的连接池:key 是用户 ID,value 是连接会话private static final Map<Long, WebSocketSession> onlineUsers = new ConcurrentHashMap<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) {Long userId = (Long) session.getAttributes().get("userId");if (userId != null) {onlineUsers.put(userId, session);}}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 1. 解析前端传来的 JSON 消息 (例如使用 Gson 或 Jackson)// 格式预期:{"toId": 1001, "msg": "Hello!"}String payload = message.getPayload();JSONObject jsonObject = JSONUtil.parseObj(payload);Long toId = jsonObject.getLong("toId");String msg = jsonObject.getStr("msg");// 2. 从内存 Map 中检索目标用户的会话WebSocketSession targetSession = onlineUsers.get(toId);// 3. 转发if (targetSession != null && targetSession.isOpen()) {targetSession.sendMessage(new TextMessage("来自好友的消息:" + msg));} else {session.sendMessage(new TextMessage("系统通知:目标用户不在线"));}}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) {Long userId = (Long) session.getAttributes().get("userId");onlineUsers.remove(userId);}
}

第二步:注册配置

配置对应的webscoketConfig应该去找哪一个处理器(Handler)
区别:
点对点和广播的区别:
点对点需要在处理消息那部分,根据UserId判断是否进行消息的发送,而广播则是直接将连接的session进行发送即可
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate MemoryChatHandler memoryChatHandler;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(memoryChatHandler, "/chat").addInterceptors(new ChatInterceptor()) // 注入拦截器.setAllowedOrigins("*");}
}
RabbitMQ的基本概念和使用方法

rabbitMQ:一个开源的消息中间件,基于AMQP(高级消息对列协议),用于在不同系统或者服务之间传递消息,使用Erlang语言编写
核心功能就是充当消息的中间人,在消费者和生产者之间传递消息。
核心概念:
生产者:负责发送消息到rabbitMq中
交换机:接受生产者的消息并根据路由规则分发到对列
对列:存储消息,等待消费者处理
消费者:从对列中去除消息并进行处理
rabbitMQ的主要特点:
消息应答:确保消息被成功处理后才从队列中移除
持久化:支持消息存储到磁盘,防止数据丢失
灵活的路由:通过绑定建和路由规则实现消息的精准分支
高并发支持:适用于大规模分布式系统
消息对列的应用场景:
服务解耦:通过消息对列实现对立运行,避免直接依赖
异步通信:用户注册后发送短信等,主线程只用将任务写入对列,其他服务异步处理
流量消峰:秒杀/抢购活动中,瞬时高并发请求写入对列,按系统处理能力逐步消费,避免系统崩溃

消息发送与处理模式
简单模式:一个生产者,一个对列,一个消费者;适用于基础的任务分发
工作对列模式:一个生产者,一个对列,多个消费者:消息轮询分发,一条消息只能被一个消费者处理(适用于高并发任务)
交换机模式

  • 广播模式:不看key,给交换机的消息分发到所有绑定的对列
  • 路由模式:根据精确的routing key匹配(私聊核心)
  • 主题模式:根据通配符(user.*)匹配特定主题(灵活分组)
    spring boot集成RabbitMQ步骤:配,建,发,收
    在pom文件添加依赖 : spring-boot-starter-amqp
    在yml配置文件设定基础配置:
    spring:
    rabbitmq:
    host: localhost # ip地址
    port: 5672 # 端口号
    username: guest # 对列名
    password: guest # 对列密码

    开启生产者确认机制(后续可靠性设计必用)

    publisher-confirm-type: correlated
    定义组件(交换机/对列),建立绑定关系,发送消息,监听消费
    广播模式:消息发送到广播交换机后,会转发到所有绑定到他的对列上,忽略路由key
@Configuration
public class FanoutRabbitConfig {// 1. 定义 Fanout 交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange");}// 2. 定义两个队列,模拟两个不同的订阅者@Beanpublic Queue fanoutQueueA() { return new Queue("fanout.A"); }@Beanpublic Queue fanoutQueueB() { return new Queue("fanout.B"); }// 3. 绑定:将队列都绑定到这个交换机上@Beanpublic Binding bindingFanoutA(Queue fanoutQueueA, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);}@Beanpublic Binding bindingFanoutB(Queue fanoutQueueB, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);}
}

直连模式:交换机根据routing key进行精确匹配,只有key完全一致的对列才会收到消息

@Configuration
public class DirectRabbitConfig {@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct.exchange");}@Beanpublic Queue orangeQueue() { return new Queue("queue.orange"); }// 绑定:只有 Routing Key 为 "orange" 的消息会进入此队列@Beanpublic Binding bindingOrange(Queue orangeQueue, DirectExchange directExchange) {return BindingBuilder.bind(orangeQueue).to(directExchange).with("orange");}
}

Topic主题模式(高级路由,分类订阅)
Routing key支持通配符
:匹配一个单次(如user.匹配 user.info)

:匹配零个或者多个单词(user.#,匹配user.info.login.log)

发送:
rabbitTemplate.convertAndSend("对列名","交换机名","发送的消息");

统一的消费者写法

@Component
public class ChatMessageReceiver {// 监听私聊队列@RabbitListener(queues = "queue.orange")public void receivePrivate(String message) {System.out.println("【私聊接收】内容: " + message);}// 监听群聊/广播队列@RabbitListener(queues = "fanout.A")public void receiveGroup(String message) {System.out.println("【群聊 A 接收】内容: " + message);}
}
基于websocket和RabbitMQ的消息持久化和异步实现

消息流动:前端 -》 websocket -》 RabbitMQ -》 消费者 -》 websocket -》 前端
实现流程:
发送阶段:用户a发送消息给b,server1收到消息后,不直接给b而是将消息包装好发送给rabbit MQ
路由阶段:RabbitMQ根据路由键(user.b),将消息投递到监听该key的所有对列服务器
接受阶段:所有服务器都监听MQ,只有发现用户b连接在自己身上的服务器,才会启动session,sendMessage
代码实现:
1 定义消息格式DTO

@Data
public class ChatMessage implements Serializable {private Long fromId;private Long toId;private String content;private Integer type; // 0-私聊, 1-群聊
}

2 使用webSocket作为生产者

@Component
public class ChatWebSocketHandler extends TextWebSocketHandler {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {// 1. 解析前端消息ChatMessage msg = JSONUtil.toBean(message.getPayload(), ChatMessage.class);Long fromId = (Long) session.getAttributes().get("userId");msg.setFromId(fromId);// 2. 扔给 RabbitMQ 的 Direct 交换机// 路由键设计为 user.目标IDString routingKey = "user." + msg.getToId();rabbitTemplate.convertAndSend("chat.direct.exchange", routingKey, msg);}
}

websocket解决游览器与服务器之间的实时性问题
rabitMQ解决了服务器与服务器之间的消息传递