【Redis从入门到精通】第67篇:Redis Stream——终于有了真正的消息队列
上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数
Redis作者antirez(在还没退休的时候)曾经说:“Stream是Redis有史以来最复杂的命令集。”
开发者:“终于啊!再也不用拿List冒充消息队列了。”
运维:“所以我们以后还要单独部署Kafka吗?”
架构师:“看情况。Stream长得像Kafka,但它骨子里还是Redis。”
如果说之前的List、Pub/Sub、ZSet都只是"借用"Redis来做消息队列,那Stream就是Redis官方亲自下场,直接给你造了一个消息队列数据结构。它在设计上向Kafka致敬,但保持了Redis的轻量和低延迟基因。本文将深入Stream的每一个毛孔,让你从"会用"到"懂为什么这么设计"。
一、Redis Stream的设计背景
1.1 为什么Redis 5.0要引入Stream?
在Stream出现之前,用Redis做消息队列是一场"拼凑游戏":
Redis做MQ的"史前时代": 方式1:List(BRPOP) ┌─────────────┐ │ 能干活 │ 缺点: │ 但要自己搞ACK │ - 消息确认需要自己实现 │ 要自己搞重试 │ - 不能回溯历史消息 └─────────────┘ - 多消费者没法按组消费 方式2:Pub/Sub ┌─────────────┐ │ 能广播 │ 缺点: │ 但消息不会持久 │ - 掉线期间消息全丢 │ 离线全丢 │ - 没有消费者组概念 └─────────────┘ 方式3:ZSet ┌─────────────┐ │ 能做延迟队列 │ 缺点: │ 但要自己轮询 │ - 高频轮询浪费资源 └─────────────┘ - 没有原生ACKantirez在设计Stream时,同时参考了Kafka的分区日志和Redis的简洁哲学,最终选择了一种"追加日志+消费者组"的结构。它不是Kafka的替代品,而是填补了"想要消息队列但不想引入重量级中间件"这个需求空白。
1.2 Stream的数据模型
从宏观上看,一个Stream就是一个**只追加(append-only)**的日志文件:
Stream = 一个逻辑上的消息序列 ─────────────────────────────────────────────────────────► 时间轴 [msg1] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] [msg8] ... │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ 每条消息包含: ┌─────────────────────────────┐ │ ID: 1716700000000-0 │ ← 毫秒时间戳-序号 │ 字段1: value1 │ │ 字段2: value2 │ ← 键值对(KV格式) │ 字段3: value3 │ └─────────────────────────────┘ 关键特性: ✓ 消息追加到末尾,不修改不删除(除非手动DEL/XTRIM) ✓ ID保证了全局顺序 ✓ 不同消费者以不同进度消费同一条Stream ✓ 消费者组可以追踪每个消费者的进度二、Stream ID:时间与序号的精妙编码
2.1 ID格式
Stream的每条消息都有一个全局唯一的ID,格式为:
Stream ID = <millisecondsTime>-<sequenceNumber> 示例:1716700000000-0 ┌──────────────┐ ┌┐ │ 1716700000000 │0│ └──────┬───────┘ └┤ │ │ UNIX毫秒时间戳 序列号(同毫秒内的微调) (2024-05-26 (从0开始自增) 06:00:00.000)这个设计有两个精妙之处:
- 时间可排序:毫秒时间戳本身就保证了大致的时间顺序
- 同毫秒防冲突:序列号保证同一毫秒内的多条消息ID不重复
# ID生成规则演示# 使用 * 让Redis自动生成ID127.0.0.1:6379>XADD mystream * name"Tom"age"25""1716700000000-0"# 1ms后的消息127.0.0.1:6379>XADD mystream * name"Jerry"age"30""1716700000001-0"# 时间戳自动+1# 同1ms内多条消息127.0.0.1:6379>XADD mystream * msg"a"# 序列号=0"1716700000002-0"127.0.0.1:6379>XADD mystream * msg"b"# 序列号=1"1716700000002-1"127.0.0.1:6379>XADD mystream * msg"c"# 序列号=2"1716700000002-2"# 也可以手动指定ID(不推荐,除非有特殊需求)127.0.0.1:6379>XADD mystream1716700000003-0 msg"manual""1716700000003-0"2.2 ID的约束规则
ID规则: 1. 新ID必须大于Stream中当前最大ID → 保证消息严格追加、不会插入到历史位置 → 如果客户端指定了一个小于等于最大ID的值,会报错 2. 时间戳部分必须 >= 当前最大ID的时间戳 → 防止时钟回拨造成ID错乱 3. 如果时间戳相同,序列号必须 > 同时间戳的最大序列号 → 保证同一毫秒内的严格顺序 4. 最小ID:0-0(XRANGE用 - 表示) 5. 最大ID:理论上无穷(XRANGE用 + 表示)三、基础命令:写入与读取
3.1 XADD:添加消息
# 基本语法XADD stream_key[MAXLEN ~ count][NOMKSTREAM]<ID>field value[field value...]# 添加一条消息127.0.0.1:6379>XADD orders * order_id"1001"user_id"U001"amount"99.9""1716700000000-0"# 查看Stream长度127.0.0.1:6379>XLEN orders(integer)1# 限制Stream长度(约等于最近1000条,trim效率优化)127.0.0.1:6379>XADD orders MAXLEN ~1000* order_id"1002"amount"58.0""1716700000001-0"MAXLEN ~中的~表示"近似裁剪"——Redis可能在Stream节点层面做性能优化,实际保留的消息数可能略多于1000条。如果要求精确,去掉~即可,但性能会下降。
3.2 XRANGE / XREVRANGE:范围查询
# 查询所有消息(-表示最小ID,+表示最大ID)127.0.0.1:6379>XRANGE orders - + COUNT51)1)"1716700000000-0"2)1)"order_id"2)"1001"3)"user_id"4)"U001"5)"amount"6)"99.9"2)1)"1716700000001-0"2)1)"order_id"2)"1002"3)"amount"4)"58.0"# 查询指定时间范围127.0.0.1:6379>XRANGE orders17167000000001716700001000COUNT100# 反向查询(最新的在前)127.0.0.1:6379>XREVRANGE orders + - COUNT103.3 XREAD:读取消息(非消费者组模式)
# 从最早的消息开始读取127.0.0.1:6379>XREAD COUNT2STREAMS orders0-0# └─ 起始ID:0-0=最早# 阻塞读取新消息(等5秒)127.0.0.1:6379>XREAD BLOCK5000STREAMS orders $# └─ 起始ID:$=只读最新# 同时读取多个Stream127.0.0.1:6379>XREAD COUNT5BLOCK0STREAMS orders payments $XREAD的两种模式: 模式1:从头读(指定ID如0-0或特定ID) 用途:消费者首次启动,回溯历史消息 模式2:读新消息(使用$) 用途:消费者只关心后续消息,不关心历史 注意:XREAD不是消费者组模式! 它只是"读取"消息,不会追踪消费进度。 适合一次性消费或手动管理进度的场景。四、消费者组:Stream的"大师级"功能
4.1 消费者组模型
消费者组是Stream区别于List/PubSub的核心特性。它解决了多消费者协同消费的问题:
消费者组工作模型: Stream: orders ┌──────────────────────────────────────────────────┐ │ msg1 msg2 msg3 msg4 msg5 msg6 msg7 ... │ └──────────────────┬───────────────────────────────┘ │ 消费者组: "order-processors" │ ┌────────────┼────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │Consumer1│ │Consumer2│ │Consumer3│ │ 消费msg1 │ │ 消费msg2 │ │ 消费msg3 │ │ 消费msg4 │ │ 消费msg5 │ │ 消费msg6 │ │ 消费msg7 │ │ ... │ │ ... │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └── 每个消费者维护自己的 ─┘ 消费进度(PENDING列表) 关键规则: ✓ 同一条消息只被组内一个消费者消费 ✓ 不同消费者组可以独立消费同一条Stream ✓ 组内消费者可以动态增减(弹性伸缩)4.2 消费者组命令详解
# === 1. 创建消费者组 ===# XGROUP CREATE stream group id [MKSTREAM]127.0.0.1:6379>XGROUP CREATE orders order-processors0-0 MKSTREAM# └─ 组名 └─ 从最早的ID开始消费# └─ 如果Stream不存在则创建# 注意:id=0-0 从头消费;id=$ 只消费新消息# === 2. 消费者组读取消息 ===# XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key id127.0.0.1:6379>XREADGROUP GROUP order-processors consumer-1 COUNT2STREAMS orders># └─ 组名 └─ 消费者名 └─ > 表示"没确认过的消息"# (也接受指定ID和0)# > : 只发送从未交付给任何消费者的新消息# 0 : 发送该消费者pending中但未确认的消息# 指定ID : 从特定ID开始读取# === 3. 确认消息 ===# XACK stream group id [id ...]127.0.0.1:6379>XACK orders order-processors1716700000000-0(integer)1# 确认成功# === 4. 查看Pending列表 ===127.0.0.1:6379>XPENDING orders order-processors1)(integer)5# Pending消息总数2)"1716700000000-0"# 最小Pending ID3)"1716700000010-0"# 最大Pending ID4)1)1)"consumer-1"# 每个消费者的Pending数量2)"3"2)1)"consumer-2"2)"2"# 详细查看Pending消息127.0.0.1:6379>XPENDING orders order-processors - +101)1)"1716700000000-0"# ID2)"consumer-1"# 消费者3)(integer)60000# 空闲时间(毫秒)4)(integer)3# 被投递次数# === 5. 转移消息 ===# XCLAIM stream group consumer min-idle-time id [id ...]127.0.0.1:6379>XCLAIM orders order-processors consumer-2600001716700000000-0# └─ 组名 └─ 目标消费者 └─ 空闲阈值 └─ 消息ID# 含义:将空闲超过60秒的消息从consumer-1转移给consumer-24.3 消费者组的完整工作流
消息从生产到消费的完整生命周期: 1. 生产者发送消息 XADD orders * order_id "1001" amount "99.9" → 消息进入Stream,状态: 未分配 2. 消费者拉取消息 XREADGROUP GROUP order-processors worker-1 STREAMS orders > → 消息分配给worker-1,进入Pending列表 → 状态: 待确认(pending) 3. 消费者处理成功 业务逻辑处理... XACK orders order-processors "1716700000000-0" → 消息从Pending中移除 → 状态: 已确认(done) 4. 消费者处理失败(或者挂了) consumer-1 获取消息后崩溃(没有XACK) → 消息留在consumer-1的Pending列表中 → 空闲时间持续增长 → 状态: 挂起(stuck) 5. 其他消费者认领 XPENDING 发现某消息空闲超过阈值 XCLAIM orders order-processors worker-2 60000 "1716700000000-0" → 消息转移给worker-2,投递次数+1 → 状态: 重新分配五、Java实战:Spring Data Redis操作Stream
5.1 生产者代码
@ComponentpublicclassStreamProducer{@AutowiredprivateStringRedisTemplateredisTemplate;privatestaticfinalStringSTREAM_KEY="orders";// 发送消息publicStringsendMessage(Map<String,String>fields){// 构建RecordMapRecord<String,String,String>record=StreamRecords.newRecord().ofMap(fields).withStreamKey(STREAM_KEY);// XADD,自动分配IDRecordIdid=redisTemplate.opsForStream().add(record);// 限制Stream长度(防止内存溢出)redisTemplate.opsForStream().trim(STREAM_KEY,10000);// 保留最新1万条returnid.getValue();// 返回 "1716700000000-0"}// 批量发送publicvoidsendBatch(List<Map<String,String>>messages){messages.forEach(this::sendMessage);}}5.2 消费者代码(消费者组模式)
@ComponentpublicclassStreamConsumer{@AutowiredprivateStringRedisTemplateredisTemplate;privatestaticfinalStringSTREAM_KEY="orders";privatestaticfinalStringGROUP_NAME="order-processors";privatestaticfinalStringCONSUMER_NAME="worker-"+UUID.randomUUID().toString().substring(0,8);@PostConstructpublicvoidinit(){// 创建消费者组(如果不存在)try{redisTemplate.opsForStream().createGroup(STREAM_KEY,GROUP_NAME);}catch(RedisSystemExceptione){// 组已存在,忽略if(!e.getMessage().contains("BUSYGROUP")){throwe;}}// 启动消费循环startConsuming();}publicvoidstartConsuming(){newThread(()->{while(true){try{// 先处理Pending消息(之前没ACK的)processPending();// 再拉取新消息List<MapRecord<String,Object,Object>>messages=redisTemplate.opsForStream().read(Consumer.from(GROUP_NAME,CONSUMER_NAME),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(5)),StreamOffset.create(STREAM_KEY,ReadOffset.lastConsumed()));for(MapRecord<String,Object,Object>message:messages){try{// 处理业务逻辑processOrder(message.getValue());// 确认消息redisTemplate.opsForStream().acknowledge(STREAM_KEY,GROUP_NAME,message.getId());}catch(Exceptione){log.error("消息处理失败: {}",message.getId(),e);// 不ACK,等待XCLAIM转移或重试}}}catch(Exceptione){log.error("消费循环异常",e);try{Thread.sleep(1000);}catch(InterruptedExceptionie){}}}},"stream-consumer").start();}// 处理Pending消息(兜底机制)privatevoidprocessPending(){PendingMessagespendingMessages=redisTemplate.opsForStream().pending(STREAM_KEY,GROUP_NAME,PendingMessagesOptions.empty().range(Range.unbounded()).count(10));for(PendingMessagemsg:pendingMessages){// 空闲超过60秒的消息,重新处理if(msg.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofSeconds(60))>0){List<MapRecord<String,Object,Object>>claimed=redisTemplate.opsForStream().claim(STREAM_KEY,GROUP_NAME,CONSUMER_NAME,Duration.ofSeconds(60),RecordId.of(msg.getIdAsString()));for(MapRecord<String,Object,Object>record:claimed){try{processOrder(record.getValue());redisTemplate.opsForStream().acknowledge(STREAM_KEY,GROUP_NAME,record.getId());}catch(Exceptione){log.error("Pending消息重试失败: {}",record.getId(),e);}}}}}}⚠️ 注意:Spring Data Redis 3.x与2.x的Stream API变化较大。上面用的是3.x的API。如果你还在用2.x,需要参考对应版本的文档,核心逻辑是一样的,只是类名和方法签名略有不同。
六、Stream vs Kafka:相似与不同
6.1 架构对比
Redis Stream: ┌────────────────────────────────────────────┐ │ Redis 单节点(或Cluster) │ │ │ │ Stream: orders 消费者组: proc-1 │ │ ┌─────────────────┐ ┌──────┐ ┌──────┐ │ │ │ msg1 msg2 msg3 │ │Consumer1│Consumer2│ │ │ └─────────────────┘ └─────────┘ │ │ 外部存储 │ └────────────────────────────────────────────┘ 优势:部署简单,低延迟 劣势:单Stream,无分区,受单机内存限制 Kafka: ┌────────────────────────────────────────────┐ │ Kafka Broker集群 │ │ │ │ Topic: orders (3分区) │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Partition 0 │ │ Partition 1 │ ... │ │ │ 磁盘持久化 │ │ 磁盘持久化 │ │ │ └─────────────┘ └─────────────┘ │ │ 磁盘 = TB级存储 │ │ 消费者组 = 跨分区消费 │ └────────────────────────────────────────────┘ 优势:海量堆积、分区并行、持久化到磁盘 劣势:运维复杂、延迟略高6.2 关键差异
| 维度 | Redis Stream | Kafka |
|---|---|---|
| 分区/分片 | 无原生分区 | 分区是核心概念 |
| 存储 | 内存(RDB/AOF备份) | 磁盘(顺序写入) |
| 消息删除 | MAXLEN/XTRIM/手动DEL | 按时间或大小自动清理 |
| 单Stream吞吐 | 10万+/s | 百万+/s |
| 延迟 | <1ms | 2-10ms |
| 消费者进度 | 保存在Redis中 | 保存在Kafka内部topic |
| 消息回溯 | 支持(只要没被trim) | 支持(可配置保留时间) |
| 部署成本 | 低(你可能已有Redis) | 高(需独立集群) |
6.3 XTRIM:控制Stream长度
# 精确裁剪:保留最近1000条127.0.0.1:6379>XTRIM orders MAXLEN1000# 近似裁剪:性能更高,但可能多保留几十条127.0.0.1:6379>XTRIM orders MAXLEN ~1000# 按最小ID裁剪:删除小于指定ID的消息127.0.0.1:6379>XTRIM orders MINID1716700001000-0# XADD时同时限制长度(一步到位)127.0.0.1:6379>XADD orders MAXLEN ~1000* field value⚠️ 注意:XTRIM会删除消息!如果有消费者还没消费到这些消息,消息就永久丢失了。在设置MAXLEN时要确保所有消费者组都能在消息被trim之前消费完毕。建议给Stream预留足够的长度余量,比如你日均消费1000条,设置MAXLEN 5000比1000更安全。
七、总结
Redis Stream让Redis真正成为了一款"可以当消息队列用"的数据结构服务器。它的核心价值在于:
- 零额外成本:你很可能已经在用Redis了,Stream不需要新的部署
- 消费者组:解决了多消费者协同消费的难题
- 消息ACK:消费确认机制保障了消息可靠投递
- 低延迟:内存操作,延迟可以做到亚毫秒级
但它不是Kafka的替代品。面对TB级堆积、分区并行消费、严格顺序保证等场景,专业的消息中间件仍然是最佳选择。
记住一个判断标准:如果你的团队已经在维护Kafka/RabbitMQ,就别折腾Stream了;如果你没有MQ,且消息量不大(百万级/天以下),Stream是性价比最高的选择。
下一篇文章,我们将聊聊HyperLogLog——一个用12KB内存就能统计1亿用户的神奇数据结构。
上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数
