当前位置: 首页 > news >正文

JAVA-实战8 Redis实战项目—雷神点评(7)Redis消息队列实现异步秒杀

夢の中で描いた絵のようなんだ

Redis消息队列实现异步秒杀

需求:

创建一个Stream类型的消息队列
修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向消息队列中添加消息
项目启动时,开启一个线程任务,尝试获取消息队列中的消息,完成下单

创建消息队列

XGROUP CREATE SecondKillMessageQueue SecondKillGroup 0 MKSTREAM

image
image

修改Lua脚本

修改Lua脚本如下:

local VoucherId = ARGV[1]
local UserId = ARGV[2]
-- 新增:传递订单Id
local VoucherOrderId = ARGV[3]local StockKey = 'seckill:stock:' .. VoucherId
local OrderKey = 'seckill:order:' .. VoucherIdif(tonumber(redis.call('get',StockKey)) <=0) thenreturn 1
endif(redis.call('sismember',OrderKey,UserId) ==1) thenreturn 2
endredis.call('incrby',StockKey,-1)
redis.call('sadd',OrderKey,UserId)
-- 新增:Redis向消息队列中添加消息,其中键值对的字段名要和数据实体类的属性名一致,从而便于接下来的映射
redis.call('xadd','SecondKillMessageQueue','*','UserId',UserId,'VoucherId',VoucherId,'VoucherOrderId',VoucherOrderId);
return 0;

修改线程任务和消息队列读取

我们不再需要声明成员变量来设置阻塞队列,而是通过stringRedisTemplate.opsForStream().read()方法来直接读取Stream结构的消息队列来获取异步消息

private static final ExecutorService SECONDKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct
private void init() {SECONDKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}private class VoucherOrderHandler implements Runnable {private final String MessageQueueName = "SecondKillMessageQueue";private final String GroupName = "SecondKillGroup";private final String ConsumerName = "SecondKillConsumer";@Overridepublic void run() {while(true) {try {List<MapRecord<String,Object,Object>> VoucherOrderRecordList = stringRedisTemplate.opsForStream().read(// 三行分别是:设置消费者组名和消费者名,设置单次读取最大量和最长等待时间,设置消息队列名称和获取消息起始IdConsumer.from(GroupName,ConsumerName),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2L)),StreamOffset.create(MessageQueueName, ReadOffset.lastConsumed()));if(VoucherOrderRecordList == null || VoucherOrderRecordList.isEmpty()) {continue;}// 提取消息Id和消息内容(键值对格式)MapRecord<String,Object,Object> VoucherOrderRecord = VoucherOrderRecordList.get(0);RecordId VoucherOrderRecordId = VoucherOrderRecord.getId();Map<Object,Object> VoucherOrderRecordValue = VoucherOrderRecord.getValue();// 设置Map键字段名到Bean属性名的对应关系CopyOptions copyOptions = CopyOptions.create().setFieldMapping(MapUtil.builder("VoucherOrderId", "id")   // Map键 -> Bean属性   .put("UserId", "userId").put("VoucherId", "voucherId").build());VoucherOrderData NewVoucherOrder =  BeanUtil.fillBeanWithMap(VoucherOrderRecordValue, new VoucherOrderData(), copyOptions);// 执行数据库相关操作AddVoucherOrderFromQueueToDatabase(NewVoucherOrder);// 执行完成后确认消息,传递消息队列名称、消费者组名称、消息IDstringRedisTemplate.opsForStream().acknowledge(MessageQueueName,ConsumerName,VoucherOrderRecordId);} catch (Exception e) {log.error("Calc Order Message Failed:",e);// 出现没有被确认的消息,即Pending,进行处理HandlePendingList();}}}// 除PendingList为空处理和最终异常处理不同外,其余均和正常处理相同private void HandlePendingList(){while(true) {try {List<MapRecord<String,Object,Object>> VoucherOrderRecordList = stringRedisTemplate.opsForStream().read(Consumer.from("SecondKillGroup","SecondKillConsumer"),StreamReadOptions.empty().count(1),StreamOffset.create(MessageQueueName, ReadOffset.from("0")));if(VoucherOrderRecordList == null || VoucherOrderRecordList.isEmpty()) {// 失败 表示PendingList无异常break;}MapRecord<String,Object,Object> VoucherOrderRecord = VoucherOrderRecordList.get(0);RecordId VoucherOrderRecordId = VoucherOrderRecord.getId();Map<Object,Object> VoucherOrderRecordValue = VoucherOrderRecord.getValue();CopyOptions copyOptions = CopyOptions.create().setFieldMapping(MapUtil.builder("VoucherOrderId", "id")   // Map键 -> Bean属性.put("UserId", "userId").put("VoucherId", "voucherId").build());VoucherOrderData NewVoucherOrder =  BeanUtil.fillBeanWithMap(VoucherOrderRecordValue, new VoucherOrderData(), copyOptions);AddVoucherOrderFromQueueToDatabase(NewVoucherOrder);stringRedisTemplate.opsForStream().acknowledge(MessageQueueName,"SecondKillConsumer",VoucherOrderRecordId);} catch (Exception e) {log.error("Calc Order PendingList Message Failed:",e);}}}
}

实现过程中,值得注意的点:

stringRedisTemplate.opsForStream().read()方法对应XREAD操作,而获取消息起始Id由ReadOffset.lastConsumed()设置,其源代码执行为:return new ReadOffset(">");,也就是返回>,从下一个未消费的消息开始

MapRecord是SpringDataRedis‌框架中定义的一个Java接口,用于在Java应用程序中表示Redis.Stream中的一条消息记录,其底层数据结构对应Redis.Stream中的Field-ValuePairs(字段-值对)

其接口签名为public interface MapRecord<S, K, V> extends Record<S, Map<K, V>>, Iterable<Map.Entry<K, V>>

S (Stream):Stream 的名称(即Redis中的Key,例如SecondKillMessageQueue)。
K (Key):消息中字段的类型(通常为String或Object)。
V (Value):消息中值的类型(通常为String或Object)。

MapRecord和Redis.Stream的对应关系

MapRecord.getStream()获取StreamKey,也就是Stream名称、Redis键名,例如SecondKillMessageQueue
MapRecord.getId()获取消息唯一标识,也就是消息ID,例如1778089608104-0
MapRecord.getValue()获取消息内容,即XADD发送的键值对

调用BeanUtil.fillBeanWithMap()方法,将Stream的Value(Map类型)映射到下单记录数据实体NewVoucherOrderVoucherOrderData.class类型)时,需保证前者的字段名和后者的属性名严格匹配,相关手段如下:

①确保字段名严格一致
②可以使用CopyOptions.create().setFieldMapping()方法封装MapBuilderMapBuilder使用MapUtil.builder(MapKeyName,ClassPropertyName).put(MapKeyName2,ClassPropertyName2)....build()方法来构建)来构建CopyOptions从而配置字段映射
③使用忽略大小写的转换方法BeanUtil.fillBeanWithMapIgnoreCase()


测试效果如下:

首先查看Redis和数据库状态:
image
image
image
image

秒杀10号优惠券
image

秒杀11号优惠券:
image

以10号为例,下单后Redis状态如下:
image
image
image

下单后数据库状态如下:
image
image

http://www.jsqmd.com/news/767354/

相关文章:

  • 3分钟快速破解Navicat密码:开源解密工具完整教程
  • ToRA:代码即推理,大语言模型数学解题新范式
  • 8 claude code的记忆系统-无向量数据库的轻量级智能
  • Nuvoton MG51系列8位8051微控制器解析与应用
  • “灰度图”到底是什么,以及它是如何与RGB原图联系起来
  • 用TensorFlow和PyTorch搞定视频动作识别:手把手教你搭建时空卷积网络(附完整代码)
  • 用Typst构建可编程简历:告别Word与LaTeX的排版新方案
  • Android WorkManager 全面讲解
  • AISMM模型不是万能钥匙?3类不可替代的传统规则引擎场景+混合架构设计图(附2024年金融AI模型淘汰预警清单)
  • R语言AI编程助手gpttools:无缝集成GPT能力,提升数据分析与开发效率
  • 秋天的第一顿大闸蟹,配什么酒才叫绝搭?
  • SQL 第二篇:表结构设计(为什么企业要拆成 3 张表)
  • 5分钟精通明日方舟基建全自动管理:告别繁琐手操,提升效率300%
  • 开源ChatGPT克隆项目实战:架构解析与私有化部署指南
  • 企业内部考试:题库治理比出题更重要
  • 基于DHCPv6的PC自动获取IP地址
  • 高效图片去重清理:AntiDupl.NET开源工具全面指南
  • 2026年智能化的自动去毛刺可靠供应商推荐 - 行业平台推荐
  • 终极指南:5分钟成为Switch游戏文件管理专家
  • 【研报A94】2026年智能原生研究报告:头部底座赋能,垂直场景深耕的新格局
  • 2026年知名宣传片制作公司实力盘点:谁是行业翘楚?
  • ARM Cortex-R82处理器架构与RAS机制详解
  • 基于Alpine的adhocore/phpfpm Docker镜像:生产环境PHP部署优化实践
  • Expo 快速上手
  • Google与英伟达下注!4个月估值40亿,Recursive自学习AI能否改写研究范式?
  • 国外 VPS 账号两步验证 2FA 丢失怎么找回
  • Intel两项关键人事任命:Alex Katouzian、Pushkar Ranade助力客户端计算与物理AI突破
  • 从“能用”到“好用”:优化EasyExcel导入体验,我做了这3件事(含性能考量)
  • C语言学习笔记 - 24.C编程预知识 - 常量以什么样的二进制代码存储在计算机中
  • Ollama桥接器:实现本地大模型与AI应用无缝对接的协议转换方案