当前位置: 首页 > news >正文

【Redis从入门到精通】第67篇:Redis Stream——终于有了真正的消息队列

上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数


Redis作者antirez(在还没退休的时候)曾经说:“Stream是Redis有史以来最复杂的命令集。”
开发者:“终于啊!再也不用拿List冒充消息队列了。”
运维:“所以我们以后还要单独部署Kafka吗?”
架构师:“看情况。Stream长得像Kafka,但它骨子里还是Redis。”

如果说之前的List、Pub/Sub、ZSet都只是"借用"Redis来做消息队列,那Stream就是Redis官方亲自下场,直接给你造了一个消息队列数据结构。它在设计上向Kafka致敬,但保持了Redis的轻量和低延迟基因。本文将深入Stream的每一个毛孔,让你从"会用"到"懂为什么这么设计"。

一、Redis Stream的设计背景

1.1 为什么Redis 5.0要引入Stream?

在Stream出现之前,用Redis做消息队列是一场"拼凑游戏":

Redis做MQ的"史前时代": 方式1:List(BRPOP) ┌─────────────┐ │ 能干活 │ 缺点: │ 但要自己搞ACK │ - 消息确认需要自己实现 │ 要自己搞重试 │ - 不能回溯历史消息 └─────────────┘ - 多消费者没法按组消费 方式2:Pub/Sub ┌─────────────┐ │ 能广播 │ 缺点: │ 但消息不会持久 │ - 掉线期间消息全丢 │ 离线全丢 │ - 没有消费者组概念 └─────────────┘ 方式3:ZSet ┌─────────────┐ │ 能做延迟队列 │ 缺点: │ 但要自己轮询 │ - 高频轮询浪费资源 └─────────────┘ - 没有原生ACK

antirez在设计Stream时,同时参考了Kafka的分区日志和Redis的简洁哲学,最终选择了一种"追加日志+消费者组"的结构。它不是Kafka的替代品,而是填补了"想要消息队列但不想引入重量级中间件"这个需求空白。

1.2 Stream的数据模型

从宏观上看,一个Stream就是一个**只追加(append-only)**的日志文件:

Stream = 一个逻辑上的消息序列 ─────────────────────────────────────────────────────────► 时间轴 [msg1] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] [msg8] ... │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ 每条消息包含: ┌─────────────────────────────┐ │ ID: 1716700000000-0 │ ← 毫秒时间戳-序号 │ 字段1: value1 │ │ 字段2: value2 │ ← 键值对(KV格式) │ 字段3: value3 │ └─────────────────────────────┘ 关键特性: ✓ 消息追加到末尾,不修改不删除(除非手动DEL/XTRIM) ✓ ID保证了全局顺序 ✓ 不同消费者以不同进度消费同一条Stream ✓ 消费者组可以追踪每个消费者的进度

二、Stream ID:时间与序号的精妙编码

2.1 ID格式

Stream的每条消息都有一个全局唯一的ID,格式为:

Stream ID = <millisecondsTime>-<sequenceNumber> 示例:1716700000000-0 ┌──────────────┐ ┌┐ │ 1716700000000 │0│ └──────┬───────┘ └┤ │ │ UNIX毫秒时间戳 序列号(同毫秒内的微调) (2024-05-26 (从0开始自增) 06:00:00.000)

这个设计有两个精妙之处:

  1. 时间可排序:毫秒时间戳本身就保证了大致的时间顺序
  2. 同毫秒防冲突:序列号保证同一毫秒内的多条消息ID不重复
# ID生成规则演示# 使用 * 让Redis自动生成ID127.0.0.1:6379>XADD mystream * name"Tom"age"25""1716700000000-0"# 1ms后的消息127.0.0.1:6379>XADD mystream * name"Jerry"age"30""1716700000001-0"# 时间戳自动+1# 同1ms内多条消息127.0.0.1:6379>XADD mystream * msg"a"# 序列号=0"1716700000002-0"127.0.0.1:6379>XADD mystream * msg"b"# 序列号=1"1716700000002-1"127.0.0.1:6379>XADD mystream * msg"c"# 序列号=2"1716700000002-2"# 也可以手动指定ID(不推荐,除非有特殊需求)127.0.0.1:6379>XADD mystream1716700000003-0 msg"manual""1716700000003-0"

2.2 ID的约束规则

ID规则: 1. 新ID必须大于Stream中当前最大ID → 保证消息严格追加、不会插入到历史位置 → 如果客户端指定了一个小于等于最大ID的值,会报错 2. 时间戳部分必须 >= 当前最大ID的时间戳 → 防止时钟回拨造成ID错乱 3. 如果时间戳相同,序列号必须 > 同时间戳的最大序列号 → 保证同一毫秒内的严格顺序 4. 最小ID:0-0(XRANGE用 - 表示) 5. 最大ID:理论上无穷(XRANGE用 + 表示)

三、基础命令:写入与读取

3.1 XADD:添加消息

# 基本语法XADD stream_key[MAXLEN ~ count][NOMKSTREAM]<ID>field value[field value...]# 添加一条消息127.0.0.1:6379>XADD orders * order_id"1001"user_id"U001"amount"99.9""1716700000000-0"# 查看Stream长度127.0.0.1:6379>XLEN orders(integer)1# 限制Stream长度(约等于最近1000条,trim效率优化)127.0.0.1:6379>XADD orders MAXLEN ~1000* order_id"1002"amount"58.0""1716700000001-0"

MAXLEN ~中的~表示"近似裁剪"——Redis可能在Stream节点层面做性能优化,实际保留的消息数可能略多于1000条。如果要求精确,去掉~即可,但性能会下降。

3.2 XRANGE / XREVRANGE:范围查询

# 查询所有消息(-表示最小ID,+表示最大ID)127.0.0.1:6379>XRANGE orders - + COUNT51)1)"1716700000000-0"2)1)"order_id"2)"1001"3)"user_id"4)"U001"5)"amount"6)"99.9"2)1)"1716700000001-0"2)1)"order_id"2)"1002"3)"amount"4)"58.0"# 查询指定时间范围127.0.0.1:6379>XRANGE orders17167000000001716700001000COUNT100# 反向查询(最新的在前)127.0.0.1:6379>XREVRANGE orders + - COUNT10

3.3 XREAD:读取消息(非消费者组模式)

# 从最早的消息开始读取127.0.0.1:6379>XREAD COUNT2STREAMS orders0-0# └─ 起始ID:0-0=最早# 阻塞读取新消息(等5秒)127.0.0.1:6379>XREAD BLOCK5000STREAMS orders $# └─ 起始ID:$=只读最新# 同时读取多个Stream127.0.0.1:6379>XREAD COUNT5BLOCK0STREAMS orders payments $
XREAD的两种模式: 模式1:从头读(指定ID如0-0或特定ID) 用途:消费者首次启动,回溯历史消息 模式2:读新消息(使用$) 用途:消费者只关心后续消息,不关心历史 注意:XREAD不是消费者组模式! 它只是"读取"消息,不会追踪消费进度。 适合一次性消费或手动管理进度的场景。

四、消费者组:Stream的"大师级"功能

4.1 消费者组模型

消费者组是Stream区别于List/PubSub的核心特性。它解决了多消费者协同消费的问题:

消费者组工作模型: Stream: orders ┌──────────────────────────────────────────────────┐ │ msg1 msg2 msg3 msg4 msg5 msg6 msg7 ... │ └──────────────────┬───────────────────────────────┘ │ 消费者组: "order-processors" │ ┌────────────┼────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │Consumer1│ │Consumer2│ │Consumer3│ │ 消费msg1 │ │ 消费msg2 │ │ 消费msg3 │ │ 消费msg4 │ │ 消费msg5 │ │ 消费msg6 │ │ 消费msg7 │ │ ... │ │ ... │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └── 每个消费者维护自己的 ─┘ 消费进度(PENDING列表) 关键规则: ✓ 同一条消息只被组内一个消费者消费 ✓ 不同消费者组可以独立消费同一条Stream ✓ 组内消费者可以动态增减(弹性伸缩)

4.2 消费者组命令详解

# === 1. 创建消费者组 ===# XGROUP CREATE stream group id [MKSTREAM]127.0.0.1:6379>XGROUP CREATE orders order-processors0-0 MKSTREAM# └─ 组名 └─ 从最早的ID开始消费# └─ 如果Stream不存在则创建# 注意:id=0-0 从头消费;id=$ 只消费新消息# === 2. 消费者组读取消息 ===# XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key id127.0.0.1:6379>XREADGROUP GROUP order-processors consumer-1 COUNT2STREAMS orders># └─ 组名 └─ 消费者名 └─ > 表示"没确认过的消息"# (也接受指定ID和0)# > : 只发送从未交付给任何消费者的新消息# 0 : 发送该消费者pending中但未确认的消息# 指定ID : 从特定ID开始读取# === 3. 确认消息 ===# XACK stream group id [id ...]127.0.0.1:6379>XACK orders order-processors1716700000000-0(integer)1# 确认成功# === 4. 查看Pending列表 ===127.0.0.1:6379>XPENDING orders order-processors1)(integer)5# Pending消息总数2)"1716700000000-0"# 最小Pending ID3)"1716700000010-0"# 最大Pending ID4)1)1)"consumer-1"# 每个消费者的Pending数量2)"3"2)1)"consumer-2"2)"2"# 详细查看Pending消息127.0.0.1:6379>XPENDING orders order-processors - +101)1)"1716700000000-0"# ID2)"consumer-1"# 消费者3)(integer)60000# 空闲时间(毫秒)4)(integer)3# 被投递次数# === 5. 转移消息 ===# XCLAIM stream group consumer min-idle-time id [id ...]127.0.0.1:6379>XCLAIM orders order-processors consumer-2600001716700000000-0# └─ 组名 └─ 目标消费者 └─ 空闲阈值 └─ 消息ID# 含义:将空闲超过60秒的消息从consumer-1转移给consumer-2

4.3 消费者组的完整工作流

消息从生产到消费的完整生命周期: 1. 生产者发送消息 XADD orders * order_id "1001" amount "99.9" → 消息进入Stream,状态: 未分配 2. 消费者拉取消息 XREADGROUP GROUP order-processors worker-1 STREAMS orders > → 消息分配给worker-1,进入Pending列表 → 状态: 待确认(pending) 3. 消费者处理成功 业务逻辑处理... XACK orders order-processors "1716700000000-0" → 消息从Pending中移除 → 状态: 已确认(done) 4. 消费者处理失败(或者挂了) consumer-1 获取消息后崩溃(没有XACK) → 消息留在consumer-1的Pending列表中 → 空闲时间持续增长 → 状态: 挂起(stuck) 5. 其他消费者认领 XPENDING 发现某消息空闲超过阈值 XCLAIM orders order-processors worker-2 60000 "1716700000000-0" → 消息转移给worker-2,投递次数+1 → 状态: 重新分配

五、Java实战:Spring Data Redis操作Stream

5.1 生产者代码

@ComponentpublicclassStreamProducer{@AutowiredprivateStringRedisTemplateredisTemplate;privatestaticfinalStringSTREAM_KEY="orders";// 发送消息publicStringsendMessage(Map<String,String>fields){// 构建RecordMapRecord<String,String,String>record=StreamRecords.newRecord().ofMap(fields).withStreamKey(STREAM_KEY);// XADD,自动分配IDRecordIdid=redisTemplate.opsForStream().add(record);// 限制Stream长度(防止内存溢出)redisTemplate.opsForStream().trim(STREAM_KEY,10000);// 保留最新1万条returnid.getValue();// 返回 "1716700000000-0"}// 批量发送publicvoidsendBatch(List<Map<String,String>>messages){messages.forEach(this::sendMessage);}}

5.2 消费者代码(消费者组模式)

@ComponentpublicclassStreamConsumer{@AutowiredprivateStringRedisTemplateredisTemplate;privatestaticfinalStringSTREAM_KEY="orders";privatestaticfinalStringGROUP_NAME="order-processors";privatestaticfinalStringCONSUMER_NAME="worker-"+UUID.randomUUID().toString().substring(0,8);@PostConstructpublicvoidinit(){// 创建消费者组(如果不存在)try{redisTemplate.opsForStream().createGroup(STREAM_KEY,GROUP_NAME);}catch(RedisSystemExceptione){// 组已存在,忽略if(!e.getMessage().contains("BUSYGROUP")){throwe;}}// 启动消费循环startConsuming();}publicvoidstartConsuming(){newThread(()->{while(true){try{// 先处理Pending消息(之前没ACK的)processPending();// 再拉取新消息List<MapRecord<String,Object,Object>>messages=redisTemplate.opsForStream().read(Consumer.from(GROUP_NAME,CONSUMER_NAME),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(5)),StreamOffset.create(STREAM_KEY,ReadOffset.lastConsumed()));for(MapRecord<String,Object,Object>message:messages){try{// 处理业务逻辑processOrder(message.getValue());// 确认消息redisTemplate.opsForStream().acknowledge(STREAM_KEY,GROUP_NAME,message.getId());}catch(Exceptione){log.error("消息处理失败: {}",message.getId(),e);// 不ACK,等待XCLAIM转移或重试}}}catch(Exceptione){log.error("消费循环异常",e);try{Thread.sleep(1000);}catch(InterruptedExceptionie){}}}},"stream-consumer").start();}// 处理Pending消息(兜底机制)privatevoidprocessPending(){PendingMessagespendingMessages=redisTemplate.opsForStream().pending(STREAM_KEY,GROUP_NAME,PendingMessagesOptions.empty().range(Range.unbounded()).count(10));for(PendingMessagemsg:pendingMessages){// 空闲超过60秒的消息,重新处理if(msg.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofSeconds(60))>0){List<MapRecord<String,Object,Object>>claimed=redisTemplate.opsForStream().claim(STREAM_KEY,GROUP_NAME,CONSUMER_NAME,Duration.ofSeconds(60),RecordId.of(msg.getIdAsString()));for(MapRecord<String,Object,Object>record:claimed){try{processOrder(record.getValue());redisTemplate.opsForStream().acknowledge(STREAM_KEY,GROUP_NAME,record.getId());}catch(Exceptione){log.error("Pending消息重试失败: {}",record.getId(),e);}}}}}}

⚠️ 注意:Spring Data Redis 3.x与2.x的Stream API变化较大。上面用的是3.x的API。如果你还在用2.x,需要参考对应版本的文档,核心逻辑是一样的,只是类名和方法签名略有不同。

六、Stream vs Kafka:相似与不同

6.1 架构对比

Redis Stream: ┌────────────────────────────────────────────┐ │ Redis 单节点(或Cluster) │ │ │ │ Stream: orders 消费者组: proc-1 │ │ ┌─────────────────┐ ┌──────┐ ┌──────┐ │ │ │ msg1 msg2 msg3 │ │Consumer1│Consumer2│ │ │ └─────────────────┘ └─────────┘ │ │ 外部存储 │ └────────────────────────────────────────────┘ 优势:部署简单,低延迟 劣势:单Stream,无分区,受单机内存限制 Kafka: ┌────────────────────────────────────────────┐ │ Kafka Broker集群 │ │ │ │ Topic: orders (3分区) │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Partition 0 │ │ Partition 1 │ ... │ │ │ 磁盘持久化 │ │ 磁盘持久化 │ │ │ └─────────────┘ └─────────────┘ │ │ 磁盘 = TB级存储 │ │ 消费者组 = 跨分区消费 │ └────────────────────────────────────────────┘ 优势:海量堆积、分区并行、持久化到磁盘 劣势:运维复杂、延迟略高

6.2 关键差异

维度Redis StreamKafka
分区/分片无原生分区分区是核心概念
存储内存(RDB/AOF备份)磁盘(顺序写入)
消息删除MAXLEN/XTRIM/手动DEL按时间或大小自动清理
单Stream吞吐10万+/s百万+/s
延迟<1ms2-10ms
消费者进度保存在Redis中保存在Kafka内部topic
消息回溯支持(只要没被trim)支持(可配置保留时间)
部署成本低(你可能已有Redis)高(需独立集群)

6.3 XTRIM:控制Stream长度

# 精确裁剪:保留最近1000条127.0.0.1:6379>XTRIM orders MAXLEN1000# 近似裁剪:性能更高,但可能多保留几十条127.0.0.1:6379>XTRIM orders MAXLEN ~1000# 按最小ID裁剪:删除小于指定ID的消息127.0.0.1:6379>XTRIM orders MINID1716700001000-0# XADD时同时限制长度(一步到位)127.0.0.1:6379>XADD orders MAXLEN ~1000* field value

⚠️ 注意:XTRIM会删除消息!如果有消费者还没消费到这些消息,消息就永久丢失了。在设置MAXLEN时要确保所有消费者组都能在消息被trim之前消费完毕。建议给Stream预留足够的长度余量,比如你日均消费1000条,设置MAXLEN 5000比1000更安全。

七、总结

Redis Stream让Redis真正成为了一款"可以当消息队列用"的数据结构服务器。它的核心价值在于:

  1. 零额外成本:你很可能已经在用Redis了,Stream不需要新的部署
  2. 消费者组:解决了多消费者协同消费的难题
  3. 消息ACK:消费确认机制保障了消息可靠投递
  4. 低延迟:内存操作,延迟可以做到亚毫秒级

但它不是Kafka的替代品。面对TB级堆积、分区并行消费、严格顺序保证等场景,专业的消息中间件仍然是最佳选择。

记住一个判断标准:如果你的团队已经在维护Kafka/RabbitMQ,就别折腾Stream了;如果你没有MQ,且消息量不大(百万级/天以下),Stream是性价比最高的选择。

下一篇文章,我们将聊聊HyperLogLog——一个用12KB内存就能统计1亿用户的神奇数据结构。


上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数


http://www.jsqmd.com/news/958326/

相关文章:

  • 【Veo 2运动捕捉黄金参数手册】:20年影像工程师亲测的5大动态设置阈值与帧率协同公式
  • 旧房翻新品牌哪家好,和居派如何? - mypinpai
  • 教师必备!这些PPT模板堪称教学神器 - 品牌测评鉴赏家
  • Okbiye 文献综述 AI 创作:打破科研综述撰写壁垒,一站式解锁学术文献梳理新范式
  • 2026年6月上海GEO优化公司推荐:TOP5专业评测价格适用场景 - 品牌推荐
  • 保姆级教程:用MATLAB Simscape Multibody从零搭建一个会动的倒立摆模型
  • 计算机毕业设计之django基于Django和Bootstrap的社区疫情防控系统设计与实现
  • 如何使用 6 种方法将照片从三星手机传输到三星手机
  • 2026年防雷接地工程应用白皮书-机房与重点场所深度剖析 - 优质品牌商家
  • 解密IPATool:iOS应用包下载的黑科技革命
  • 计算机小程序毕设实战-基于微信小程序的靓丽旅游分享平台基于springboot+微信小程序的丽江市旅游分享平台【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 快马平台五分钟生成罗盘时钟:零基础打造动态方位时间显示原型
  • 多维聚合不是GROUP BY:数据变形术与OLAP操作心法
  • 从一次HDFS客户端连接失败,聊聊Hadoop FileSystem SPI机制那些事儿
  • 说说天津有哪些靠谱的蒸饼制造商 - mypinpai
  • 985硕士去华为OD,是真的亏,还是
  • 河南到全国大票零担快运专线物流服务商选择参考 - 品牌排行榜
  • 从脚本到Skills:测试智能体的下一步,让AI学会“如何测而不是测什么”
  • 双有源桥DAB变换器三重移相TPS仿真模型研究(Simulink仿真实现)
  • 从HZK16到C数组:手把手实现嵌入式汉字字模提取与转换工具
  • H2O中stacking实战:元学习器原理、避坑指南与R语言生产部署
  • 2026酱香型调味酒酒体设计品牌选型技术推荐:白酒批发厂家/白酒招商代理/缺陷酒修复/苦味酒处理/实力盘点 - 优质品牌商家
  • 成都窗帘技术选型与落地全推荐:品质把控核心要点 - 优质品牌商家
  • 2026年当前,如何甄选专业靠谱的细石混凝土泵厂商? - 2026年企业资讯
  • 新材略律所,企业劳动争议案例分析排名靠前吗? - mypinpai
  • 计算机毕业设计之django基于Django黄河文化资源管理系统
  • 如何从 Vivo 文件保险箱恢复已删除的照片
  • 天津瓷器回收,京顺斋全国上门,专业鉴宝,诚信无忧 - 深鉴新闻
  • 2026年上海新房装修施工实力企业深度解析:聚焦全流程品质交付 - 2026年企业资讯
  • 投简历总石沉大海,可能是你的PPT模板拖后腿了,简历PPT模板平台推荐 - 品牌测评鉴赏家