当前位置: 首页 > news >正文

下面设计实现的是:交换机Hlr指令处理任务模块。当然,在后续的业务发展过程中,还可能出现,其他类型指令的任务处理,所以根据“开闭”原则的定义,要抽象出一个接口类:BusinessEvent

  1. /** * @filename:BusinessEvent.java * * Newland Co. Ltd. All rights reserved. * * @Description:业务事件任务接口定义 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; public interface BusinessEvent { // 执行具体批处理的任务 public int execute(Integer userId); }

    然后具体的Hlr指令发送任务模块HlrBusinessEvent要实现这个接口类的方法,完成用户停复机Hlr指令的派发。代码如下:

    /** * @filename:HlrBusinessEvent.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hlr指令派发任务接口定义 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; import org.apache.commons.lang.math.RandomUtils; public class HlrBusinessEvent implements BusinessEvent { // 交换机上的指令执行成功失败标识0表示成功 1表示失败 public final static int TASKSUCC = 0; public final static int TASKFAIL = 1; private final static int ELAPSETIME = 1000; @Override public int execute(Integer userId) { // 这里为了举例,随机产生1000以内的随机数 int millis = RandomUtils.nextInt(ELAPSETIME); // 简单模拟往交换机发送停机/复机的指令 try { Thread.sleep(millis); String strContent = String.format( "线程标识[%s]用户标识:[%d]执行交换机指令工单耗时:[%d]毫秒", Thread .currentThread().getName(), userId, millis); System.out.println(strContent); // 这里为了演示直接简单根据随机数是不是偶数简单模拟交换机指令执行的结果 return (millis % 2 == 0) ? TASKSUCC : TASKFAIL; } catch (InterruptedException e) { e.printStackTrace(); return TASKFAIL; } } }

    实际运行情况中,我们可能要监控一下指令发送的时长,于是再设计一个:针对Hlr指令发送任务模块HlrBusinessEvent,切面嵌入代理的Hlr指令时长计算代理类:HlrBusinessEventAdvisor,具体的代码如下:

    /** * @filename:HlrBusinessEventAdvisor.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hlr指令派发时长计算代理类 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.lang.time.StopWatch; public class HlrBusinessEventAdvisor implements MethodInterceptor { public HlrBusinessEventAdvisor() { } @Override public Object invoke(MethodInvocation invocation) throws Throwable { // 计算一下指令派发时长 StopWatch sw = new StopWatch(); sw.start(); Object obj = invocation.proceed(); sw.stop(); System.out.println("执行交换机指令工单耗时: [" + sw.getTime() + "] 毫秒"); return obj; } }

    剩下的,我们由于是要,异步并行计算得到执行结果,于是我们设计一个:批处理Hlr任务执行模块HlrBusinessEventTask,它要实现java.util.concurrent.Callable接口的方法call,它会返回一个异步任务的执行结果。

    /** * @filename:HlrBusinessEventTask.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hlr指令派任务执行类 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask.model; import java.util.concurrent.Callable; import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.NameMatchMethodPointcutAdvisor; public class HlrBusinessEventTask implements Callable<Integer> { private NotifyUsers user = null; private final static String MAPPERMETHODNAME = "execute"; public HlrBusinessEventTask(NotifyUsers user) { this.user = user; } @Override public Integer call() throws Exception { synchronized (this) { ProxyFactory weaver = new ProxyFactory(new HlrBusinessEvent()); NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(); advisor.setMappedName(MAPPERMETHODNAME); advisor.setAdvice(new HlrBusinessEventAdvisor()); weaver.addAdvisor(advisor); BusinessEvent proxyObject = (BusinessEvent) weaver.getProxy(); Integer result = new Integer(proxyObject.execute(user.getUserId())); // 返回执行结果 return result; } } }

  2. 接下来,我们要把并行异步加载的查询结果,和并行异步处理任务执行的模块,给它组合起来使用,故重新封装一个,通知用户批处理任务管理类模块:NotifyUsersBatchTask。它的主要功能是:批量并行异步加载查询待停复机的手机用户,然后把它放入并行异步处理的线程池中,进行异步处理。然后我们打印出,本次批处理的任务一共有多少,成功数和失败数分别是多少(当然,本文还给出了另外一种JMX方式的监控)。NotifyTaskSuccCounter类,主要是统计派发的任务中执行成功的任务的数量,而与之相对应的类NotifyTaskFailCounter,是用来统计执行失败的任务的数量。具体的代码如下

    /** * @filename:NotifyUsersBatchTask.java * * Newland Co. Ltd. All rights reserved. * * @Description:通知用户批处理任务管理类 * @author tangjie * @version 1.0 * */ package newlandframework.batchtask; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import javax.sql.DataSource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.commons.collections.Closure; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.IfClosure; import org.apache.commons.lang.StringUtils; import newlandframework.batchtask.jmx.BatchTaskMonitor; import newlandframework.batchtask.model.NotifyUsers; import newlandframework.batchtask.parallel.BatchQueryLoader; import newlandframework.batchtask.parallel.BatchTaskReactor; public class NotifyUsersBatchTask { public NotifyUsersBatchTask() { } private ArrayList<DataSource> dataSource; // 基于JMX的任务完成情况监控计数器 private BatchTaskMonitor monitor = new BatchTaskMonitor(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME); // 支持同时加载多个数据源 public NotifyUsersBatchTask(ArrayList<DataSource> dataSource) { this.dataSource = dataSource; } // 批处理任务执行成功计数器 class NotifyTaskSuccCounter implements Closure { public static final String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER"; private int numberSucc = 0; public void execute(Object input) { monitor.increaseBatchTaskCounter(NOTIFYTASKSUCCCOUNTER); numberSucc++; } public int getSuccNumber() { return numberSucc; } } // 批处理任务执行失败计数器 class NotifyTaskFailCounter implements Closure { public static final String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER"; private int numberFail = 0; public void execute(Object input) { monitor.increaseBatchTaskCounter(NOTIFYTASKFAILCOUNTER); numberFail++; } public int getFailNumber() { return numberFail; } } // 并行加载查询多个水平分库的数据集合 public List<NotifyUsers> query() throws SQLException { BatchQueryLoader loader = new BatchQueryLoader(); String strSQL = "select home_city, msisdn, user_id from notify_users"; for (int i = 0; i < dataSource.size(); i++) { Connection con = dataSource.get(i).getConnection(); Statement st = con.createStatement(); loader.attachLoadEnv(strSQL, st, con); } List<ResultSet> list = loader.executeQuery(); System.out.println("查询出记录总数为:" + list.size()); final List<NotifyUsers> listNotifyUsers = new ArrayList<NotifyUsers>(); for (int i = 0; i < list.size(); i++) { ResultSet rs = list.get(i); while (rs.next()) { NotifyUsers users = new NotifyUsers(); users.setHomeCity(rs.getInt(1)); users.setMsisdn(rs.getInt(2)); users.setUserId(rs.getInt(3)); listNotifyUsers.add(users); } } // 释放连接资源 loader.close(); return listNotifyUsers; } // 批处理数据集合,任务分派 public void batchNotify(List<NotifyUsers> list, final ExecutorService excutor) { System.out.println("处理记录总数为:" + list.size()); System.out.println(StringUtils.center("记录明细如下", 40, "-")); NotifyTaskSuccCounter cntSucc = new NotifyTaskSuccCounter(); NotifyTaskFailCounter cntFail = new NotifyTaskFailCounter(); BatchTaskPredicate predicate = new BatchTaskPredicate(excutor); Closure batchAction = new IfClosure(predicate, cntSucc, cntFail); CollectionUtils.forAllDo(list, batchAction); System.out.println("批处理一共处理:" + list.size() + "记录,处理成功:" + cntSucc.getSuccNumber() + "条记录,处理失败:" + cntFail.getFailNumber() + "条记录"); } }

http://www.jsqmd.com/news/1106154/

相关文章:

  • Agent记忆中RAG难题,浙大MemGate盘活了
  • 终极指南:HS2-HF Patch - Honey Select 2游戏体验的完整革命
  • 智能合约开发中的威胁建模:代码生成前的安全基线构建
  • 生成式引擎优化(GEO)在酒店民宿行业的落地实践:对抗 OTA 流量截流
  • Adobe破解终极指南:三步免费激活Photoshop等专业软件
  • 【中小学AI人工智能教育】强化学习范例——平衡杆
  • Claude 桌面版(macOS / Windows)工具分享
  • DFT:IST和ROM BIST能不能同时跑?特别是在mission mode下
  • 多模态AI系统性能优化:从3.2秒到1.5秒的实战经验
  • 新160个CrackMe042-crackme、043-riijj_cm_20041121、044-tsrh-crackme逆向分析
  • 前端应用离线暂停更新策略:构建稳定可靠的渐进式部署方案
  • 第9章 MCP 协议与 Skills 工具生态《AI Agent 开发平台资深技术专家 AI Agent 应用架构师 CTO 面试题库详解》
  • 在C++基础上理解CSharp-6
  • AI 编译优化入门:算子融合不是为了少写几行代码
  • utpasswd命令详解:10个实用参数让密码管理更高效
  • SolidWorks_装配体设计5_自上而下设计
  • AI Agent 编排实战:别让多个智能体互相抢麦
  • 特种行业加固计算机配套的固态硬盘,兼容性问题通常出在哪里?
  • Kiran Biometrics:开源生物识别认证系统的完整指南
  • Java反射基础
  • Frida内存操作避坑指南:从原理到实战的逆向分析核心技能
  • CNN-LSTM-AdaBoost时间序列预测实战指南
  • 大模型推理加速年度趋势:从量化到稀疏化的技术跃迁路径
  • ActiveReports for .NET 20.0J SP1-AIレポートウィザードがさらに進化
  • 大模型推理加速核心:KV Cache 复用机制与内存布局优化
  • 开启 OpenFeign 调用日志打印
  • Nuke Survival Toolkit:150个Nuke插件的终极指南与完整解决方案
  • CAD二次开发中的公差控制
  • Electron + Rust:吉他谱播放器性能优化实战
  • 抖音音频下载终极指南:5分钟掌握免费开源工具