xxl-Job分布式任务
分布式任务解决的问题:集群环境下,定时任务对于同一个服务器进行的任务下发操作会出现多个集群同时触发的情况。
以下面为例实战使用xxl-job
运行之前,首先需要创建执行管理器:
创建任务:
xxl-job支持的路由策略非常丰富:
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):在线的机器按照顺序一次执行一个
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
xxl-job配置:
package com.sl.xxljob.config; import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * xxl-job config */ @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken:}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address:}") private String address; @Value("${xxl.job.executor.ip:}") private String ip; @Value("${xxl.job.executor.port:0}") private int port; @Value("${xxl.job.executor.logpath:}") private String logPath; @Value("${xxl.job.executor.logretentiondays:}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }任务代码:
package com.sl.xxljob.job; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.RandomUtil; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; /** * 任务处理器 */ @Component public class JobHandler { private List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5); /** * 普通任务 */ @XxlJob("firstJob") public void firstJob() throws Exception { System.out.println("firstJob执行了.... " + LocalDateTime.now()); for (Integer data : dataList) { XxlJobHelper.log("data= {}", data); Thread.sleep(RandomUtil.randomInt(100, 500)); } System.out.println("firstJob执行结束了.... " + LocalDateTime.now()); } /** * 分片式任务 */ @XxlJob("shardingJob") public void shardingJob() throws Exception { // 分片参数 // 分片节点总数 int shardTotal = XxlJobHelper.getShardTotal(); // 当前节点下标,从0开始 int shardIndex = XxlJobHelper.getShardIndex(); System.out.println("shardingJob执行了.... " + LocalDateTime.now()); for (Integer data : dataList) { if (data % shardTotal == shardIndex) { System.out.println("当前第"+shardIndex+"分片执行了,任务项为:"+data); Thread.sleep(RandomUtil.randomInt(100, 500)); } } System.out.println("shardingJob执行结束了.... " + LocalDateTime.now()); } }分片式任务的测试:
启动XxlJobApplication两个服务,模拟测试多实例执行集群
9902服务中要设置vm参数:
一是两个服务启动时需要区分端口号,二是因为当xxl-job根据分布策略去下发任务时需要根据在配置文件中设置的xxl.job.executor.port回访端口号去选择对应集群中的某个端口来下发,所以这个回访端口号不能够重复
打印控制台结果输出
9901节点控制台输出
9902节点控制台输出
可以看出,2个节点共同完成的任务处理,并且没有重复,这样提高了任务处理能力。
调度流程:
