【Redis从入门到精通】第54篇:发布订阅实战——实时消息推送、聊天室、事件通知
上一篇【第53篇】Redis发布订阅——消息队列的轻量替代方案
下一篇【第55篇】Redis事务——MULTI/EXEC/DISCARD/WATCH详解
上一篇我们搞懂了Redis Pub/Sub的内部原理,今天轮到真刀真枪了。我们会用实际代码实现几个经典场景,再跟Kafka、RabbitMQ来一场"正面PK",最后说说在Cluster模式下用Pub/Sub需要注意的那些坑。
实战一:聊天室
最经典的Pub/Sub应用场景就是聊天室。频道就是聊天室,订阅就是加入聊天室,发布就是发消息。
Python 实现
importredisimportthreadingimporttime r=redis.Redis(host='localhost',port=6379,decode_responses=True)defchat_listener(username,room):"""监听聊天室消息的线程"""pubsub=r.pubsub()pubsub.subscribe(room)print(f"[{username}] 已加入聊天室:{room}")print(f"[{username}] 等待消息中... (Ctrl+C 退出)")try:formessageinpubsub.listen():ifmessage['type']=='message':sender,content=message['data'].split(':',1)ifsender!=username:print(f"\r[{room}]{sender}:{content}")print(f"[{username}] > ",end='',flush=True)exceptKeyboardInterrupt:print(f"\n[{username}] 正在离开聊天室...")finally:pubsub.unsubscribe(room)pubsub.close()defsend_message(username,room):"""发送消息的循环"""whileTrue:try:msg=input(f"[{username}] > ")ifmsg.strip().lower()=='/quit':breakfull_msg=f"{username}:{msg}"r.publish(room,full_msg)except(EOFError,KeyboardInterrupt):breakif__name__=='__main__':importsys username=sys.argv[1]iflen(sys.argv)>1else'匿名用户'room=sys.argv[2]iflen(sys.argv)>2else'general'# 启动监听线程listener=threading.Thread(target=chat_listener,args=(username,room),daemon=True)listener.start()# 主线程负责发送消息send_message(username,room)print(f"[{username}] 已退出聊天室")运行效果:
# 终端1:$ python chat.py 张三 general[张三]已加入聊天室: general[张三]等待消息中...[张三]>大家好![general]李四: 张三好!# 终端2:$ python chat.py 李四 general[李四]已加入聊天室: general[李四]等待消息中...[general]张三: 大家好![李四]>张三好!私聊功能
私聊就是用用户ID作为频道名:
defsend_private_message(sender,receiver,content):"""发送私聊消息"""channel=f"private:{receiver}"message=f"{sender}:{content}"# 返回0说明用户不在线receivers=r.publish(channel,message)ifreceivers==0:print(f"[系统] 用户{receiver}不在线,消息已丢失")deflisten_private_messages(username):"""监听私聊消息"""channel=f"private:{username}"pubsub=r.pubsub()pubsub.subscribe(channel)formessageinpubsub.listen():ifmessage['type']=='message':print(f"\r[私聊]{message['data']}")实战二:实时消息推送
在实际业务中,最常见的场景是推送通知——比如订单状态变更、系统告警等。
实时推送架构 ┌──────────┐ 订单创建 ┌──────────┐ PUBLISH ┌──────────┐ │ 订单服务 │ ──────────→ │ Redis │ ──────────→ │ 用户连接 │ │ │ order:123 │ │ │ WebSocket │ └──────────┘ │ order:123 │ │ SSE │ └──────────┘ │ Long Poll │ └──────────┘ 频道命名规范: user:{userId}:notifications → 用户通知 order:{orderId}:events → 订单事件 system:alerts → 系统告警 realtime:stock:{code} → 实时股价Java + Spring Data Redis 实现
@ConfigurationpublicclassRedisPubSubConfig{@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory,MessageListenerAdapterlistenerAdapter){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 订阅用户通知频道(使用模式订阅)container.addMessageListener(listenerAdapter,newPatternTopic("user.*:notifications"));returncontainer;}@BeanMessageListenerAdapterlistenerAdapter(NotificationListenerlistener){returnnewMessageListenerAdapter(listener);}}@ComponentpublicclassNotificationListenerimplementsMessageListener{// 注入 WebSocket 推送服务@AutowiredprivateWebSocketPushServicepushService;@OverridepublicvoidonMessage(Messagemessage,byte[]pattern){Stringchannel=newString(message.getChannel(),StandardCharsets.UTF_8);Stringbody=newString(message.getBody(),StandardCharsets.UTF_8);// 从频道名提取用户ID: "user:10086:notifications" → "10086"StringuserId=channel.split(":")[1];// 通过WebSocket推送给用户pushService.sendToUser(userId,body);System.out.printf("通知推送: 用户=%s, 消息=%s%n",userId,body);}}@ServicepublicclassOrderService{@AutowiredprivateStringRedisTemplateredisTemplate;publicvoidcreateOrder(StringorderId,StringuserId){// ... 创建订单逻辑 ...// 发布订单事件通知Stringchannel="user:"+userId+":notifications";Stringnotification=String.format("{\"type\":\"order_created\",\"orderId\":\"%s\"}",orderId);redisTemplate.convertAndSend(channel,notification);}}实战三:与Keyspace通知结合的事件系统
Redis的Keyspace通知可以把键的变化事件通过Pub/Sub广播出来:
importredisimportjsonimportthreading r=redis.Redis(host='localhost',port=6379,decode_responses=True)# 开启键事件通知r.config_set('notify-keyspace-events','KEA')defevent_listener():"""监听所有键事件"""pubsub=r.pubsub()# 订阅db0的所有事件pubsub.psubscribe('__keyevent@0__:*')print("事件监听器已启动...")formessageinpubsub.listen():ifmessage['type']=='pmessage':event=message['channel'].split(':')[-1]key=message['data']print(f"[事件]{event}→ key:{key}")# 根据事件类型做不同处理ifevent=='expired':handle_key_expired(key)elifevent=='set':handle_key_set(key)elifevent=='del':handle_key_deleted(key)defhandle_key_expired(key):"""处理key过期事件"""print(f" → Key过期:{key}")# 可以触发缓存刷新、超时处理等逻辑defhandle_key_set(key):"""处理key设置事件"""print(f" → Key被设置:{key}")defhandle_key_deleted(key):"""处理key删除事件"""print(f" → Key被删除:{key}")# 启动监听thread=threading.Thread(target=event_listener,daemon=True)thread.start()踩坑提示:Keyspace通知的事件不保证精确送达。特别是过期事件,Redis使用惰性删除 + 定期删除两种策略,过期事件的触发时机可能延迟几秒甚至更久。不要用它做精确的超时控制,比如"30秒后自动取消订单"——用Timer或延迟队列更靠谱。
发布订阅 vs 专业消息队列
很多同学会把Redis Pub/Sub当成消息队列来用。它确实可以做消息传递,但跟专业消息队列比起来,差距还是明显的。
全面对比表格
| 对比维度 | Redis Pub/Sub | Redis Stream | Kafka | RabbitMQ |
|---|---|---|---|---|
| 消息持久化 | 不持久化 | 持久化 | 持久化(磁盘) | 持久化 |
| 离线消费 | 不支持 | 支持 | 支持 | 支持 |
| 消息回溯 | 不支持 | 支持 | 支持(offset) | 有限 |
| ACK确认 | 无 | 有(XACK) | 有(手动/自动) | 有(手动/自动) |
| 消息堆积 | 不支持 | 支持 | 支持(大量) | 支持 |
| 消息顺序 | 有序 | 有序 | 分区有序 | 队列有序 |
| 消费组 | 不支持 | 支持 | 支持 | 支持 |
| 延迟 | 极低(<1ms) | 低 | 低(几ms) | 低 |
| 吞吐量 | 高 | 高 | 极高 | 高 |
| 消息不丢保证 | 不保证 | 保证 | 保证 | 保证 |
| 运维复杂度 | 零 | 低 | 高 | 中 |
| 功能丰富度 | 低 | 中 | 高 | 高 |
选型建议
消息系统选型决策 需要消息持久化/ACK/堆积? ├─ 否 ──→ 消息量大小? │ ├─ 小(<1万/秒)──→ Redis Pub/Sub ✓ │ └─ 大(>1万/秒)──→ Redis Pub/Sub(注意缓冲区) │ └─ 是 ──→ 团队有运维Kafka能力? ├─ 是 ──→ 数据量/吞吐量很大? │ ├─ 是 ──→ Kafka ✓ │ └─ 否 ──→ RabbitMQ / Redis Stream └─ 否 ──→ Redis Stream ✓(最简单的持久化消息方案)Redis Stream:Pub/Sub的持久化升级版
如果你觉得Pub/Sub的消息丢失问题太严重,但又不想引入Kafka的复杂度,可以看看Redis 5.0引入的Redis Stream。
# 创建消费者组(类似Kafka的消费组)XGROUP CREATE mystream mygroup $ MKSTREAM# 生产者发送消息XADD mystream * user:张三 action:login timestamp:1700000000# 返回: "1700000000000-0"# 消费者读取消息XREADGROUP GROUP mygroup consumer1 COUNT1STREAMS mystream># 确认消息处理完成(ACK)XACK mystream mygroup1700000000000-0Redis Stream vs Pub/Sub 核心区别:
Pub/Sub: Stream: PUBLISH ──→ [Redis] ──→ 在线订阅者 ↓ 消息消失 离线订阅者 → 什么也收不到 VS XADD ──→ [Stream] ──→ 在线消费者 ↓ 消息持久化 离线消费者 → 上线后可以消费历史消息Java 中使用 Lettuce 实现发布订阅
Lettuce 原生Pub/Sub
publicclassLettucePubSubExample{publicstaticvoidmain(String[]args){RedisClientclient=RedisClient.create("redis://localhost:6379");StatefulRedisConnection<String,String>connection=client.connect();// 开启Pub/Sub需要单独的连接StatefulRedisPubSubConnection<String,String>pubsubConnection=client.connectPubSub();// 添加消息监听器pubsubConnection.addListener(newRedisPubSubAdapter<String,String>(){@Overridepublicvoidmessage(Stringchannel,Stringmessage){System.out.printf("收到消息 [%s]: %s%n",channel,message);}@Overridepublicvoidsubscribed(Stringchannel,longcount){System.out.printf("已订阅频道: %s (当前订阅数: %d)%n",channel,count);}});// 异步订阅RedisPubSubAsyncCommands<String,String>async=pubsubConnection.async();async.subscribe("news","sports","weather");// 发布消息(使用普通连接)RedisAsyncCommands<String,String>commands=connection.async();commands.publish("news","今天是个好日子").thenAccept(receivers->{System.out.printf("消息已发布,%d个订阅者收到%n",receivers);});// 保持运行try{Thread.sleep(10000);}catch(InterruptedExceptione){}}}多实例部署下的 Pub/Sub 注意事项
Cluster 模式下的 Pub/Sub 限制
在Cluster模式下,Pub/Sub有一个重要的限制:PUBLISH命令只会在消息所在分片的节点上广播。
Cluster 模式下 Pub/Sub 的问题 集群有3个分片: 分片1 (Master-A): 槽 0-5460 分片2 (Master-B): 槽 5461-10922 分片3 (Master-C): 槽 10923-16383 Subscriber-1 连接到 Master-A,订阅 "news" Subscriber-2 连接到 Master-B,订阅 "news" Subscriber-3 连接到 Master-C,订阅 "news" Publisher 连接到 Master-A,执行 PUBLISH news "hello" 结果: → Subscriber-1 收到消息 ✓ (在Master-A上) → Subscriber-2 没收到 ✗ (在Master-B上,不在同一个分片) → Subscriber-3 没收到 ✗ (在Master-C上,不在同一个分片)踩坑提示:这是Cluster模式下使用Pub/Sub最大的坑!很多同学在开发测试环境(Standalone)没问题,上了集群就发现消息丢失,就是因为这个原因。
解决方案:Sharded Pub/Sub(Redis 7.0+)
Redis 7.0引入了Sharded Pub/Sub,让消息在同一个分片的节点之间正确广播:
# Redis 7.0+ 新增命令SSUBSCRIBE channel[channel...]# 分片订阅SUNSUBSCRIBE[channel...]# 分片退订SPUBLISH channel message# 分片发布Sharded Pub/Sub的特点:
- 频道被映射到具体的分片(类似Key的槽位映射)
- 只有在同一个分片上的订阅者才会收到消息
- 适合需要分片内广播的场景
对于全集群广播的需求,Redis 7.0之前没有好方案,Redis 7.0的普通Pub/Sub仍然只在本地分片有效。如果确实需要全集群广播,可以考虑:
全集群广播方案 方案1: 在每个分片的Master上都执行一次PUBLISH ┌──────────────┐ │ App Server │ │ │── PUBLISH → Master-A (分片1) │ │── PUBLISH → Master-B (分片2) │ │── PUBLISH → Master-C (分片3) └──────────────┘ 方案2: 使用外部消息中间件(如Kafka)做全局广播 ┌──────────────┐ ┌─────────┐ ┌──────────────┐ │ App Server │ ──→ │ Kafka │ ──→ │ 各分片Redis │ └──────────────┘ └─────────┘ └──────────────┘Pub/Sub 的性能考量
客户端输出缓冲区
当订阅者处理消息的速度跟不上发布速度时,消息会在客户端的输出缓冲区中堆积。如果缓冲区超过限制,Redis会强制断开这个客户端。
# 查看客户端配置CONFIG GET client-output-buffer-limit# 默认值:# 1) "client-output-buffer-limit"# 2) "normal 0 0 0 slave 268435456 67108864 60 pubsub 33554432 8388608 60"## pubsub类型:# hard limit: 32MB (33554432)# soft limit: 8MB (8388608), 持续60秒# 调整Pub/Sub客户端的缓冲区限制CONFIG SET client-output-buffer-limit"pubsub 64mb 16mb 120"高频发布的影响
# 测试: 10万QPS的PUBLISH# 在一个终端:$ redis-benchmark-tpublish-P100-c50-n100000# 结果: ~110000 requests/sec# 但如果订阅者处理慢:# → 输出缓冲区持续增长# → 最终触发断连# → 订阅者被踢掉,消息全部丢失性能最佳实践:
| 建议 | 说明 |
|---|---|
| 保持订阅者处理速度快于发布速度 | 最重要的一点 |
合理设置client-output-buffer-limit | 根据消息大小和预期QPS调整 |
| 使用连接池管理Pub/Sub连接 | 不要和常规命令混用同一个连接 |
| 监控Pub/Sub相关指标 | pubsub_channels数量、模式订阅数量 |
| 大量频道时避免模式订阅 | 模式订阅是O(N)遍历,频道多时性能差 |
本章小结
Redis Pub/Sub是一个"够用就好"的轻量级消息方案:
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 实时通知推送 | Pub/Sub | 延迟低,实现简单 |
| 聊天室 | Pub/Sub | 天然适合 |
| 实时配置更新 | Pub/Sub+ Keyspace通知 | 零额外组件 |
| 任务队列 | Redis Stream / 消息队列 | 需要持久化和ACK |
| 事件溯源 | Kafka | 需要消息回溯和长期存储 |
| 大流量消息总线 | Kafka | 需要高吞吐和持久化 |
记住一个原则:Pub/Sub是通知机制,不是消息队列。用它做"实时通知",而不是"可靠投递"。如果业务对消息丢失零容忍,请果断选择Redis Stream或Kafka。
上一篇【第53篇】Redis发布订阅——消息队列的轻量替代方案
下一篇【第55篇】Redis事务——MULTI/EXEC/DISCARD/WATCH详解
