当前位置: 首页 > news >正文

高效异步处理:基于RocketMQ的消费系统架构全解析

基于 RocketMQ 的考勤统计批量处理系统核心实现

基于 RocketMQ 的考勤统计批量处理系统核心实现

一、消息实体定义

二、生产者服务实现

三、消费者监听器实现

四、线程池配置

五、业务服务实现

六、配置文件

七、Maven依赖

八、关键设计要点

8.1 生产者发送策略

8.2 消费者处理策略

8.3 配置优化

一、消息实体定义

package jnpf.model.attendance.event;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.List;

/**
 * Data transfer object carried in the RocketMQ message for batch
 * attendance-statistics processing.
 *
 * <p>Serialized to/from JSON by the producer and consumer; all four fields
 * are required and are validated again on the consumer side before use.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AttendanceStatisticsBatchDto {

    /** Tenant identifier the statistics run belongs to. */
    @NotBlank(message = "租户Id不能为空")
    private String tenantId;

    /** Attendance-group identifier. */
    @NotBlank(message = "考勤组Id不能为空")
    private String groupId;

    /** IDs of the users whose daily statistics should be (re)computed. */
    @NotEmpty(message = "用户Id集合不能为空")
    private List<String> userIdList;

    /** The calendar day to compute statistics for. */
    @NotNull(message = "日期不能为空")
    private Date day;
}

二、生产者服务实现

package jnpf.attendance.service.impl; import jnpf.constants.MessageTopicConstants; import jnpf.model.attendance.event.AttendanceStatisticsBatchDto; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import javax.annotation.Resource; import com.alibaba.fastjson.JSON; /** * 考勤统计消息生产者服务实现 */ @Slf4j @Service public class AttendanceStatisticsProducerServiceImpl { @Resource private RocketMQTemplate rocketMqTemplate; /** * 同步发送批量统计消息 * * @param dto 统计参数 * @return 发送结果 */ public SendResult sendBatchStatisticsMessage(AttendanceStatisticsBatchDto dto) { String messageStr = JSON.toJSONString(dto); Message<String> message = MessageBuilder.withPayload(messageStr) .setHeader("tenantId", dto.getTenantId()) .setHeader("msgType", "ATTENDANCE_STATISTICS_BATCH") .setHeader("sendTime", System.currentTimeMillis()) .build(); log.info("发送考勤统计批量消息: tenantId={}, groupId={}, 用户数={}, 日期={}", dto.getTenantId(), dto.getGroupId(), dto.getUserIdList().size(), dto.getDay()); return rocketMqTemplate.syncSend(MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_TOPIC, message); } /** * 异步发送批量统计消息 * * @param dto 统计参数 */ public void sendBatchStatisticsMessageAsync(AttendanceStatisticsBatchDto dto) { String messageStr = JSON.toJSONString(dto); Message<String> message = MessageBuilder.withPayload(messageStr) .setHeader("tenantId", dto.getTenantId()) .setHeader("msgType", "ATTENDANCE_STATISTICS_BATCH_ASYNC") .setHeader("sendTime", System.currentTimeMillis()) .build(); log.info("异步发送考勤统计批量消息: tenantId={}, 用户数={}", dto.getTenantId(), dto.getUserIdList().size()); rocketMqTemplate.asyncSend(MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() { @Override public void onSuccess(SendResult sendResult) 
{ log.info("异步消息发送成功: msgId={}", sendResult.getMsgId()); } @Override public void onException(Throwable throwable) { log.error("异步消息发送失败: tenantId={}, error={}", dto.getTenantId(), throwable.getMessage()); } }); } /** * 发送单向消息(不关心发送结果) * * @param dto 统计参数 */ public void sendBatchStatisticsMessageOneWay(AttendanceStatisticsBatchDto dto) { String messageStr = JSON.toJSONString(dto); Message<String> message = MessageBuilder.withPayload(messageStr) .setHeader("tenantId", dto.getTenantId()) .setHeader("msgType", "ATTENDANCE_STATISTICS_BATCH_ONEWAY") .setHeader("sendTime", System.currentTimeMillis()) .build(); log.debug("单向发送考勤统计批量消息: tenantId={}, 用户数={}", dto.getTenantId(), dto.getUserIdList().size()); rocketMqTemplate.sendOneWay(MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_TOPIC, message); } /** * 发送带延迟的批量统计消息 * * @param dto 统计参数 * @param delayLevel 延迟级别 * @return 发送结果 */ public SendResult sendBatchStatisticsMessageWithDelay(AttendanceStatisticsBatchDto dto, int delayLevel) { String messageStr = JSON.toJSONString(dto); Message<String> message = MessageBuilder.withPayload(messageStr) .setHeader("tenantId", dto.getTenantId()) .setHeader("msgType", "ATTENDANCE_STATISTICS_BATCH_DELAY") .setHeader("sendTime", System.currentTimeMillis()) .build(); log.info("发送延迟考勤统计批量消息: tenantId={}, delayLevel={}, 用户数={}", dto.getTenantId(), delayLevel, dto.getUserIdList().size()); return rocketMqTemplate.syncSend(MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_TOPIC, message, 3000, delayLevel); } }

三、消费者监听器实现

package jnpf.attendance.event; import cn.hutool.core.collection.CollUtil; import cn.hutool.json.JSONUtil; import jnpf.attendance.service.AttendanceDayStatisticsService; import jnpf.constants.MessageTopicConstants; import jnpf.model.attendance.event.AttendanceStatisticsBatchDto; import jnpf.util.StringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 考勤统计批量消息消费者 */ @Slf4j @Component @RocketMQMessageListener( topic = MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_TOPIC, consumerGroup = MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_CONSUMER_GROUP, consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadNumber = 2, replyTimeout = 600000, maxReconsumeTimes = 1) public class StatisticsBatchMQListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Resource(name = "ioIntensiveThreadPool") private ThreadPoolExecutor threadPoolExecutor; @Resource private AttendanceDayStatisticsService attendanceDayStatisticsService; @Override public void onMessage(String message) { log.info("接收到批量生成用户日统计消息,参数为:{}", message); // 1. 空消息检查 if (StringUtil.isEmpty(message)) { log.warn("接收到空消息,跳过处理"); return; } // 2. JSON反序列化 AttendanceStatisticsBatchDto batchDto; try { batchDto = JSONUtil.toBean(message, AttendanceStatisticsBatchDto.class); } catch (Exception e) { log.error("消息反序列化失败,消息内容: {}", message, e); return; } // 3. 
参数完整性验证 if (Objects.isNull(batchDto) || CollUtil.isEmpty(batchDto.getUserIdList()) || StringUtil.isEmpty(batchDto.getTenantId()) || StringUtil.isEmpty(batchDto.getGroupId()) || Objects.isNull(batchDto.getDay())) { log.error("消费批量生成用户日统计消息失败,消息格式不正确,消息:{}", message); return; } log.info("开始处理考勤统计批量消息: tenantId={}, groupId={}, 用户数={}, 日期={}", batchDto.getTenantId(), batchDto.getGroupId(), batchDto.getUserIdList().size(), batchDto.getDay()); // 4. 异步延迟执行 CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS, threadPoolExecutor).execute(() -> { try { // 5. 调用业务服务执行统计计算 attendanceDayStatisticsService.batchStatisticChange(batchDto); log.info("考勤统计批量处理完成: tenantId={}, 用户数={}", batchDto.getTenantId(), batchDto.getUserIdList().size()); } catch (Exception ex) { log.error("批量生成用户日统计消息消费失败,参数:{} 错误:{}", message, ex.getMessage(), ex); throw new RuntimeException("批量生成用户日统计消息消费失败,触发重试", ex); } }); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 优化消费参数 consumer.setPullBatchSize(16); // 每次拉取的消息数量 consumer.setConsumeMessageBatchMaxSize(1); // 每次批量消费的消息数 consumer.setPullInterval(3000); // 拉取间隔3秒 consumer.setConsumeTimeout(10 * 60 * 1000); // 消费超时10分钟 // 设置消费起始位置(从上次消费的位置继续) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 调整消费线程池 consumer.setConsumeThreadMin(2); consumer.setConsumeThreadMax(4); log.info("RocketMQ消费者初始化完成: topic={}, group={}", MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_TOPIC, MessageTopicConstants.ATTENDANCE_STATISTICS_BATCH_CONSUMER_GROUP); } }

四、线程池配置

package jnpf.attendance.bean; import org.jetbrains.annotations.NotNull; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 线程池配置类 * 根据任务类型(CPU密集、IO密集、混合)创建合适的线程池。 */ @Configuration public class FtbThreadPoolExecutor { // 队列边界 private static final int QUEUE_CAPACITY = 200; // 线程存活时间 s(秒) private static final long KEEP_ALIVE_TIME = 15L; private static final String THREAD_NAME_PREFIX = "FTB服务-考勤模块"; @Bean(name = "attendanceRuleThreadPool", destroyMethod = "shutdown") public ThreadPoolExecutor attendanceRuleThreadPool() { return createNamedThreadPool("attendance-rule", TaskType.IO_INTENSIVE); } @Bean(name = "cpuIntensiveThreadPool", destroyMethod = "shutdown") public ThreadPoolExecutor cpuIntensiveThreadPool() { return createThreadPool(TaskType.CPU_INTENSIVE); } @Bean(name = "ioIntensiveThreadPool", destroyMethod = "shutdown") public ThreadPoolExecutor ioIntensiveThreadPool() { return createThreadPool(TaskType.IO_INTENSIVE); } @Bean(name = "mixedThreadPool", destroyMethod = "shutdown") public ThreadPoolExecutor mixedThreadPool() { return createThreadPool(TaskType.MIXED); } private ThreadPoolExecutor createNamedThreadPool(String name, TaskType type) { int actualProcessors = Math.min(Runtime.getRuntime().availableProcessors(), 42); int runTime, cpuTime; switch (type) { case CPU_INTENSIVE: runTime = 10; cpuTime = 1; break; case IO_INTENSIVE: runTime = 5; cpuTime = 1; break; case MIXED: default: runTime = 50; cpuTime = 1; break; } int maxPoolSize = calculateMaxPoolSize(actualProcessors, runTime, cpuTime, type); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); return new ThreadPoolExecutor( actualProcessors, maxPoolSize, KEEP_ALIVE_TIME, TimeUnit.SECONDS, queue, new ThreadFactory() { private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); private final AtomicInteger count = new 
AtomicInteger(1); @Override public Thread newThread(@NotNull Runnable r) { Thread thread = defaultFactory.newThread(r); thread.setName("FTB服务-考勤模块-" + name + "-thread-" + count.getAndIncrement()); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy() ); } private ThreadPoolExecutor createThreadPool(TaskType type) { // 限制为实际核心数 int actualProcessors = Math.min(Runtime.getRuntime().availableProcessors(), 42); int runTime, cpuTime; switch (type) { case CPU_INTENSIVE: runTime = 10; cpuTime = 1; break; case IO_INTENSIVE: runTime = 5; cpuTime = 1; break; case MIXED: default: runTime = 50; cpuTime = 1; break; } int maxPoolSize = calculateMaxPoolSize(actualProcessors, runTime, cpuTime, type); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); return new ThreadPoolExecutor( actualProcessors, maxPoolSize, KEEP_ALIVE_TIME, TimeUnit.SECONDS, queue, new ThreadFactory() { private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); private final AtomicInteger count = new AtomicInteger(1); @Override public Thread newThread(@NotNull Runnable r) { Thread thread = defaultFactory.newThread(r); thread.setName(THREAD_NAME_PREFIX + "-" + type.name().toLowerCase() + "-thread-" + count.getAndIncrement()); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy() ); } /** * 根据公式:最佳线程数 = N * (1 + (WT / ST)) * 其中 WT = runTime - cpuTime,即线程等待时间 * * @param availableProcessors CPU核心数 * @param runTime 线程总运行时间(单位可自定,如毫秒) * @param cpuTime 线程CPU计算时间 * @return 最佳线程池大小 */ private int calculateMaxPoolSize(int availableProcessors, int runTime, int cpuTime, TaskType type) { if (cpuTime <= 0) { throw new IllegalArgumentException("参数不合法:cpuTime 应大于 0"); } if (type == TaskType.IO_INTENSIVE) { // I/O 密集型:4 倍核心数,上限 200 return Math.min(availableProcessors * 4, 200); } else if (type == TaskType.CPU_INTENSIVE) { // CPU 密集型:等于核心数 return availableProcessors; } else { // 混合型 long ratio = 1L + (long) runTime / cpuTime; long calculated = (long) availableProcessors 
* ratio; return (int) Math.min(calculated, 200); } } public enum TaskType { CPU_INTENSIVE, IO_INTENSIVE, MIXED } }

五、业务服务实现

java package jnpf.attendance.service.impl; import jnpf.model.attendance.event.AttendanceStatisticsBatchDto; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.Date; import java.util.List; /** * 考勤日统计服务实现 */ @Slf4j @Service public class AttendanceDayStatisticsServiceImpl { /** * 处理批量统计消息 * * @param batchDto 统计参数 */ @Transactional(rollbackFor = Exception.class) public void batchStatisticChange(AttendanceStatisticsBatchDto batchDto) { log.info("开始执行考勤统计批量处理: tenantId={}, 用户数={}", batchDto.getTenantId(), batchDto.getUserIdList().size()); // 这里实现实际的考勤统计逻辑 for (String userId : batchDto.getUserIdList()) { // 执行单个用户的考勤统计 calculateUserAttendance(batchDto.getTenantId(), userId, batchDto.getDay(), batchDto.getGroupId()); } log.info("考勤统计批量处理完成: tenantId={}, 用户数={}", batchDto.getTenantId(), batchDto.getUserIdList().size()); } /** * 计算单个用户考勤统计 */ private void calculateUserAttendance(String tenantId, String userId, Date day, String groupId) { // 实现具体的考勤统计计算逻辑 // 1. 查询用户当天的考勤记录 // 2. 根据考勤规则计算工时、迟到、早退等 // 3. 保存统计结果到数据库 } }

六、配置文件

application.yml:

rocketmq:
  name-server: ${ROCKETMQ_NAME_SERVER:192.168.3.24:30094}
  producer:
    group: ${ROCKETMQ_PRODUCER_GROUP:fantaibao-producer-group}
    send-message-timeout: ${ROCKETMQ_SEND_MESSAGE_TIMEOUT:30000}
    max-message-size: ${ROCKETMQ_MAX_MESSAGE_SIZE:8388608}

消息主题常量:

package jnpf.constants;

/**
 * Message topic and consumer-group name constants shared by the
 * attendance-statistics producer and consumer.
 */
public interface MessageTopicConstants {
    // Batch generation of attendance daily statistics
    String ATTENDANCE_STATISTICS_BATCH_TOPIC = "attendance-statistics-batch-topic";
    String ATTENDANCE_STATISTICS_BATCH_CONSUMER_GROUP = "attendance-statistics-batch-consumer-group";
}

七、Maven依赖

<!-- 注:文中代码使用的 RocketMQTemplate / @RocketMQMessageListener 来自
     rocketmq-spring-boot-starter,而非 Spring Cloud Stream 的 binder,
     因此依赖应为 rocketmq-spring-boot-starter。 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.32</version>
</dependency>

八、关键设计要点

8.1 生产者发送策略

  1. 同步发送:需要确认发送结果的场景

  2. 异步发送:高性能场景,不阻塞主线程

  3. 单向发送:日志记录等不关心结果的场景

  4. 延迟发送:定时触发或削峰填谷

8.2 消费者处理策略

  1. 异步处理:使用线程池异步执行,避免阻塞消费线程

  2. 延迟执行:2秒延迟实现削峰填谷

  3. 参数校验:三层校验确保消息有效性

  4. 异常处理:异常抛出触发重试机制

8.3 配置优化

  1. 消费线程数:根据CPU核心数合理设置

  2. 拉取参数:平衡拉取频率和消息堆积

  3. 超时设置:根据业务处理时间设置合理超时

  4. 重试策略:设置最大重试次数避免无限重试

这个实现提供了完整的基于RocketMQ的考勤统计批量处理方案,包含生产者、消费者、线程池配置和业务服务实现,可以直接在项目中集成使用。

http://www.jsqmd.com/news/370760/

相关文章:

  • 完整教程:Kafka分布式流处理平台简介
  • ⭐⭐⭐⭐⭐满分推荐!万伯双膜储气柜稳居行业领先的核心原因
  • 2026除湿机选购必看:从家用静音到工业防爆,这份榜单帮你精准匹配 - 深度智识库
  • 【C++创新实践】我发明的伪关键字控制编程思想,用空结构体+重载简化语义化开发
  • 探索随机森林:降维、特征选择与重要性排序
  • ‌智慧迎新系统让开学报到更轻松,告别排队烦恼!
  • 无人值守的停车系统的设计与实现
  • CSS+SVG实现御坂美琴主题电流边框卡片(附完整源码)
  • 基于51单片机的停车场车位管理系统设计与实现
  • perf火焰图-2-内核文档翻译 - Hello
  • 2026年重庆汽车租赁厂家权威榜单 口碑优质实力强劲 适配多场景出行需求 - 深度智识库
  • 基于PLC控制技术的智能车库管理系统设计
  • 镇江代运营服务商测评:头部企业实力排行 - 野榜数据排行
  • 2025年正规的土耳其移民中介推荐TOP3排行榜 - 行业观察日记
  • MySQL 大小写敏感配置全解析:lower_case_table_names 与 collation 详解
  • 汽车LED前照灯自动切换系统的研究与实现
  • 《我把回家过年做成一个项目:软考高项知识全景实践》
  • 元保超医保冠名《你好,急诊医生》!用科技赋能保险,让每个家庭都拥有保障 - 包罗万闻
  • 元保“守护保百万重疾险”:普惠赋能,筑牢重疾保障屏障 - 包罗万闻
  • 先建“语义基座”,再谈运维智能!阿里云以 Operation Intelligence 定义 AIOps 新范式
  • 2026儿童防晒就看这篇测评指南:这些优质单品值得闭眼入 - 速递信息
  • 元保“守护保百万重疾险”:靠谱实用的安心保障之选 - 包罗万闻
  • 2026年国内公职类面试机构深度解析:聚焦师资与实效的理性选择 - 深度智识库
  • 元保保险电话号码:覆盖全周期保险服务的价值载体 - 包罗万闻
  • 2026全国优秀家装设计师盘点 装修/室内/别墅设计领域标杆介绍 - 深度智识库
  • 2026年1月国产数据库大事记:国开行2822万采购Gbase,浙商银行930万采购GoldenDB,墨天轮发布“2025年度数据库”……
  • 富滇银行基于 OceanBase 实现从TP到HTAP,百年“老字号”炼就数字引擎
  • 2026深圳美国本科留学中介哪家好?高端美本留学申请定制服务机构推荐 - 品牌2025
  • 2026年不锈钢黑棒优选厂商,品质之选助力项目升级,不锈钢薄壁板/不锈钢毛细管/2205不锈钢板,不锈钢黑棒现货批发推荐 - 品牌推荐师
  • 开年测评:这家做得好的墙面艺术漆厂家产品亮点多不多?诺兰迪艺术漆/艺术肌理漆/艺术漆,墙面艺术漆品牌有哪些 - 品牌推荐师