当前位置: 首页 > news >正文

php方案 PHP 实现分布式任务调度

一、分布式任务调度(类XXL-Job) composerrequireswoole/ide-helper predis/predis 架构:[调度中心 Scheduler]→ Redis →[执行器节点 Worker xN]↑ ↓ 定时触发 执行任务+上报结果 调度中心:<?php// scheduler.php - 单实例,负责触发任务require'vendor/autoload.php';useSwoole\Timer;usePredis\Client;$redis=newClient();// 注册任务定义$jobs=['order.timeout'=>['cron'=>'*/5 * * * *','route'=>'round_robin'],'report.daily'=>['cron'=>'0 2 * * *','route'=>'random'],'cache.warmup'=>['cron'=>'*/1 * * * *','route'=>'broadcast'],// 所有节点都执行];// 抢分布式锁(只有一个调度中心实例工作)functionacquireLock(Client$redis,string$key,int$ttl=30):bool{return(bool)$redis->set($key,gethostname(),'EX',$ttl,'NX');}functionmatchCron(string$cron):bool{[$min,$hour,$dom,$mon,$dow]=explode(' ',$cron);$t=getdate();$match=fn($expr,$val)=>$expr==='*'?true:(str_starts_with($expr,'*/')?$val%(int)substr($expr,2)===0:in_array($val,array_map('intval',explode(',',$expr))));return$match($min,$t['minutes'])&&$match($hour,$t['hours'])&&$match($dom,$t['mday'])&&$match($mon,$t['mon'])&&$match($dow,$t['wday']);}// 每分钟检查一次(生产用每秒检查)Timer::tick(60000,function()use($redis,$jobs){if(!acquireLock($redis,'scheduler:lock'))return;// 没抢到锁就跳过foreach($jobsas$jobName=>$job){if(!matchCron($job['cron']))continue;$taskId=uniqid($jobName.':',true);$task=json_encode(['id'=>$taskId,'job'=>$jobName,'route'=>$job['route'],'trigger'=>time(),]);match($job['route']){'broadcast'=>broadcastTask($redis,$jobName,$task),'round_robin'=>$redis->rpush("queue:worker:".nextWorker($redis),$task),default=>$redis->rpush("queue:worker:".randomWorker($redis),$task),};// 记录调度日志$redis->hset("job:log:$taskId",'status','dispatched','time',time());echo"[$taskId] 已调度$jobName\n";}});functionbroadcastTask(Client$redis,string$job,string$task):void{foreach($redis->smembers('workers:online')as$worker){$redis->rpush("queue:worker:$worker",$task);}}functionnextWorker(Client$redis):string{$workers=$redis->smembers('workers:online');$idx=(int)$redis->incr('scheduler:rr_index')%count($workers);return$workers[$idx];}functionrandomWorker(Client$redis):string{return$redis->srandmember('workers:online');}echo"调度中心启动\n";\Swoole\Event::wait();执行器节点(多实例):<?php// worker.php - 多实例部署require'vendor/autoload.php';useSwoole\Process;usePredis\Client;$redis=newClient();$workerId=gethostname().':'.getmypid();// 注册自己上线$redis->sadd('workers:online',$workerId);$redis->expire('workers:online',30);// 注册任务处理器$handlers=['order.timeout'=>function(array$task):array{// 实际业务逻辑echo"处理订单超时任务\n";return['processed'=>42,'status'=>'ok'];},'report.daily'=>function(array$task):array{echo"生成日报\n";return['report_id'=>uniqid(),'status'=>'ok'];},'cache.warmup'=>function(array$task):array{echo"预热缓存\n";return['keys'=>1024,'status'=>'ok'];},];// 心跳(保持在线状态)\Swoole\Timer::tick(10000,fn()=>$redis->sadd('workers:online',$workerId)&&$redis->expire('workers:online',30));// 主循环:阻塞拉取任务echo"Worker$workerId启动\n";while(true){// blpop 阻塞等待,超时2秒重试$item=$redis->blpop("queue:worker:$workerId",2);if(!$item)continue;$task=json_decode($item[1],true);$job=$task['job'];echo"[{$task['id']}] 开始执行$job\n";$redis->hset("job:log:{$task['id']}",'status','running','worker',$workerId);try{$result=isset($handlers[$job])?($handlers[$job])($task):thrownew\RuntimeException("未知任务:$job");$redis->hset("job:log:{$task['id']}",'status','success','result',json_encode($result));echo"[{$task['id']}] 完成\n";}catch(\Throwable$e){$redis->hset("job:log:{$task['id']}",'status','failed','error',$e->getMessage());// 失败重试(最多3次)$retries=(int)$redis->hget("job:log:{$task['id']}",'retries');if($retries<3){$redis->hset("job:log:{$task['id']}",'retries',$retries+1);$redis->rpush("queue:worker:$workerId",json_encode($task));}echo"[{$task['id']}] 失败:{$e->getMessage()}\n";}}查询任务状态:<?php// 查日志$log=$redis->hgetall("job:log:$taskId");// ['status' => 'success', 'result' => '...', 'worker' => 'host:pid']大白话解释 分布式任务调度: 单机定时任务(crontab): 一台服务器挂了 → 任务不跑了 任务太多 → 一台跑不完 分布式任务调度: 调度中心:只管"什么时候触发什么任务"执行器:只管"怎么跑任务"Redis:中间传话的 流程: 调度中心 → 到点了,order.timeout该跑了 → 塞进Redis队列 Worker1 → 从队列取出来跑 → 跑完上报结果 Worker挂了 → 其他Worker继续跑,不影响
http://www.jsqmd.com/news/480622/

相关文章:

  • 分析钢结构厂房制造厂的性价比,苏东钢结构在全国排名如何 - 工业品牌热点
  • 2026年全国网架钢结构施工靠谱厂家有哪些,产品特色大揭秘 - myqiye
  • php方案 PHP 实现协程调度器
  • Python小白必做的30道基础练习题
  • Python 变量和数据类型
  • 探讨2026年全屋定制MES软件,如何选择合适的产品 - 工业推荐榜
  • 2026年GEO优化靠谱公司有哪些,鸿犀智能口碑出众 - mypinpai
  • 最近爆火的OpenClaw到底是什么?一文读懂RAG、MCP
  • Java 部署:Jenkins Pipeline 构建 Java 项目(自动化)
  • AWE 2026:“新人车家”时代,机器人引领家电消费新变革
  • 2026 AWE:具身智能机器人开启家庭服务新时代
  • 大树科技电话查询:综合技术驱动型服务客观解析 - 品牌推荐
  • 【开源-Proteus8.9仿真】基于51单片机的四相步进电机控制(ULN2003 + StepMotor + LCD1602) - 少年
  • 腾讯“龙虾”产品矩阵出击,AI 市场风云再起
  • 2026年盘点弗拉门戈舞蹈教学机构,深圳西艺文化口碑怎么样 - mypinpai
  • 总结津胜GEO优势,看看在天津地区使用它靠不靠谱 - 工业品网
  • 汽车贴膜性价比怎么选,肇庆星车驾到这样的公司靠谱吗 - 工业设备
  • 探讨不错的瓷砖建材采购企业,潮州哪家口碑好且费用合理? - 工业品牌热点
  • 说说中欧班列货代品牌企业,珠三角地区哪家口碑比较好? - 工业设备
  • 2026年讲讲津胜GEO,其员工素质能满足服务需求吗 - 工业品牌热点
  • 上海百达翡丽/北京江诗丹顿/杭州爱彼维修推荐?六大城市高端腕表维修全解析 - 时光修表匠
  • 优优推电话查询:了解其服务内容与联系渠道 - 品牌推荐
  • GitHub 热榜项目 - 日榜(2026-03-15)
  • 探讨佛山蓝色防滑漆选购要点,哪个品牌更值得入手 - myqiye
  • 2026年专科生必看!学生热捧的降AIGC平台 —— 千笔·专业降AI率智能体
  • InStreet API 完整参考
  • 专科生也能用!千笔,倍受青睐的AI论文写作软件
  • 选购GEO优化方案,上海地区好用的有哪些 - myqiye
  • 【AI应用出海】
  • 【69页PPT】全生命周期数字健康智慧医共体解决方案:“1”朵健康云、“3”大核心应用、“N”类服务应用迭代、区域医院智慧管理平台...