当前位置: 首页 > news >正文

Redis 入门与实践:从基础到 Stream 消息队列

目录

一、Redis 简介

二、Redis 核心特点

三、Redis 常用数据结构

3.1 String(字符串)

3.2 Hash(哈希)

3.3 List(列表)

3.4 Set(集合)

3.5 Sorted Set(有序集合)

四、Redis Stream:消息队列

4.1 基本命令

4.2 消费者组

4.3 Stream 与 List 的对比

五、Redis 在项目中的典型用法

5.1 架构示意

5.2 MAF 营销分析任务流

5.3 消费者组带来的好处

5.4 Redis Stream 请求分发规则

六、Redis 常用配置

七、Python 操作 Redis

7.1 Linux安装

7.2 Docker安装

7.3 连接Redis

7.4 基本使用

八、Redis 使用建议

总结


一、Redis 简介

Redis(Remote Dictionary Server)是一个基于内存的键值存储系统,支持多种数据结构,常被用作缓存、消息队列和会话存储。它提供高性能读写,并支持持久化、主从复制和集群。

二、Redis 核心特点

特点说明
内存存储数据主要在内存中,读写延迟低(通常微秒级)
持久化支持 RDB 快照和 AOF 日志,保证数据不丢失
多种数据结构String、List、Hash、Set、Sorted Set、Stream、Bitmap 等
单线程模型命令串行执行,避免锁竞争,保证原子性
主从复制支持读写分离和高可用
发布订阅支持 Pub/Sub 和 Stream 消息队列
事务支持 MULTI/EXEC 事务
Lua 脚本支持 Lua 脚本,保证原子性
集群支持 Redis Cluster 水平扩展

三、Redis 常用数据结构

3.1 String(字符串)

SET key value # 设置 GET key # 获取 INCR key # 自增 EXPIRE key seconds # 设置过期时间

3.2 Hash(哈希)

HSET user:1 name "张三" age 25 HGET user:1 name HGETALL user:1

3.3 List(列表)

LPUSH queue task1 # 左侧入队 RPOP queue # 右侧出队 LRANGE queue 0 -1 # 范围查询

3.4 Set(集合)

SADD tags "redis" "cache" SMEMBERS tags SISMEMBER tags "redis"

3.5 Sorted Set(有序集合)

ZADD rank 100 "user1" 95 "user2" ZRANGE rank 0 -1 WITHSCORES ZREVRANK rank "user1"

四、Redis Stream:消息队列

Redis 5.0 引入 Stream,用于实现消息队列,支持:

  • 消息持久化
  • 消费者组
  • 消息确认(ACK)
  • 历史消息回溯

4.1 基本命令

# 添加消息 XADD mystream * field1 value1 field2 value2 # 读取消息(从头) XREAD COUNT 10 STREAMS mystream 0 # 读取最新消息(阻塞) XREAD BLOCK 5000 STREAMS mystream $

4.2 消费者组

# 创建消费者组 XGROUP CREATE mystream mygroup 0 MKSTREAM # 消费消息 XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream > # 确认消息 XACK mystream mygroup message-id # 查看待确认消息 XPENDING mystream mygroup

4.3 Stream 与 List 的对比

特性ListStream
消息持久化
消费者组
消息确认
历史回溯有限支持
适用场景简单队列可靠消息队列

五、Redis 在项目中的典型用法

以银行场景下的 SOP 合规检测为例,Redis Stream 用于任务分发和结果回传。

5.1 架构示意

后端/前端 → XADD 推送任务 → Redis Stream

sop_engine 监听 XREADGROUP

处理完成后 XADD 推送结果

后端消费 message:xxx:results

5.2 MAF 营销分析任务流

# 1. 推送 MAF 任务 XADD message:maf:tasks * task_id "maf_001" \ conversation_id "maf_001" \ conversation_json_path "bucket/path/conversation.json" # 2. 引擎监听 message:maf:tasks,消费并分析 # 3. 引擎完成后写入结果流 XADD message:maf:results * task_id "maf_001" status "completed" \ result_path "maf-results-bucket/maf_001/analysis_result.json"

5.3 消费者组带来的好处

  • 多个 worker 共同消费,实现负载均衡
  • 每条消息只被组内一个消费者处理
  • 支持 ACK,失败可重试
  • 支持 PEL(Pending Entries List)查看未确认消息

5.4 Redis Stream 请求分发规则

如果两个项目使用了同一套Redis配置,那么无法保证Redis Stream 请求会由谁处理,取决于 Redis 消费者组的分发。

两个容器都连同一个 Redis,监听同一个 Stream(如 message:maf:tasks),且通常在同一消费者组(如 maf_analyze_group)里:

  • Redis 会把消息分发给组内的消费者
  • 每条消息只会被组内一个消费者处理
  • 具体是容器 1 还是容器 2,由 Redis 的分发策略决定,无法指定

因此:

  • 有时是A容器处理
  • 有时是B容器处理
  • 测试时无法稳定地“只让某个容器”处理请求

可能带来的问题:

场景说明
代码版本混用同一批任务,部分由旧代码处理,部分由新代码处理
测试不可控无法保证测试请求一定打到新容器
任务重复若消费者组配置不当,可能出现同一条消息被多个消费者处理

如何保证只使用新代码:

方案 1:停掉旧容器

# 停掉旧容器后再部署新容器 docker stop <old_container_id> docker run ... # 启动新容器

方案 2:用不同的 Stream 做测试

  • 新容器监听不同的 Stream,例如 message:maf:tasks_test
  • 测试时往 message:maf:tasks_test 发消息
  • 需要改配置或环境变量,让新容器使用 message:maf:tasks_test

方案 3:用不同的消费者组

  • 新容器使用不同的消费者组名,例如 maf_analyze_group_v2
  • 两个组都会收到同一条消息,各自处理一次
  • 适合做 A/B 或灰度,但会产生重复处理,需要业务上能接受

方案 4:只保留一个容器

  • 部署新容器前先停掉旧容器
  • 或使用滚动更新,保证同一时间只有一个版本在跑

六、Redis 常用配置

# 绑定地址 bind 0.0.0.0 # 端口 port 6379 # 密码 requirepass your_password # 最大内存 maxmemory 2gb maxmemory-policy allkeys-lru # 持久化 save 900 1 save 300 10 save 60 10000 appendonly yes

七、Python 操作 Redis

7.1 Linux安装

# Ubuntu/Debian sudo apt update sudo apt install redis-server # CentOS/RHEL sudo yum install redis # 启动服务 sudo systemctl start redis sudo systemctl enable redis # 验证 redis-cli ping # 返回 PONG 表示成功

7.2 Docker安装

docker run -d --name redis -p 6379:6379 redis:latest # 带密码 docker run -d --name redis -p 6379:6379 redis redis-server --requirepass yourpassword

7.3 连接Redis

# 本地连接 redis-cli # 带密码连接 redis-cli -a yourpassword # 远程连接 redis-cli -h host -p 6379 -a password

7.4 基本使用

import redis r = redis.Redis(host='localhost', port=6379, db=0, password='') # String r.set('name', 'Redis') print(r.get('name')) # Stream 添加消息 r.xadd('mystream', {'task_id': '001', 'data': 'hello'}) # Stream 消费 messages = r.xreadgroup('mygroup', 'consumer1', {'mystream': '>'}, count=1) for stream, msgs in messages: for msg_id, data in msgs: print(msg_id, data) r.xack('mystream', 'mygroup', msg_id)

八、Redis 使用建议

  1. 合理设置 maxmemory 和淘汰策略,避免 OOM。
  2. 生产环境开启 requirepass 和访问控制。
  3. 使用连接池,减少连接开销。
  4. 对热点 key 做拆分或本地缓存,减轻压力。
  5. 使用 Pipeline 批量执行命令,降低网络往返。
  6. Stream 场景下注意消费者组和 ACK,避免消息堆积和重复消费。

总结

Redis 适合做缓存、会话存储和消息队列。Stream 在需要可靠消费、负载均衡和消息确认的场景中,比简单 List 更合适。结合具体业务(如 MAF 任务流),可以设计出清晰、可扩展的异步处理架构。

http://www.jsqmd.com/news/512733/

相关文章:

  • 不用写代码!用Cherry Studio+Ollama打造行业专属GPT助手(含30+预置模板调参心得)
  • 产生式表示法
  • 2026年高口碑AIGC短剧制作出海服务商推荐榜单
  • 无线网Wi-Fi简介
  • STP 生成树协议课程课后总结
  • 第四篇:嵌入式系统常用通信接口详解(I2C、SPI、UART、RS232/485、CAN、USB)
  • 满载效率|D100运载无人机实测
  • 盒模型深度解剖:标准盒模型与怪异盒模型的区别
  • MySQL 的查询优化器如何选择执行计划?
  • 基于Python的黑龙江旅游景点数据分析系统的实现_flask+spider
  • ERP系统
  • 2026年国贤府PARK价格深度解析:价值锚点与市场前景研判 - 十大品牌推荐
  • Vue3 + vxe-table 实战:如何用工具栏模式实现ERP系统的列个性化记忆功能?
  • 天猫下单,门店换货;全渠道售后“此刻更丝滑”!商派Omni-OMS系统助力
  • 分析蛋糕裱花烘焙培训学校,太原欧米奇性价比高不高,值得选吗? - myqiye
  • 思科Nexus交换机 --- 华为CE6800 STP生成树对接故障
  • 2026白酒制造商排名出炉,雄盛橄榄酒以特色工艺和服务性价比入选 - mypinpai
  • Grid网格布局从入门到精通:像大师一样布局
  • 探讨适合家居行业的AIGEO搜索优化品牌如何选择 - 工业设备
  • 从零开始:使用ArcGIS系列工具高效生成TPK与mmpk离线地图包
  • Python 开发“设计模式”指南
  • 设计旅途之照明篇(四)——照明系统图
  • 在国产替代中如何选择可靠连接器?2026年针对赫斯曼与Lumberg插头等三款主流产品的专业评测 - 速递信息
  • DS18B20 单总线(1-Wire)协议:UART 模拟篇
  • 2026年南京口碑好的日立空调售后服务推荐,专业维修与保养全解析 - 工业品网
  • GPT-5.4降价血战:mini当老大,nano做小弟,独立开发者的省钱攻略
  • 基于博途1200PLC+HMI的‘大小球分拣控制系统仿真‘工程
  • 探讨日立空调售后靠谱吗,张尤达全品牌服务有保障 - 工业品牌热点
  • 本地部署openclaw
  • 响应式设计的核心:深入理解CSS媒体查询