Redis Pub/Sub 与阻塞机制学习笔记
一、Redis Pub/Sub 基本概念
Redis Pub/Sub(Publish / Subscribe)是一种发布订阅消息模型。
角色:
Publisher (发布者) | v Redis | v Subscriber (订阅者)流程:
- 订阅者订阅频道
- 发布者向频道发布消息
- Redis 将消息推送给所有订阅者
示例代码:
import redis r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True) pubsub = r.pubsub() pubsub.subscribe("channel1") for msg in pubsub.listen(): if msg["type"] == "message": print(msg["data"])发布消息:
r.publish("channel1", "hello")二、Redis Pub/Sub 在 Redis 中的存储形式
一个非常重要的点:
Pub/Sub 频道不是 Redis Key。
它不会存储在 Redis 数据库中:
不是 key 不是 value 不会持久化 不会进入 db=0Redis 内部只维护内存中的订阅关系表。
内部结构可以理解为:
dict<channel, subscribers>示例:
channel1 | +--- client1 +--- client2发布消息时:
publish(channel1, hello) Redis查找 channel1 | +-- client1 +-- client2 发送消息如果没有订阅者:消息直接丢弃
三、查看 Pub/Sub 状态
Redis 提供命令查看当前频道。
查看频道:
PUBSUB CHANNELS示例:
127.0.0.1:6379> PUBSUB CHANNELS 1) "channel1"查看订阅数量:
PUBSUB NUMSUB channel1示例:
channel1 1四、listen() 的工作原理
for msg in pubsub.listen():这里的listen()是阻塞迭代器。
本质逻辑类似:
while True: msg = socket.recv() yield msg流程:
等待 socket 数据 | v Redis publish | v socket 收到消息 | v 返回给 for 循环五、什么是阻塞迭代器
1 迭代器
Python 迭代器是可以被for循环遍历的对象:
for x in iterator:底层相当于:
iterator = iter(obj) while True: x = next(iterator)2 阻塞
阻塞指:
没有数据时 程序暂停等待例如:input() 程序会等待输入。
Redis 的listen()也是类似:
没有消息 → 等待 有消息 → 返回六、listen() 返回的数据结构
收到消息时返回:
{ 'type': 'message', 'pattern': None, 'channel': 'channel1', 'data': 'hello' }字段说明:
字段 | 含义 |
type | 消息类型 |
channel | 频道 |
data | 消息内容 |
pattern | 模式订阅 |
七、为什么阻塞不会导致整个系统卡死
问题核心:
部分代码阻塞,为什么整个系统不阻塞?
解决方法是并发模型。
常见三种方式:
多线程 多进程 异步IO八、多线程模型
将阻塞任务放入子线程:
import threading threading.Thread(target=listen).start()结构:
进程 │ ├─ 主线程 │ └─ 子线程(阻塞等待)特点:
子线程阻塞 不会影响主线程九、多进程模型
uvicorn.run(..., workers=4)结构:
主进程 │ ├─ worker1 ├─ worker2 ├─ worker3 └─ worker4即使某个 worker 阻塞:
其他 worker 仍然可以处理请求十、异步 IO(asyncio)
现代系统大量使用事件循环模型。
示例:
async def task(): await asyncio.sleep(5)await表示:
当前任务等待IO 先去执行其他任务结构:
一个线程 | 事件循环 | ├─ task1 ├─ task2 └─ task3十一、IO 多路复用
异步 IO 的底层依赖操作系统机制:
epoll (Linux) kqueue (Mac) IOCP (Windows)逻辑:
线程询问内核: 哪些 socket 有数据?流程:
无数据 → 继续处理其他任务 有数据 → 再处理该任务这样线程不会被 IO 阻塞。
十二、Redis 为什么性能很高
Redis 是单线程模型,但能达到:
10万+ QPS原因:
IO 多路复用 epoll 内存数据库执行模型:
事件循环 | v 处理 socket 事件十三、Pub/Sub 的生产环境用途
适合:
实时广播 消息通知 聊天系统 配置更新示例:
用户注册 | publish user_register | +--- 邮件服务 +--- 优惠券服务 +--- 统计服务十四、Pub/Sub 的缺点
Pub/Sub不适合做任务队列。
原因:
问题 | 说明 |
消息丢失 | 没人订阅就丢 |
不可重试 | 没有 ack |
无持久化 | 不存储消息 |
