## 1)目标能力(MVP 先做这 6 个)

1. Cron 与一次性任务
2. 任务入队执行(异步)
3. 失败重试(固定间隔/指数退避)
4. 最大重试后进入死信队列(DLQ)
5. 幂等控制(避免重复执行)
6. 管理 API(创建、暂停、恢复、手动触发、查看执行历史)

---

## 2)项目结构(建议)

```text
hyperf-scheduler-platform/
├─ app/
│  ├─ Controller/
│  │  └─ TaskController.php
│  ├─ Model/
│  │  ├─ ScheduledTask.php
│  │  └─ TaskExecution.php
│  ├─ Service/
│  │  ├─ SchedulerService.php
│  │  ├─ DispatchService.php
│  │  ├─ RetryPolicy/
│  │  │  ├─ RetryPolicyInterface.php
│  │  │  └─ ExponentialRetryPolicy.php
│  │  └─ TaskExecutorRegistry.php
│  ├─ Job/
│  │  └─ ExecuteTaskJob.php
│  ├─ TaskHandler/
│  │  ├─ TaskHandlerInterface.php
│  │  └─ DemoSendEmailHandler.php
│  └─ Crontab/
│     └─ ScheduleTickCrontab.php
├─ config/
│  ├─ routes.php
│  └─ autoload/
│     ├─ async_queue.php
│     ├─ crontab.php
│     └─ dependencies.php
├─ migrations/
├─ tests/
├─ docker-compose.yml
├─ .github/workflows/ci.yml
├─ README.md
├─ SECURITY.md
└─ LICENSE
```

---

## 3)从 0 初始化

```bash
composer create-project hyperf/hyperf-skeleton hyperf-scheduler-platform
cd hyperf-scheduler-platform
composer require hyperf/crontab hyperf/async-queue hyperf/redis
composer require hyperf/db-connection hyperf/database hyperf/command
composer require dragonmantank/cron-expression
composer require --dev phpunit/phpunit friendsofphp/php-cs-fixer phpstan/phpstan
```

---

## 4)数据库设计(核心)

### 4.1 scheduled_tasks(任务定义)

```php
// migrations/2026_04_25_000001_create_scheduled_tasks.php
Schema::create('scheduled_tasks', function (Blueprint $table) {
    $table->bigIncrements('id');
    $table->string('name', 128);
    $table->string('handler', 191);                              // handler class name
    $table->json('payload')->nullable();
    $table->string('cron_expr', 64)->nullable();                 // cron expression, or null for a one-off task
    $table->timestamp('run_at')->nullable();                     // run time of a one-off task
    $table->unsignedTinyInteger('retry_strategy')->default(2);   // 1=fixed, 2=expo
    $table->unsignedInteger('retry_limit')->default(5);          // number of retries after the first attempt
    $table->unsignedInteger('retry_base_seconds')->default(10);
    $table->unsignedTinyInteger('status')->default(1);           // 1=active, 2=paused
    // The scheduler only scans next_run_at, so whoever creates a task must
    // initialise it (run_at for a one-off task, first cron match otherwise).
    $table->timestamp('next_run_at')->nullable();
    $table->timestamp('last_run_at')->nullable();
    $table->timestamps();
    $table->index(['status', 'next_run_at']);
});
```

### 4.2 task_executions(执行记录)

```php
Schema::create('task_executions', function (Blueprint $table) {
    $table->bigIncrements('id');
    $table->unsignedBigInteger('task_id');
    $table->string('idempotency_key', 128);                      // de-duplication key
    $table->unsignedInteger('attempt')->default(1);
    $table->unsignedTinyInteger('status')->default(1);           // 1=queued, 2=running, 3=success, 4=failed, 5=dead
    $table->text('error_message')->nullable();
    $table->timestamp('scheduled_at')->nullable();
    $table->timestamp('started_at')->nullable();
    $table->timestamp('finished_at')->nullable();
    $table->timestamps();
    $table->unique(['idempotency_key']);
    $table->index(['task_id', 'status', 'created_at']);
});
```

---

## 5)核心代码

### 5.1 模型

```php
<?php
// app/Model/ScheduledTask.php

declare(strict_types=1);

namespace App\Model;

/**
 * Definition of a schedulable task (cron or one-off).
 */
class ScheduledTask extends Model
{
    protected ?string $table = 'scheduled_tasks';

    protected array $fillable = [
        'name', 'handler', 'payload', 'cron_expr', 'run_at',
        'retry_strategy', 'retry_limit', 'retry_base_seconds',
        'status', 'next_run_at', 'last_run_at',
    ];

    // Integer casts keep strict comparisons reliable regardless of how the
    // database driver returns numeric columns.
    protected array $casts = [
        'payload' => 'array',
        'retry_strategy' => 'integer',
        'retry_limit' => 'integer',
        'retry_base_seconds' => 'integer',
        'status' => 'integer',
    ];
}
```

```php
<?php
// app/Model/TaskExecution.php

declare(strict_types=1);

namespace App\Model;

/**
 * One attempt at running a scheduled task.
 */
class TaskExecution extends Model
{
    protected ?string $table = 'task_executions';

    protected array $fillable = [
        'task_id', 'idempotency_key', 'attempt', 'status',
        'error_message', 'scheduled_at', 'started_at', 'finished_at',
    ];

    protected array $casts = [
        'task_id' => 'integer',
        'attempt' => 'integer',
        'status' => 'integer',
    ];
}
```

### 5.2 重试策略

```php
<?php
// app/Service/RetryPolicy/RetryPolicyInterface.php

declare(strict_types=1);

namespace App\Service\RetryPolicy;

interface RetryPolicyInterface
{
    /**
     * @param int $attempt     1-based number of the attempt that just failed
     * @param int $baseSeconds base delay in seconds
     * @return int delay in seconds before the next attempt
     */
    public function nextDelaySeconds(int $attempt, int $baseSeconds): int;
}
```

```php
<?php
// app/Service/RetryPolicy/ExponentialRetryPolicy.php

declare(strict_types=1);

namespace App\Service\RetryPolicy;

/**
 * Doubles the delay on every attempt, capped at one hour.
 */
final class ExponentialRetryPolicy implements RetryPolicyInterface
{
    private const MAX_DELAY_SECONDS = 3600;

    public function nextDelaySeconds(int $attempt, int $baseSeconds): int
    {
        // attempt=1 => base, 2 => 2*base, 3 => 4*base ...
        // The exponent is clamped so 2 ** n stays an integer, and a negative
        // base can never yield a negative delay.
        $exponent = min(30, max(0, $attempt - 1));

        return (int) min(self::MAX_DELAY_SECONDS, max(0, $baseSeconds) * (2 ** $exponent));
    }
}
```

### 5.3 调度扫描(每秒 tick)

```php
<?php
// app/Crontab/ScheduleTickCrontab.php

declare(strict_types=1);

namespace App\Crontab;

use App\Service\SchedulerService;
use Hyperf\Crontab\Annotation\Crontab;

/**
 * Runs one scheduler scan every second.
 */
final class ScheduleTickCrontab
{
    public function __construct(private SchedulerService $schedulerService)
    {
    }

    #[Crontab(rule: '* * * * * *', memo: 'schedule tick per second', singleton: true)]
    public function execute(): void
    {
        $this->schedulerService->tick();
    }
}
```

```php
<?php
// app/Service/SchedulerService.php

declare(strict_types=1);

namespace App\Service;

use App\Model\ScheduledTask;
use Cron\CronExpression;
use Hyperf\DbConnection\Db;
use Throwable;

/**
 * Finds due tasks, records an execution for each and hands them to the queue.
 */
final class SchedulerService
{
    public function __construct(private DispatchService $dispatchService)
    {
    }

    /**
     * Claims up to 100 due tasks and dispatches them.
     *
     * Execution rows are written inside the transaction; the queue push happens
     * only after commit, so a consumer can never pick up a job whose row is not
     * visible yet. A row left in status 1 (queued) after a crash can be re-pushed.
     *
     * @throws Throwable the first enqueue error, after all rows were attempted
     */
    public function tick(): void
    {
        // Use the PHP clock for both the comparison and the writes so that the
        // scheduler does not depend on PHP and MySQL sharing a timezone.
        $now = date('Y-m-d H:i:s');

        $recorded = Db::transaction(function () use ($now): array {
            $rows = Db::select(
                'SELECT id FROM scheduled_tasks'
                . ' WHERE status = 1 AND next_run_at IS NOT NULL AND next_run_at <= ?'
                . ' ORDER BY next_run_at ASC LIMIT 100 FOR UPDATE SKIP LOCKED',
                [$now],
            );

            $executions = [];
            foreach ($rows as $row) {
                $task = ScheduledTask::query()->find($row->id);
                if ($task === null) {
                    continue;
                }

                // Key the execution on the planned run time, not on "now".
                $execution = $this->dispatchService->record($task, 1, (string) $task->next_run_at);
                if ($execution !== null) {
                    $executions[] = $execution;
                }

                $task->last_run_at = $now;
                $task->next_run_at = $this->computeNextRunAt($task);
                $task->save();
            }

            return $executions;
        });

        // Keep going when one push fails so a single error does not hold back
        // the rest of the batch; surface the first error afterwards.
        $firstError = null;
        foreach ($recorded as $execution) {
            try {
                $this->dispatchService->enqueue($execution);
            } catch (Throwable $e) {
                $firstError ??= $e;
            }
        }

        if ($firstError !== null) {
            throw $firstError;
        }
    }

    /**
     * Next due time for a cron task, or null for a one-off task (never rescheduled).
     *
     * cron_expr must be validated when the task is created: an invalid
     * expression throws here and rolls back the whole batch.
     */
    private function computeNextRunAt(ScheduledTask $task): ?string
    {
        if ($task->cron_expr) {
            return (new CronExpression($task->cron_expr))
                ->getNextRunDate()
                ->format('Y-m-d H:i:s');
        }

        return null;
    }
}
```

### 5.4 入队与执行

```php
<?php
// app/Service/DispatchService.php

declare(strict_types=1);

namespace App\Service;

use App\Job\ExecuteTaskJob;
use App\Model\ScheduledTask;
use App\Model\TaskExecution;
use Hyperf\AsyncQueue\Driver\DriverFactory;

/**
 * Creates execution records and pushes them onto the async queue.
 */
final class DispatchService
{
    public function __construct(private DriverFactory $driverFactory)
    {
    }

    /**
     * Records and enqueues an attempt in one step.
     *
     * Use this outside a DB transaction (e.g. manual trigger). Inside a
     * transaction call record() first and enqueue() after commit.
     *
     * @param string|null $scheduledAt planned run time ('Y-m-d H:i:s'); defaults to now
     */
    public function dispatch(ScheduledTask $task, int $attempt, ?string $scheduledAt = null): void
    {
        $execution = $this->record($task, $attempt, $scheduledAt);
        if ($execution !== null) {
            $this->enqueue($execution);
        }
    }

    /**
     * Inserts the execution row for (task, planned run time, attempt).
     *
     * @return TaskExecution|null the new row, or null when this attempt was already recorded
     */
    public function record(ScheduledTask $task, int $attempt, ?string $scheduledAt = null): ?TaskExecution
    {
        $scheduledAt ??= date('Y-m-d H:i:s');

        // "<taskId>-<YmdHis of the planned run>-<attempt>": the same planned run
        // always maps to the same key, whichever instance dispatches it and
        // whenever it does so.
        $key = sprintf('%d-%s-%d', $task->id, preg_replace('/\D/', '', $scheduledAt), $attempt);

        $execution = TaskExecution::query()->firstOrCreate(
            ['idempotency_key' => $key],
            [
                'task_id' => $task->id,
                'attempt' => $attempt,
                'status' => 1,
                'scheduled_at' => $scheduledAt,
            ],
        );

        return $execution->wasRecentlyCreated ? $execution : null;
    }

    /**
     * Pushes a recorded execution onto the default queue.
     *
     * @param int $delaySeconds seconds to wait before the job becomes consumable
     */
    public function enqueue(TaskExecution $execution, int $delaySeconds = 0): void
    {
        $this->driverFactory->get('default')->push(
            new ExecuteTaskJob(
                (int) $execution->task_id,
                (int) $execution->attempt,
                (string) $execution->idempotency_key,
            ),
            $delaySeconds,
        );
    }
}
```

```php
<?php
// app/Job/ExecuteTaskJob.php

declare(strict_types=1);

namespace App\Job;

use App\Model\ScheduledTask;
use App\Model\TaskExecution;
use App\Service\DispatchService;
use App\Service\RetryPolicy\ExponentialRetryPolicy;
use App\Service\TaskExecutorRegistry;
use Hyperf\AsyncQueue\Job;
use Hyperf\Context\ApplicationContext;
use Throwable;

/**
 * Runs one attempt of a task and schedules a retry or dead-letters it on failure.
 *
 * The job is serialised into the queue, so it carries only scalars and
 * resolves its collaborators from the container when it runs.
 */
final class ExecuteTaskJob extends Job
{
    public function __construct(
        public int $taskId,
        public int $attempt,
        public string $idempotencyKey,
    ) {
    }

    public function handle(): void
    {
        // Atomic claim (queued -> running). A duplicate delivery, or a job whose
        // row does not exist, updates nothing and is dropped here.
        // NOTE: a worker crash leaves the row in status 2; a sweeper has to
        // requeue or fail such stale rows.
        $claimed = TaskExecution::query()
            ->where('idempotency_key', $this->idempotencyKey)
            ->where('status', 1)
            ->update(['status' => 2, 'started_at' => date('Y-m-d H:i:s')]);
        if ($claimed !== 1) {
            return;
        }

        /** @var TaskExecution $execution */
        $execution = TaskExecution::query()
            ->where('idempotency_key', $this->idempotencyKey)
            ->firstOrFail();

        $task = ScheduledTask::query()->find($this->taskId);
        if ($task === null) {
            // Nothing to retry against: close the row instead of leaving it "running".
            $this->finish($execution, 5, "task {$this->taskId} no longer exists");
            return;
        }

        try {
            ApplicationContext::getContainer()
                ->get(TaskExecutorRegistry::class)
                ->make((string) $task->handler)
                ->handle($task->payload ?? []);
        } catch (Throwable $e) {
            $this->onFailed($task, $execution, $e);
            return;
        }

        // Outside the try block on purpose: a bookkeeping error must not cause
        // a handler that already succeeded to be retried.
        $this->finish($execution, 3);
    }

    /**
     * Marks the attempt failed (4) or dead (5) and, unless retries are
     * exhausted, records and enqueues the next attempt.
     */
    private function onFailed(ScheduledTask $task, TaskExecution $execution, Throwable $e): void
    {
        // Attempt 1 is the initial run, so "retries used" is attempt - 1.
        $exhausted = $this->attempt > (int) $task->retry_limit;

        $this->finish($execution, $exhausted ? 5 : 4, mb_substr($e->getMessage(), 0, 2000));
        if ($exhausted) {
            return;
        }

        $container = ApplicationContext::getContainer();
        $baseSeconds = max(0, (int) $task->retry_base_seconds);

        // retry_strategy: 1 = fixed interval, anything else = exponential back-off.
        $delay = (int) $task->retry_strategy === 1
            ? $baseSeconds
            : $container->get(ExponentialRetryPolicy::class)->nextDelaySeconds($this->attempt, $baseSeconds);

        // The retry needs its own execution row, otherwise the next job finds
        // nothing to claim and returns without running the handler.
        $dispatcher = $container->get(DispatchService::class);
        $scheduledAt = $execution->scheduled_at !== null ? (string) $execution->scheduled_at : null;
        $next = $dispatcher->record($task, $this->attempt + 1, $scheduledAt);
        if ($next !== null) {
            $dispatcher->enqueue($next, $delay);
        }
    }

    /**
     * Writes the final state of this attempt.
     */
    private function finish(TaskExecution $execution, int $status, ?string $errorMessage = null): void
    {
        $execution->status = $status;
        $execution->error_message = $errorMessage;
        $execution->finished_at = date('Y-m-d H:i:s');
        $execution->save();
    }
}
```

### 5.5 处理器接口与示例

```php
<?php
// app/TaskHandler/TaskHandlerInterface.php

declare(strict_types=1);

namespace App\TaskHandler;

interface TaskHandlerInterface
{
    /**
     * Runs the task; throw to signal failure.
     */
    public function handle(array $payload): void;
}
```

```php
<?php
// app/TaskHandler/DemoSendEmailHandler.php

declare(strict_types=1);

namespace App\TaskHandler;

use RuntimeException;

final class DemoSendEmailHandler implements TaskHandlerInterface
{
    public function handle(array $payload): void
    {
        // Example only: a real project would send the e-mail here.
        if (empty($payload['to'])) {
            throw new RuntimeException('missing to');
        }
    }
}
```

```php
<?php
// app/Service/TaskExecutorRegistry.php

declare(strict_types=1);

namespace App\Service;

use App\TaskHandler\TaskHandlerInterface;
use Hyperf\Context\ApplicationContext;
use RuntimeException;

/**
 * Resolves a handler class name stored on a task into a handler instance.
 */
final class TaskExecutorRegistry
{
    /**
     * @throws RuntimeException when $handler is not a TaskHandlerInterface implementation
     */
    public function make(string $handler): TaskHandlerInterface
    {
        // The name comes from the database / API, so check the type before
        // asking the container to build anything.
        if (! is_a($handler, TaskHandlerInterface::class, true)) {
            throw new RuntimeException("handler invalid: {$handler}");
        }

        return ApplicationContext::getContainer()->get($handler);
    }
}
```

---

## 6)管理 API(最小可用)

路由文件:`config/routes.php`(Hyperf 骨架的路由入口,不在 `config/autoload/` 下)
```php
Router::addGroup('/api/tasks', function () {
    Router::post('', [TaskController::class, 'create']);
    Router::post('/{id:\d+}/pause', [TaskController::class, 'pause']);
    Router::post('/{id:\d+}/resume', [TaskController::class, 'resume']);
    Router::post('/{id:\d+}/trigger', [TaskController::class, 'triggerNow']);
    Router::get('/{id:\d+}/executions', [TaskController::class, 'executions']);
});
```

---

## 7)运行配置

```php
// config/autoload/async_queue.php
return [
    'default' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => 'task-queue',
        'timeout' => 2,
        'retry_seconds' => 5,
        'handle_timeout' => 10,
        'processes' => 2,
    ],
];
```

注意:还需要在 `config/autoload/crontab.php` 中设置 `'enable' => true`,并在 `config/autoload/processes.php` 中注册 `Hyperf\Crontab\Process\CrontabDispatcherProcess` 与 `Hyperf\AsyncQueue\Process\ConsumerProcess`,否则 tick 调度进程和队列消费进程都不会启动。

```yaml
# docker-compose.yml
version: "3.8"
services:
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: scheduler
    ports: ["3306:3306"]
  redis:
    image: redis:7
    ports: ["6379:6379"]
```

---

## 8)开源发布流程(完整)

1. LICENSE(MIT/Apache-2.0)
2. README:5 分钟启动、架构图、任务状态机、重试策略说明
3. SECURITY.md:漏洞提交流程
4. GitHub 模板:Issue/PR Template
5. 首版 tag:v0.1.0(MVP 功能锁定)
6. Packagist(可选):把通用调度核心拆成独立包

---

## 9)CI/CD(必须项)

`.github/workflows/ci.yml` 建议至少包含:

- `composer validate`
- `php -l` / cs-fixer 检查
- phpunit
- phpstan
- 集成测试:创建任务 -> 触发执行 -> 验证重试与死信状态

---

## 10)持续维护路线图

- v0.1:cron + 一次性 + 重试 + DLQ + API
- v0.2:多租户队列隔离、任务标签、并发上限
- v0.3:Web 控制台、告警(失败率/堆积量)
- v1.0:高可用调度器(多实例选主)、审计日志、权限模型

---

## 11)最容易踩坑的点(重点)

1. 多实例重复派发(用 FOR UPDATE SKIP LOCKED 或分布式锁)
2. 幂等键设计不稳定(重试必须生成可追踪键)
3. 高基数日志字段(执行日志要控字段数量)
4. 重试风暴(指数退避 + 最大重试 + DLQ)
5. 任务处理器抛错污染主循环(执行层要隔离)

---

这套骨架可以直接作为开源首版:先把 scheduled_tasks + task_executions + tick 调度 + async queue 执行 + retry/DLQ + 管理 API + CI 放上仓库,第一周就能给社区跑通。