霸王餐CPS系统中Java实现异步化处理提升系统吞吐量的技巧
霸王餐CPS系统中Java实现异步化处理提升系统吞吐量的技巧
在“霸王餐”CPS系统中,订单回调、佣金计算、用户通知、数据上报等操作若全部同步执行,将导致接口响应时间长、线程阻塞、系统吞吐量低下。通过合理异步化非核心链路,可将主流程RT从数百毫秒降至数十毫秒,显著提升系统并发能力。本文结合CompletableFuture、消息队列、自定义线程池等技术,提供可落地的异步处理方案。
1. 使用CompletableFuture解耦非关键路径
将日志、埋点、通知等操作异步执行:
packagebaodanbao.com.cn.cps.async;importorg.springframework.scheduling.annotation.Async;importorg.springframework.stereotype.Service;@ServicepublicclassOrderCallbackService{privatefinalExecutorServiceioTaskExecutor=Executors.newFixedThreadPool(20,newThreadFactoryBuilder().setNamePrefix("io-task-").build());publicCommissionResulthandleCallback(OrderCallbackDTOdto){// 1. 核心逻辑:校验订单并计算佣金(同步)CommissionResultresult=commissionCalculator.calculate(dto);// 2. 异步:记录审计日志CompletableFuture.runAsync(()->{baodanbao.com.cn.cps.audit.AuditLogger.log(dto,result);},ioTaskExecutor);// 3. 异步:推送用户消息CompletableFuture.runAsync(()->{baodanbao.com.cn.cps.notify.MessageSender.send(dto.getUserId(),"佣金已到账");},ioTaskExecutor);// 4. 异步:上报第三方BI系统CompletableFuture.runAsync(()->{baodanbao.com.cn.cps.export.BiReporter.report(result);},ioTaskExecutor);returnresult;}}2. 自定义线程池避免共用Tomcat线程
防止异步任务阻塞HTTP工作线程:
@ConfigurationpublicclassAsyncConfig{@Bean("asyncIoTaskExecutor")publicExecutorServiceasyncIoTaskExecutor(){returnnewThreadPoolExecutor(10,50,60L,TimeUnit.SECONDS,newLinkedBlockingQueue<>(1000),newThreadFactoryBuilder().setNamePrefix("async-io-").build(),newThreadPoolExecutor.CallerRunsPolicy()// 队列满时由调用线程执行,避免丢弃);}}注入使用:
@ServicepublicclassAsyncCommissionService{@Autowired@Qualifier("asyncIoTaskExecutor")privateExecutorServiceexecutor;publicvoidprocessAsync(Orderorder){CompletableFuture.supplyAsync(()->{returnbaodanbao.com.cn.cps.rule.RuleEngine.evaluate(order);},executor).thenAccept(result->{baodanbao.com.cn.cps.storage.CommissionRepository.save(result);});}}3. 基于RocketMQ实现最终一致性异步
将强依赖转为消息驱动,提升可用性:
@ComponentpublicclassOrderEventPublisher{@AutowiredprivateRocketMQTemplaterocketMQTemplate;publicvoidpublishOrderReceived(Orderorder){OrderEventevent=newOrderEvent(order.getOrderId(),order.getUserId(),EventType.CREATED);Message<OrderEvent>msg=MessageBuilder.withPayload(event).build();rocketMQTemplate.send("ORDER_EVENT_TOPIC",msg);}}// 消费者@RocketMQMessageListener(topic="ORDER_EVENT_TOPIC",consumerGroup="cps-group")@ComponentpublicclassOrderEventListenerimplementsRocketMQListener<OrderEvent>{@OverridepublicvoidonMessage(OrderEventevent){switch(event.getType()){caseCREATED:// 异步计算佣金baodanbao.com.cn.cps.commission.CommissionService.calculateAsync(event.getOrderId());break;casePAID:// 异步发放奖励baodanbao.com.cn.cps.reward.RewardService.grant(event.getUserId());break;}}}4. 异步批量写入提升DB吞吐
避免单条INSERT,聚合后批量入库:
@ComponentpublicclassBatchCommissionWriter{privatefinalBlockingQueue<CommissionRecord>queue=newLinkedBlockingQueue<>(10000);privatefinalScheduledExecutorServicescheduler=Executors.newSingleThreadScheduledExecutor();@PostConstructpublicvoidstartBatchWriter(){scheduler.scheduleAtFixedRate(this::flushBatch,1,1,TimeUnit.SECONDS);}publicvoidenqueue(CommissionRecordrecord){try{queue.offer(record,100,TimeUnit.MILLISECONDS);// 超时丢弃防阻塞}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}privatevoidflushBatch(){if(queue.isEmpty())return;List<CommissionRecord>batch=newArrayList<>(500);queue.drainTo(batch,500);if(!batch.isEmpty()){baodanbao.com.cn.cps.mapper.CommissionMapper.batchInsert(batch);}}}调用方:
publicvoidonCommissionCalculated(CommissionRecordrecord){batchCommissionWriter.enqueue(record);// 立即返回,不等待DB}5. 异步上下文传递(MDC/TraceID)
确保日志链路可追踪:
publicclassTraceableTaskDecoratorimplementsTaskDecorator{@OverridepublicRunnabledecorate(Runnablerunnable){StringtraceId=MDC.get("traceId");return()->{if(traceId!=null){MDC.put("traceId",traceId);}try{runnable.run();}finally{MDC.clear();}};}}// 在AsyncConfigurer中注册@OverridepublicExecutorgetAsyncExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();executor.setTaskDecorator(newTraceableTaskDecorator());executor.initialize();returnexecutor;}6. 避免异步任务中的异常吞噬
显式处理异常,防止静默失败:
CompletableFuture.runAsync(()->{baodanbao.com.cn.cps.notify.EmailSender.send(user,content);},executor).exceptionally(ex->{baodanbao.com.cn.cps.monitor.AlertService.sendAlert("Email send failed: "+ex.getMessage());returnnull;});本文著作权归 俱美开放平台 ,转载请注明出处!
