当前位置: 首页 > news >正文

任务调度与重试平台开源完整流程(从 0 到持续维护)==写一个开源项目全流程

1)目标能力(MVP 先做这6个)1. Cron 与一次性任务2. 任务入队执行(异步)3. 失败重试(固定间隔/指数退避)4. 最大重试后进入死信队列(DLQ)5. 幂等控制(避免重复执行)6. 管理 API(创建、暂停、恢复、手动触发、查看执行历史) ---2)项目结构(建议) hyperf-scheduler-platform/ ├─ app/ │ ├─ Controller/ │ │ └─ TaskController.php │ ├─ Model/ │ │ ├─ ScheduledTask.php │ │ └─ TaskExecution.php │ ├─ Service/ │ │ ├─ SchedulerService.php │ │ ├─ DispatchService.php │ │ ├─ RetryPolicy/ │ │ │ ├─ RetryPolicyInterface.php │ │ │ └─ ExponentialRetryPolicy.php │ │ └─ TaskExecutorRegistry.php │ ├─ Job/ │ │ └─ ExecuteTaskJob.php │ ├─ TaskHandler/ │ │ ├─ TaskHandlerInterface.php │ │ └─ DemoSendEmailHandler.php │ └─ Crontab/ │ └─ ScheduleTickCrontab.php ├─ config/autoload/ │ ├─ async_queue.php │ ├─ crontab.php │ ├─ dependencies.php │ └─ routes.php ├─ migrations/ ├─ tests/ ├─ docker-compose.yml ├─ .github/workflows/ci.yml ├─ README.md ├─ SECURITY.md └─ LICENSE ---3)0初始化composercreate-project hyperf/hyperf-skeleton hyperf-scheduler-platformcdhyperf-scheduler-platformcomposerrequire hyperf/crontab hyperf/async-queue hyperf/rediscomposerrequire hyperf/db-connection hyperf/database hyperf/commandcomposerrequire dragonmantank/cron-expressioncomposerrequire--devphpunit/phpunit friendsofphp/php-cs-fixer phpstan/phpstan ---4)数据库设计(核心)4.1scheduled_tasks(任务定义) // migrations/2026_04_25_000001_create_scheduled_tasks.php Schema::create('scheduled_tasks',function(Blueprint$table){$table->bigIncrements('id');$table->string('name',128);$table->string('handler',191);// 处理器类名$table->json('payload')->nullable();$table->string('cron_expr',64)->nullable();//cron或 null(一次性)$table->timestamp('run_at')->nullable();// 一次性执行时间$table->unsignedTinyInteger('retry_strategy')->default(2);//1=fixed,2=expo$table->unsignedInteger('retry_limit')->default(5);$table->unsignedInteger('retry_base_seconds')->default(10);$table->unsignedTinyInteger('status')->default(1);//1=active,2=paused$table->timestamp('next_run_at')->nullable();$table->timestamp('last_run_at')->nullable();$table->timestamps();$table->index(['status','next_run_at']);});4.2task_executions(执行记录) Schema::create('task_executions',function(Blueprint$table){$table->bigIncrements('id');$table->unsignedBigInteger('task_id');$table->string('idempotency_key',128);// 防重$table->unsignedInteger('attempt')->default(1);$table->unsignedTinyInteger('status')->default(1);//1=queued,2=running,3=success,4=failed,5=dead$table->text('error_message')->nullable();$table->timestamp('scheduled_at')->nullable();$table->timestamp('started_at')->nullable();$table->timestamp('finished_at')->nullable();$table->timestamps();$table->unique(['idempotency_key']);$table->index(['task_id','status','created_at']);});---5)核心代码5.1模型 // app/Model/ScheduledTask.php class ScheduledTask extends Model{protected ?string$table='scheduled_tasks';protected array$fillable=['name','handler','payload','cron_expr','run_at','retry_strategy','retry_limit','retry_base_seconds','status','next_run_at','last_run_at'];protected array$casts=['payload'=>'array'];}// app/Model/TaskExecution.php class TaskExecution extends Model{protected ?string$table='task_executions';protected array$fillable=['task_id','idempotency_key','attempt','status','error_message','scheduled_at','started_at','finished_at'];}5.2重试策略 // app/Service/RetryPolicy/RetryPolicyInterface.php interface RetryPolicyInterface{publicfunctionnextDelaySeconds(int$attempt, int$baseSeconds): int;}// app/Service/RetryPolicy/ExponentialRetryPolicy.php final class ExponentialRetryPolicy implements RetryPolicyInterface{publicfunctionnextDelaySeconds(int$attempt, int$baseSeconds): int{//attempt=1=>base,2=>2*base,3=>4*basereturn(int)min(3600,$baseSeconds*(2** max(0,$attempt-1)));}}5.3调度扫描(每秒 tick) // app/Crontab/ScheduleTickCrontab.php use Hyperf\Crontab\Annotation\Crontab;final class ScheduleTickCrontab{publicfunction__construct(private SchedulerService$schedulerService){}#[Crontab(rule: '* * * * * *', memo: 'schedule tick per second', singleton: true)]publicfunctionexecute(): void{$this->schedulerService->tick();}}// app/Service/SchedulerService.php use Cron\CronExpression;use Hyperf\DbConnection\Db;final class SchedulerService{publicfunction__construct(private DispatchService$dispatchService){}publicfunctiontick(): void{Db::transaction(function(){$rows=Db::select(" SELECT * FROM scheduled_tasks WHERE status = 1 AND next_run_at IS NOT NULL AND next_run_at <= NOW() ORDER BY next_run_at ASC LIMIT 100 FOR UPDATE SKIP LOCKED ");foreach($rowsas$row){$task=ScheduledTask::query()->find($row->id);if(!$task){continue;}$this->dispatchService->dispatch($task,1);$task->last_run_at=date('Y-m-d H:i:s');$task->next_run_at=$this->computeNextRunAt($task);$task->save();}});}privatefunctioncomputeNextRunAt(ScheduledTask$task): ?string{if($task->cron_expr){returnCronExpression::factory($task->cron_expr)->getNextRunDate()->format('Y-m-d H:i:s');}// 一次性任务执行后不再调度returnnull;}}5.4入队与执行 // app/Service/DispatchService.php final class DispatchService{publicfunction__construct(private DriverFactory$driverFactory){}publicfunctiondispatch(ScheduledTask$task, int$attempt): void{$idempotencyKey=sprintf('%d-%s-%d',$task->id, date('YmdHis'),$attempt);TaskExecution::query()->create(['task_id'=>$task->id,'idempotency_key'=>$idempotencyKey,'attempt'=>$attempt,'status'=>1,'scheduled_at'=>date('Y-m-d H:i:s'),]);$driver=$this->driverFactory->get('default');$driver->push(new ExecuteTaskJob($task->id,$attempt,$idempotencyKey));}}// app/Job/ExecuteTaskJob.php use Hyperf\AsyncQueue\Job;final class ExecuteTaskJob extends Job{publicfunction__construct(public int$taskId, public int$attempt, public string$idempotencyKey){}publicfunctionhandle(){/** @var TaskExecution$execution*/$execution=TaskExecution::query()->where('idempotency_key',$this->idempotencyKey)->first();if(!$execution||$execution->status===3){return;}$execution->status=2;$execution->started_at=date('Y-m-d H:i:s');$execution->save();$task=ScheduledTask::query()->findOrFail($this->taskId);try{$handler=di(TaskExecutorRegistry::class)->make($task->handler);$handler->handle($task->payload ??[]);$execution->status=3;$execution->finished_at=date('Y-m-d H:i:s');$execution->save();}catch(\Throwable$e){$this->onFailed($task,$execution,$e);}}privatefunctiononFailed(ScheduledTask$task, TaskExecution$execution,\Throwable$e): void{$execution->status=4;$execution->error_message=mb_substr($e->getMessage(),0,2000);$execution->finished_at=date('Y-m-d H:i:s');$execution->save();if($this->attempt>=$task->retry_limit){$execution->status=5;// dead$execution->save();return;}$policy=di(ExponentialRetryPolicy::class);$delay=$policy->nextDelaySeconds($this->attempt,(int)$task->retry_base_seconds);di(DriverFactory::class)->get('default')->push(new ExecuteTaskJob($task->id,$this->attempt +1,$execution->idempotency_key.'-r'.($this->attempt +1)),$delay);}}5.5处理器接口与示例 // app/TaskHandler/TaskHandlerInterface.php interface TaskHandlerInterface{publicfunctionhandle(array$payload): void;}// app/TaskHandler/DemoSendEmailHandler.php final class DemoSendEmailHandler implements TaskHandlerInterface{publicfunctionhandle(array$payload): void{// 示例: 真实项目里放邮件发送逻辑if(empty($payload['to'])){throw new\RuntimeException('missing to');}}}// app/Service/TaskExecutorRegistry.php final class TaskExecutorRegistry{publicfunctionmake(string$handler): TaskHandlerInterface{$instance=di()->get($handler);if(!$instanceinstanceof TaskHandlerInterface){throw new\RuntimeException("handler invalid: {$handler}");}return$instance;}}---6)管理 API(最小可用) // config/autoload/routes.php Router::addGroup('/api/tasks',function(){Router::post('',[TaskController::class,'create']);Router::post('/{id:\d+}/pause',[TaskController::class,'pause']);Router::post('/{id:\d+}/resume',[TaskController::class,'resume']);Router::post('/{id:\d+}/trigger',[TaskController::class,'triggerNow']);Router::get('/{id:\d+}/executions',[TaskController::class,'executions']);});---7)运行配置 // config/autoload/async_queue.phpreturn['default'=>['driver'=>Hyperf\AsyncQueue\Driver\RedisDriver::class,'redis'=>['pool'=>'default'],'channel'=>'task-queue','timeout'=>2,'retry_seconds'=>5,'handle_timeout'=>10,'processes'=>2,],];# docker-compose.ymlversion:"3.8"services: mysql: image: mysql:8.0 environment: MYSQL_ROOT_PASSWORD: root MYSQL_DATABASE: scheduler ports:["3306:3306"]redis: image: redis:7 ports:["6379:6379"]---8)开源发布流程(完整)1. LICENSE(MIT/Apache-2.0)2. README:5 分钟启动、架构图、任务状态机、重试策略说明3. SECURITY.md:漏洞提交流程4. GitHub 模板:Issue/PR Template5. 首版 tag:v0.1.0(MVP 功能锁定)6. Packagist(可选):把通用调度核心拆成独立包 ---9)CI/CD(必须项) .github/workflows/ci.yml 建议至少包含: -composervalidate - php-l/ cs-fixer 检查 - phpunit - phpstan - 集成测试:创建任务 ->触发执行 ->验证重试与死信状态 ---10)持续维护路线图 - v0.1:cron+ 一次性 + 重试 + DLQ + API - v0.2: 多租户队列隔离、任务标签、并发上限 - v0.3: Web 控制台、告警(失败率/堆积量) - v1.0: 高可用调度器(多实例选主)、审计日志、权限模型 ---11)最容易踩坑的点(重点)1. 多实例重复派发(用 FOR UPDATE SKIP LOCKED 或分布式锁)2. 幂等键设计不稳定(重试必须生成可追踪键)3. 高基数日志字段(执行日志要控字段数量)4. 重试风暴(指数退避 + 最大重试 + DLQ)5. 任务处理器抛错污染主循环(执行层要隔离) --- 这套骨架可以直接作为开源首版:先把 scheduled_tasks + task_executions + tick 调度 + async queue 执行 + retry/DLQ + 管理 API + CI 放上仓库,第一周就能给社区跑通。
http://www.jsqmd.com/news/701828/

相关文章:

  • 仓颉(Cangjie)编程语言:从汉字造字始祖到全场景智能应用开发语言
  • 移动端UI自动化测试框架Maestro:YAML驱动,跨平台高效测试实践
  • 从零手写C++ MCP网关:3周上线、支撑日均47亿请求,我们删掉了所有STL容器,换上了定制化内存池
  • 快狐KIHU|49寸横屏自助触摸终端G+G电容屏国产鸿蒙系统银行网点查询
  • AltSnap:5个技巧彻底改变Windows窗口管理体验
  • 机器学习分类模型决策边界可视化实战指南
  • 深度学习超参数网格搜索实战指南
  • Qwen3-4B-Instruct-2507新手必看:从部署到生成第一段文本
  • Qwen2.5-0.5B怎么选GPU?算力匹配建议与部署参数详解
  • StarRocks MCP Server:AI Agent安全访问数据仓库的工程实践
  • 零门槛上手Llama-3.2-3B:Ollama部署教程,3步完成环境搭建
  • 卡拉罗冲刺港股:年营收8.7亿,利润1.2亿 派息1亿
  • 使用Docker快速部署FRCRN开发测试环境
  • Pixel Couplet Gen 助力乡村振兴:为乡村民宿设计特色数字年画
  • BitNet-b1.58-2B-4T-GGUF 前端开发实战:JavaScript交互应用构建
  • Java语言及重要贡献人物
  • Qianfan-OCR数据结构优化:提升大批量图片处理效率的编程技巧
  • 嵌入式C如何驯服千层参数?:在256KB RAM MCU上跑通TinyLlama的5步内存压缩法
  • 程序员的心理学学习笔记 - NPD 人格
  • 从零构建轻量级AI智能体:微架构设计与运维自动化实践
  • Budibase开源AI代理平台实战:从部署到构建自动化运营中枢
  • RainbowGPT:基于开源大模型的中文优化与微调实战指南
  • DDrawCompat终极指南:让Windows 11上的经典游戏重获新生的完整解决方案
  • Qwen3-4B-Instruct效果展示:整本PDF/百万行代码精准问答案例集
  • 抖音内容批量下载终极指南:免费开源工具完全解析
  • 2026年Q2妇科洗液OEM贴牌权威服务商排行盘点 - 优质品牌商家
  • Parlant对话控制层:构建可靠AI智能体的动态上下文工程实践
  • C++26反射+Concepts+MDA:构建自描述协议栈的7步法(附LLVM-IR级调试技巧)
  • 飞书文档转Markdown:一键解决跨国团队的文档迁移难题
  • 丹青幻境·Z-Image Atelier详细步骤:自定义Noto Serif SC字体渲染