当前位置: 首页 > news >正文

多线程并发处理样例

多线程并发处理样例目录

  • 优化说明
    • 伪代码
    • 设计思路
    • 关键优势总结
    • 样例

优化说明

1、动态核心线程数 (calculateCorePoolSize):

  • 数据量 ≤ 1000:2 倍 CPU 核心数
  • 数据量 ≤ 5000:4 倍 CPU 核心数
  • 数据量 ≤ 10000:6 倍 CPU 核心数
  • 数据量 ≤ 50000:8 倍 CPU 核心数
  • 数据量 > 50000:12 倍 CPU 核心数

2、动态批次大小 (calculateBatchSize):

  • 数据量越小,批次越大(减少批次数量)
  • 数据量越大,批次越小(增加并发度)
  • 从 1000 到 100 动态调整

3、增强的日志监控:

  • 记录线程池参数配置
  • 记录每批次的处理情况和耗时
  • 记录总体处理数据量

这样优化后,当数据量增加时,线程数会增加,批次会变小,从而提升并发处理量。

伪代码

// 1. 生成日志编号,用于追踪整个处理过程StringnowLogNo=IdUtil.randomUUID().replace("-","");// 2. 查询需要扣款的数据(待支付 + 失败 + 部分成功状态)List<实体java类>list=iMemberDeductionPlanIntervalService.lambdaQuery().and(r->r.eq(...,WAIT_PAY).or().eq(...,FAIL).or().eq(...,PARTIAL_SUCCESS)).list();// 3. 如果数据不为空,开始批量处理if(CollectionUtils.isNotEmpty(list)){// 4. 动态计算并发参数(核心优化点)inttotalSize=list.size();// 总数据量intbaseCores=CPU核心数;intcorePoolSize=calculateCorePoolSize(totalSize,baseCores);// 动态计算线程数intbatchSize=calculateBatchSize(totalSize,corePoolSize);// 动态计算批次大小// 5. 创建线程池ExecutorServiceexecutorService=newThreadPoolExecutor(...)// 6. 分批处理数据for(每个批次){CompletableFuture.runAsync(()->{batch.forEach(this::processPlanPaysAsync);// 处理每条数据},executorService);}// 7. 等待所有批次完成CompletableFuture.allOf(futures.toArray(...)).join();// 8. 关闭线程池executorService.shutdown();}privateintcalculateCorePoolSize(inttotalSize,intbaseCores){if(totalSize<=1000){returnbaseCores*2;}elseif(totalSize<=5000){returnbaseCores*4;}elseif(totalSize<=10000){returnbaseCores*6;}elseif(totalSize<=50000){returnbaseCores*8;}else{returnbaseCores*12;}}privateintcalculateBatchSize(inttotalSize,intcorePoolSize){if(totalSize<=1000){return1000;}elseif(totalSize<=5000){return500;}elseif(totalSize<=10000){return300;}elseif(totalSize<=50000){return200;}else{return100;}}

设计思路

1、calculateCorePoolSize(int totalSize, int baseCores) - 动态计算核心线程数

数据量 ≤1000→ 线程数=CPU核心数 ×2数据量 ≤5000→ 线程数=CPU核心数 ×4数据量 ≤10000→ 线程数=CPU核心数 ×6数据量 ≤50000→ 线程数=CPU核心数 ×8数据量>50000→ 线程数=CPU核心数 ×12
  • 数据量少时,使用较少线程,避免资源浪费
  • 数据量大时,增加线程数,提高并发处理能力
  • 假设 CPU 是 8 核,那么:
    • 1000 条数据 → 16 个线程
    • 50000 条数据 → 96 个线程

2、calculateBatchSize(int totalSize, int corePoolSize) - 动态计算批次大小

数据量 ≤1000→ 每批1000条(1批处理完) 数据量 ≤5000→ 每批500条(最多10批) 数据量 ≤10000→ 每批300条(最多34批) 数据量 ≤50000→ 每批200条(最多250批) 数据量>50000→ 每批100条(最多500+批)
  • 数据量小 → 大批次,减少分批次数量
  • 数据量大 → 小批次,增加并行度,避免单批次处理时间过长
  • 配合动态线程数,实现"数据量越大,并发越高"

3、选择 CallerRunsPolicy

  • 线程池配置选择
newThreadPoolExecutor(corePoolSize,// 核心线程数(动态计算)maxPoolSize,// 最大线程数 = 核心×2(动态)60L,TimeUnit.SECONDS,// 空闲线程存活时间newLinkedBlockingQueue<>(queueCapacity),// 有界队列(防止内存溢出)newThreadPoolExecutor.CallerRunsPolicy()// 拒绝策略:调用者运行)
  • 当线程池和队列都满时,由调用者线程执行任务
  • 不会丢失任务,也不会抛出异常
  • 自动降低提交速度,起到背压(back pressure)作用

关键优势总结

✅ 自适应并发:数据量越大,线程数越多,并发越高
✅ 内存保护:使用有界队列,防止 OOM
✅ 细粒度监控:每批次都有日志追踪
✅ 优雅关闭:确保所有任务完成后再关闭线程池
✅ 容错机制:单个批次失败不影响其他批次

样例

@AsyncpublicvoidmemberDeductionPlanPaysAsync(){StringnowLogNo=IdUtil.randomUUID().replace("-","");// 查看处理时间日志编号// 执行批扣数据List<MemberPlanInterval>list=iMemberPlanIntervalService.lambdaQuery().and(r->r.eq(MemberPlanInterval::getStatus,PayChannelConst.orderStatus.WAIT_PAY).or().eq(MemberPlanInterval::getStatus,PayChannelConst.orderStatus.FAIL).or().eq(MemberPlanInterval::getStatus,PayChannelConst.orderStatus.PARTIAL_SUCCESS)).list();if(CollectionUtils.isNotEmpty(list)){logger.info("日志编号:{},开始执行扣款计划 - 定时任务批扣触发(待扣数据 + 历史失败数据 + 路由模式),执行扣款开始时间:{}",nowLogNo,DateUtil.getDateNow());inttotalSize=list.size();intbaseCores=Runtime.getRuntime().availableProcessors();// 根据数据量动态计算并发参数intcorePoolSize=calculateCorePoolSize(totalSize,baseCores);intmaxPoolSize=corePoolSize*2;intbatchSize=calculateBatchSize(totalSize,corePoolSize);intqueueCapacity=batchSize*2;logger.info("日志编号:{},数据总量:{},核心线程数:{},最大线程数:{},批次大小:{},队列容量:{}",nowLogNo,totalSize,corePoolSize,maxPoolSize,batchSize,queueCapacity);ExecutorServiceexecutorService=newThreadPoolExecutor(corePoolSize,maxPoolSize,60L,TimeUnit.SECONDS,newLinkedBlockingQueue<>(queueCapacity),newThreadPoolExecutor.CallerRunsPolicy());List<CompletableFuture<Void>>futures=newArrayList<>();inttotalBatches=(int)Math.ceil((double)totalSize/batchSize);logger.info("日志编号:{},总批次数:{}",nowLogNo,totalBatches);for(inti=0;i<totalBatches;i++){intstartIndex=i*batchSize;intendIndex=Math.min(startIndex+batchSize,totalSize);List<MemberPlanInterval>batch=list.subList(startIndex,endIndex);finalintbatchIndex=i+1;CompletableFuture<Void>future=CompletableFuture.runAsync(()->{try{logger.debug("日志编号:{},批次 {} 开始处理,数据量:{}",nowLogNo,batchIndex,batch.size());longstartTime=System.currentTimeMillis();// 处理方法processPlanPaysAsyncbatch.forEach(this::processPlanPaysAsync);longendTime=System.currentTimeMillis();logger.debug("日志编号:{},批次 {} 处理完成,耗时:{}ms",nowLogNo,batchIndex,(endTime-startTime));}catch(Exceptione){logger.error("日志编号:{},批次 {} 处理失败:{}",nowLogNo,batchIndex,e.getMessage(),e);}},executorService);futures.add(future);}CompletableFuture.allOf(futures.toArray(newCompletableFuture[0])).join();executorService.shutdown();try{if(!executorService.awaitTermination(1,TimeUnit.HOURS)){executorService.shutdownNow();}}catch(InterruptedExceptione){executorService.shutdownNow();Thread.currentThread().interrupt();}logger.info("日志编号:{},执行扣款计划 - 定时任务批扣触发(待扣数据 + 历史失败数据 + 路由模式),执行扣款结束时间:{},总处理数据量:{}",nowLogNo,DateUtil.getDateNow(),totalSize);}}privateintcalculateCorePoolSize(inttotalSize,intbaseCores){if(totalSize<=1000){returnbaseCores*2;}elseif(totalSize<=5000){returnbaseCores*4;}elseif(totalSize<=10000){returnbaseCores*6;}elseif(totalSize<=50000){returnbaseCores*8;}else{returnbaseCores*12;}}privateintcalculateBatchSize(inttotalSize,intcorePoolSize){if(totalSize<=1000){return1000;}elseif(totalSize<=5000){return500;}elseif(totalSize<=10000){return300;}elseif(totalSize<=50000){return200;}else{return100;}}
http://www.jsqmd.com/news/474887/

相关文章:

  • 设计模式的六大原则:原理与实践
  • ESP32-C61总线与内存访问监控系统深度解析
  • ComicAI vs 传统漫画制作:实测AI生成30页漫画要花多少法力值?
  • OpenCV实战:SIFT特征提取在图像匹配中的关键应用
  • 简单使用Linux
  • STM32L1调试控制与设备电子签名深度解析
  • Oracle【实战指南】19c ADG容灾配置与同步模式深度解析
  • 避坑指南:Spring Data Redis 2.6.2升级后GEO功能失效的解决方案
  • Unity 2021.3.6f1项目实战:HybridCLR热更新从零配置到避坑指南
  • 零基础玩转Image-to-Video:手把手教你一键生成动态视频
  • 议程公布 | 智能车载音频专题论坛将于3月25-26日举办
  • 《Kubernetes故障篇: kubelet 证书实现自动续签》
  • Qwen3-8B惊艳案例:生成创意故事和复杂逻辑推理实测
  • 《QGIS快速入门与应用基础》216:项目→布局管理器
  • Linux - 基础IO【下】
  • UR机器人通信端口全解析:从Modbus TCP到Dashboard的实战避坑指南
  • 云容笔谈解决403 Forbidden错误:API访问权限与配置详解
  • JavaScript 设计模式分类与应用实践
  • Markdown中同时使用了TOC与HTML锚点后,锚点无效解决方法
  • 通义千问1.5-1.8B-Chat-GPTQ-Int4实战:自动化作业批改与个性化反馈生成
  • 2026年洗车槽生产厂家盘点!钢制洗车槽厂家/工地洗车池厂家推荐/洗车槽租赁推荐/工地洗车槽厂家推荐:宁波玖鼎领衔 - 栗子测评
  • 5分钟搞定Arduino IDE+ESP32开发环境(最新2.0.9版)
  • 当水泥浆遇上随机裂隙:COMSOL里的流动艺术
  • 2026年知名的增强剂公司推荐:防水增强剂直销厂家推荐 - 品牌宣传支持者
  • 2026年长沙天心区足疗养生品牌评测与选型指南 - 2026年企业推荐榜
  • Prius 2004永磁同步电机设计报告:包含磁路法、Maxwell有限元法建模与仿真、Mot...
  • Allegro PCB设计必备:3分钟搞定中文字体导入(附BMP2Allegro工具包)
  • 从零到一:实战加固Hadoop集群,封堵未授权访问风险
  • Google Images API 调用实战:从零开始获取图片数据的完整指南
  • 智慧铁路AI巡检数据集 铁路紧固件识别 铁路紧固件缺失识别 扣件图像识别 yolo数据集第10547期