ThinkPHP6 消息队列 think-queue:从配置到高可用部署实战
1. 消息队列基础与ThinkPHP6集成
消息队列是现代Web开发中不可或缺的组件,特别是在电商系统这类高并发场景下。想象一下双11期间,每秒上万笔订单涌入系统,如果每个订单都同步处理库存扣减、优惠券核销、物流单生成等操作,服务器瞬间就会崩溃。这时候消息队列就像个高效的"订单分拣中心",把请求先收进来排队,后台慢慢处理。
ThinkPHP6官方扩展think-queue提供了开箱即用的队列支持,我实测下来发现它有几个明显优势:配置简单(相比直接使用Redis队列)、支持多种驱动(Redis/Database/Sync)、内置失败重试机制。下面我们从一个电商订单处理的实际案例出发,手把手搭建完整解决方案。
先来看基础环境准备:
# 确保已安装Redis服务 sudo apt-get install redis-server redis-cli ping # 应该返回PONG # 在ThinkPHP6项目中安装队列扩展 composer require topthink/think-queue配置文件位于config/queue.php,建议这样设置:
return [ 'default' => 'redis', // 默认使用Redis驱动 'connections' => [ 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => env('redis.host', '127.0.0.1'), 'port' => env('redis.port', 6379), 'password' => env('redis.password', ''), 'select' => 0, // 使用Redis的0号数据库 'timeout' => 0, // 不超时 'persistent' => false, // 非持久化连接 ], ], 'failed' => [ // 失败任务记录 'type' => 'database', 'table' => 'failed_jobs', ], ];2. 订单异步处理实战开发
电商场景最典型的就是订单异步处理。我们创建一个订单处理任务类:
// app/job/OrderProcess.php namespace app\job; use think\queue\Job; use app\model\Order; class OrderProcess { public function fire(Job $job, $data) { $orderId = $data['order_id']; try { // 1. 扣减库存 $this->reduceInventory($orderId); // 2. 核销优惠券 $this->useCoupon($orderId); // 3. 生成物流单 $this->createShipping($orderId); $job->delete(); // 任务成功则删除 // 可记录成功日志 } catch (\Exception $e) { // 失败处理逻辑 if ($job->attempts() >= 3) { $this->logFailedJob($orderId, $e); $job->delete(); return false; } // 10秒后重试 $job->release(10); } } // 其他业务方法... }在控制器中触发队列任务:
// app/controller/Order.php public function create() { $orderData = input(); // 获取订单数据 // 立即持久化订单到数据库 $order = Order::create($orderData); // 推送到队列异步处理后续逻辑 $isPushed = Queue::push( 'app\job\OrderProcess', ['order_id' => $order->id], 'order_queue' // 指定队列名称 ); return $isPushed ? json(['code'=>200, 'msg'=>'订单创建成功']) : json(['code'=>500, 'msg'=>'系统繁忙请重试']); }3. 高级队列配置与优化
实际生产环境中,简单的队列配置远远不够。我们需要考虑以下几个关键点:
3.1 多队列优先级管理
电商系统通常需要区分订单优先级,比如VIP用户订单需要优先处理:
// 推送到高优先级队列 Queue::push('app\job\OrderProcess', $data, 'vip_order_queue'); // 普通队列 Queue::push('app\job\OrderProcess', $data, 'normal_order_queue');启动不同优先级的消费者:
# 先处理VIP订单,再处理普通订单 php think queue:listen --queue vip_order_queue,normal_order_queue3.2 延迟队列实现
有些业务需要延迟处理,比如30分钟内未支付自动取消订单:
// 30分钟后执行 Queue::later(1800, 'app\job\OrderCancel', $orderId, 'delay_queue');3.3 性能调优参数
根据服务器配置调整消费者参数:
php think queue:work \ --queue order_queue \ --delay 5 \ # 失败后延迟5秒重试 --memory 256 \ # 内存限制256M --sleep 3 \ # 无任务时休眠3秒 --tries 3 \ # 最大重试次数 --timeout 120 # 单个任务最长执行时间建议的监控指标:
- 队列积压数量(通过
redis-cli LLEN queues:order_queue查看) - 消费者进程内存占用
- 任务平均处理时长
4. 高可用部署方案
线上环境必须保证队列服务的稳定性,我分享几个实战经验:
4.1 Supervisor进程守护
安装配置Supervisor:
sudo apt-get install supervisor创建配置文件/etc/supervisor/conf.d/queue.conf:
[program:think_queue] command=php /var/www/your_project/think queue:work --queue order_queue --tries 3 directory=/var/www/your_project user=www-data autostart=true autorestart=true startretries=3 numprocs=4 # 启动4个进程 process_name=%(program_name)s_%(process_num)02d stderr_logfile=/var/log/supervisor/queue_err.log stdout_logfile=/var/log/supervisor/queue_out.log管理命令:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start think_queue:*4.2 失败任务处理
think-queue支持失败任务记录,首先创建数据表:
php think queue:failed-table php think migrate:run然后在配置中启用:
'failed' => [ 'type' => 'database', 'table' => 'failed_jobs', ],可以定期检查失败任务并手动处理:
$faileds = Db::name('failed_jobs')->select(); foreach ($faileds as $failed) { // 分析失败原因并处理 }4.3 Redis高可用方案
生产环境建议使用Redis集群或哨兵模式,配置示例:
'redis' => [ 'type' => 'redis', 'host' => [ 'tcp://redis1:6379', 'tcp://redis2:6379?alias=slave', ], 'options' => [ 'replication' => 'sentinel', 'service' => 'mymaster', 'parameters' => [ 'password' => 'your_redis_password', 'database' => 0, ], ], ],5. 监控与报警机制
完善的监控体系能提前发现问题,我常用的方案:
Prometheus + Grafana监控:
- 自定义指标导出器统计队列积压量
- 设置消费者进程存活检测
关键指标报警:
# 检查队列积压的Shell脚本 backlog=$(redis-cli LLEN queues:order_queue) if [ $backlog -gt 1000 ]; then # 触发邮件/短信报警 fi日志分析ELK:
- 收集Supervisor日志
- 分析任务处理时长分布
- 监控异常错误频率
在电商项目中,我通常会为关键队列设置SLA,比如:
- 普通订单处理延迟不超过5分钟
- VIP订单处理延迟不超过1分钟
- 失败重试间隔采用指数退避策略
这些经验都是在实际项目中踩坑总结出来的。记得第一次上线队列服务时,因为没有设置进程守护,半夜消费者挂了导致积压上万订单,第二天运营直接炸锅。后来引入Supervisor+监控才彻底解决问题。
