当前位置: 首页 > news >正文

redis stream介绍

介绍

redis stream是一种类似日志追加的数据结构。可用来记录和实时处理事件。适用场景:

  • 事件溯源
  • 传感器监控
  • 通知

性能

新增 O(1)
访问单个节点是O(n),n是ID的长度
redis stream使用radix trees实现

基础

XADD

新增条目使用XADD

> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1692632147973-0"

一个条目由一个或多个key-value组成,类似于字典。
以上命令往stream race:france中添加了一个条目:rider:Castilla, speed:29.9, position:1, location_id:2,并且自动生成id,其id为1692632147973-0*代表自动生成ID。每一个新生成的ID是递增的。

Entry IDs

ID的组成

<millisecondsTime>-<sequenceNumber>

从Streams获取数据

读取数据有多种模式:

  1. 多个客户端读取一个Stream的新数据,类似 tail -f 命令
  2. 按时间查询,查询历史数据,遍历数据
  3. 同一个消费组种的消费者消费一个Stream中的不同消息,类似Kafka

范围查询XRANGE和XREVRANGE

特殊的ID
-表示最小的id
+表示最大的id
示例:
获取最靠前的2个节点

XRANGE race:france - + COUNT 2

查询ID从1692632094485-0开始的2个元素,不包括1692632094485-0本身,‘(’符号表示不包含。

XRANGE race:france (1692632094485-0 + COUNT 2

XREVRANGE命令与XRANGE相等,只是返回的元素的顺序相反。

使用XREAD监听新元素

  1. 非阻塞式读取:
XREAD COUNT 2 STREAMS race:france 0

以上命令返回2个ID大于0的数据。
2. 阻塞读取

XREAD BLOCK 0 STREAMS race:france $

BLOCK 0的效果是一直阻塞,$表示当前stream中最大的ID值。这个命令实现了tail -f的效果。

消费组

消费组的概念与Kafka中的消费组类型,但实现上与kafka无关。

消费者组就像一个从一个流中获取数据的伪消费者,它实际上服务于多个消费者,提供了以下保证:

  1. 一条消息只能发送给一个消费者。
  2. 在一个消费组中,不同客户端通过一个名称来区分,由客户端提供唯一标识符。
  3. 每个使用者组都有从未使用过的第一个 ID 的概念,因此,当使用者请求新消息时,它可以只提供以前未传递的消息。
  4. 使用消息需要使用特定命令进行显式确认。Redis 将确认解释为:此消息已正确处理,因此可以将其从使用者组中逐出。
  5. 消费组跟踪当前挂起的所有消息,即已传递给使用者组的某个使用者但尚未确认为已处理的消息。由于此功能,在访问流的消息历史记录时,每个消费者将只看到已传递到它的消息。

创建消费者

> XGROUP CREATE race:france france_riders $
OK

在这行命令中,为流race:france创建了一个france_riders消费组,并指定id为$。$表示消费组读取最新的消息。消费者只会读取比指定ID大的消息。如果ID指定为0表示读取流中所有的数据。

自动创建流

> XGROUP CREATE race:italy italy_riders $ MKSTREAM
OK

消费者读取

> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy >
1) 1) "race:italy"2) 1) 1) "1692632639151-0"2) 1) "rider"2) "Castilla"

XREADGROUP指令需要指定GROUP <group-name> <consumer-name>

  • ID指定为>,这意味着消费者只希望接收从未传递给任何其他消费者的消息。这意味着,给我新的信息。
  • 任何其他 ID,即0或任何其他有效ID或不完整ID(仅仅毫秒时间部分),将返回为发送命令的使用者返回待处理的条目,其 ID 大于所提供的 ID。因此,基本上,如果 ID 不为>,那么该命令将只允许客户端访问其待处理条目:那些发送给它,但尚未确认的消息。请注意,在这种情况下,BLOCK 和 NOACK 都被忽略。

XREADGROUP 是一个写入命令, 因为即使它从流中读取数据,读取操作也会产生副作用,导致消费者组被修改,因此它只能在主实例上调用。
一个使用ruby实现的消费者代码例子:

require 'redis'if ARGV.length == 0puts "Please specify a consumer name"exit 1
endConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.newdef process_message(id,msg)puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end$lastid = '0-0'puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true# Pick the ID based on the iteration: the first time we want to# read our pending messages, in case we crashed and are recovering.# Once we consumed our history, we can start getting new messages.if check_backlogmyid = $lastidelsemyid = '>'enditems = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)if items == nilputs "Timeout!"nextend# If we receive an empty reply, it means we were consuming our history# and that the history is now empty. Let's start to consume new messages.check_backlog = false if items[0][1].length == 0items[0][1].each{|i|id,fields = i# Process the messageprocess_message(id,fields)# Acknowledge the message as processedr.xack(:my_stream_key,GroupName,id)$lastid = id}
end

故障恢复

使用XPENDINGXCLAIM命令从故障进行恢复。
Redis 消费者者组提供了一项功能,用于在这些情况下使用该功能来声明给定消费者的待处理消息,以便此类消息将更改所有权并重新分配给不同的消费者。消费者必须检查挂起的消息列表,并且必须使用特殊命令声明特定消息,否则服务器将使消息永远处于挂起状态并分配给旧的消费者。

XPENDING

> XPENDING race:italy italy_riders
1) (integer) 2
2) "1692632647899-0"
3) "1692632662819-0"
4) 1) 1) "Bob"2) "2"

以这种方式调用时,该命令会输出消费者组中待处理消息的总数(在本例中为两个),待处理消息中较低和较高的消息 ID,最后输出消费者列表和它们拥有的待处理消息数。
也可以指定开始和结束的ID

> XPENDING race:italy italy_riders - + 10
1) 1) "1692632647899-0"2) "Bob"3) (integer) 746424) (integer) 1
2) 1) "1692632662819-0"2) "Bob"3) (integer) 746424) (integer) 1

该命令返回了消息的详细信息,ID、使用者名称、空闲时间(以毫秒为单位),即自上次将消息传递给某个使用者以来已经过去了多少毫秒,最后是给定消息传递的次数。
使用XCLAIM指令来更改消息的所有权

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

Claiming and the delivery counter

delivery counter在2种情况下会递增:

  1. 当一个消息被成功claim时
  2. 使用XREADGROUP去读取历史消息
    当出现故障时,消息会多次传递是正常的,但最终它们通常会被处理和确认。
    因此,一旦传递计数器达到您选择的给定大数字,将此类消息放在另一个流中并向系统管理员发送通知可能更明智。这基本上是 Redis Streams 实现死信概念的方式。

特殊的ID符号

-可能的最小id(0-1)
+可能的最大id(18446744073709551615-18446744073709551615),类似Integer.MAX_VALUE
$表示当前stream中最大的那个ID
>表示消费组中最后一个已发送的ID

持久化,复制和消息安全性

Stream数据结构像其他Redis数据结构一样异步地同步到副本节点以及持久化到AOF和RDB文件中。消费组的状态会保存在副本、AOF、RDB中。在重启后,通过AOF文件可恢复消费组的状态。

http://www.jsqmd.com/news/38700/

相关文章:

  • Java 线性表、栈、队列和优先队列
  • 2025/11/11
  • 植物大战僵尸修改器下载教程:图文详解与实用技巧
  • 微服务——注册中心
  • 【深度学习计算机视觉】13:实战Kaggle比赛:图像分类 (CIFAR-10) - 指南
  • fabricjs 整合 vue3-sketch-ruler 实现标尺功能
  • 2025年真空耙式干燥机定做厂家权威推荐榜单:真空单锥螺带干燥机/沸腾床干燥机/闪蒸干燥机源头厂家精选
  • 基础查找算法(三)二分查找
  • 2025年软像套电缆订做厂家权威推荐榜单:补偿电缆/矿物质电缆/电力电缆源头厂家精选
  • 2025年济南统招专升本学校权威推荐榜单:专升本机构报名/全日制专升本/专升本考试培训学校精选
  • 一些水题
  • (3)Bug篇 - 详解
  • 西林瓶灌装轧盖机:黔东南折旧年限与成本解析
  • list对象 集合 和 String 互转
  • 碎碎念(二四)
  • 高精度除法模板(p1480)
  • 如何完成一个简单的rust WebAssembly调用
  • 焊接工业机器人节气装置
  • 详细介绍:考研408--组成原理--day1
  • 深入解析:海尔 Haier Master 智能家居网关安装 Home Assistant 实践指南
  • 枣庄西林瓶灌装轧盖机:SIP灭菌快,自动冷却高效
  • 【Nano Banana超详细教程】AI绘图神器Gemini 2.5实测:一键生成神图!
  • 已完成今日基础缩索大学习
  • 配置ElactisSearch跨域
  • 西林瓶粉末灌装机:塔城培训服务免费提供
  • Ubuntu设置中文智能拼音输入法
  • 一份用pyhon生成word/wps文档的代码2
  • 200粉粉福
  • 【chrome】chrome浏览器OptGuideOnDeviceModel模型占用磁盘空间的解决方法!
  • 这样的算作“全栈技术”吗?