当前位置: 首页 > news >正文

NestJS 系列教程(十七):异步任务与消息队列(Bull + Redis 企业级实战)

🚀 NestJS 系列教程(十七):异步任务与消息队列(Bull + Redis 企业级实战)

✨ 本篇目标

本篇你将学会:

  • 为什么要用消息队列(削峰填谷、解耦、提升响应速度)

  • 在 NestJS 中如何集成 Bull(Redis Queue)

  • 如何实现:

    • Producer(生产任务)
    • Consumer / Processor(消费任务)
    • 重试(attempts + backoff)
    • 延迟任务(delay)
    • 并发控制(concurrency)
  • 如何把traceId贯穿到异步任务,做到可追踪


🧠 一、为什么要队列?

典型下单流程往往包含很多“非核心动作”:

  • 发短信/邮件
  • 写埋点日志
  • 通知第三方系统
  • 生成优惠券、积分等

如果全部同步做,接口耗时会暴涨,且第三方慢/挂会拖死主业务。

正确姿势:

核心逻辑(扣库存、写订单)同步完成
非核心逻辑(通知、日志、延迟取消)丢队列异步做


🧱 二、准备环境与依赖

1)Redis(本地或 Docker)

Docker 启动 Redis:

dockerrun--nameredis-nest-p6379:6379-dredis:7

2)安装依赖

npminstall@nestjs/bull bull ioredis

Bull 内部使用 Redis;前面教程已经用过 ioredis,因此体系一致。


🧩 三、模块结构(本章最小闭环可运行)

我们基于一个“下单示例”来做队列:

  • /orders/create/:productId:创建订单(同步)

  • 同步成功后写入队列:

    • 发送通知(重试)
    • 写操作日志(并发控制)
    • 延迟取消订单(delay)

目录结构建议:

src/ ├── app.module.ts ├── common/ │ └── context/ │ ├── request-context.ts │ └── trace.util.ts ├── queue/ │ ├── queue.module.ts │ ├── jobs/ │ │ ├── order.jobs.ts │ │ └── order.processor.ts │ └── queue.constants.ts └── orders/ ├── orders.module.ts ├── orders.service.ts └── orders.controller.ts

如果你跟着教程做,那你现在应该已经有 traceId(第13章),这里直接复用即可。


🔧 四、配置 BullModule(全局)

src/app.module.ts

import{Module}from'@nestjs/common';import{BullModule}from'@nestjs/bull';import{QueueModule}from'./queue/queue.module';import{OrdersModule}from'./orders/orders.module';@Module({imports:[// Bull 全局连接 RedisBullModule.forRoot({redis:{host:'127.0.0.1',port:6379},// Bull 默认参数也可以在这里统一配置defaultJobOptions:{removeOnComplete:true,// 生产建议开启,避免 Redis 堆积removeOnFail:false,// 失败任务保留用于排查(或配置保留数量)},}),QueueModule,OrdersModule,],})exportclassAppModule{}

🧱 五、QueueModule:注册队列 + 注册 Processor

1)常量定义

src/queue/queue.constants.ts

exportconstQUEUE_ORDER='orderQueue';exportconstJOB_SEND_NOTIFICATION='sendNotification';exportconstJOB_WRITE_AUDIT_LOG='writeAuditLog';exportconstJOB_AUTO_CANCEL_ORDER='autoCancelOrder';

2)QueueModule

src/queue/queue.module.ts

import{Module}from'@nestjs/common';import{BullModule}from'@nestjs/bull';import{QUEUE_ORDER}from'./queue.constants';import{OrderJobs}from'./jobs/order.jobs';import{OrderProcessor}from'./jobs/order.processor';@Module({imports:[BullModule.registerQueue({name:QUEUE_ORDER,}),],providers:[OrderJobs,OrderProcessor],exports:[OrderJobs],// 给 OrdersService 调用(Producer)})exportclassQueueModule{}

🧑‍🍳 六、Producer:封装任务投递(OrderJobs)

这样做的好处:业务层不用关心 Bull 细节,代码更干净。

src/queue/jobs/order.jobs.ts

import{Injectable}from'@nestjs/common';import{InjectQueue}from'@nestjs/bull';import{Queue}from'bull';import{QUEUE_ORDER,JOB_SEND_NOTIFICATION,JOB_WRITE_AUDIT_LOG,JOB_AUTO_CANCEL_ORDER,}from'../queue.constants';@Injectable()exportclassOrderJobs{constructor(@InjectQueue(QUEUE_ORDER)privatereadonlyqueue:Queue){}/** * 发送通知:失败自动重试 + 指数退避 */asyncsendNotification(payload:{orderId:string;userId:number;traceId?:string;}){returnthis.queue.add(JOB_SEND_NOTIFICATION,payload,{attempts:3,backoff:{type:'exponential',delay:2000},});}/** * 写审计日志:通常不需要重试太多,失败可落库/落文件 */asyncwriteAuditLog(payload:{action:string;orderId:string;traceId?:string;}){returnthis.queue.add(JOB_WRITE_AUDIT_LOG,payload,{attempts:2,backoff:{type:'fixed',delay:1000},});}/** * 延迟任务:未支付订单自动取消 */asyncautoCancelOrder(payload:{orderId:string;traceId?:string}){returnthis.queue.add(JOB_AUTO_CANCEL_ORDER,payload,{delay:30*60*1000,// 30分钟后执行attempts:1,});}}

🧠 七、Consumer:Processor 处理任务(OrderProcessor)

src/queue/jobs/order.processor.ts

import{Processor,Process}from'@nestjs/bull';import{Job}from'bull';import{QUEUE_ORDER,JOB_SEND_NOTIFICATION,JOB_WRITE_AUDIT_LOG,JOB_AUTO_CANCEL_ORDER,}from'../queue.constants';@Processor(QUEUE_ORDER)exportclassOrderProcessor{/** * 发送通知(并发 5) * 适合:发送邮件/短信/站内信等第三方调用 */@Process({name:JOB_SEND_NOTIFICATION,concurrency:5})asynchandleSendNotification(job:Job){const{orderId,userId,traceId}=job.data;// 生产建议:这里用你自己的 Logger(带 traceId)console.log(`[sendNotification] traceId=${traceId}orderId=${orderId}userId=${userId}`);// 模拟第三方慢调用awaitnewPromise((r)=>setTimeout(r,800));// 模拟偶发失败(用于验证重试机制)if(Math.random()<0.1){thrownewError('第三方短信服务失败(模拟)');}return{ok:true};}/** * 写审计日志(并发 10) * 适合:埋点、行为记录、关键操作审计 */@Process({name:JOB_WRITE_AUDIT_LOG,concurrency:10})asynchandleWriteAuditLog(job:Job){const{action,orderId,traceId}=job.data;console.log(`[writeAuditLog] traceId=${traceId}action=${action}orderId=${orderId}`);// 模拟写入日志系统/数据库awaitnewPromise((r)=>setTimeout(r,100));return{ok:true};}/** * 延迟取消订单(并发 3) * 适合:超时自动取消、自动确认收货、自动退款等 */@Process({name:JOB_AUTO_CANCEL_ORDER,concurrency:3})asynchandleAutoCancel(job:Job){const{orderId,traceId}=job.data;console.log(`[autoCancelOrder] traceId=${traceId}orderId=${orderId}`);// 真实逻辑:查订单状态,如果未支付则取消并回滚库存awaitnewPromise((r)=>setTimeout(r,200));return{canceled:true};}}

🔗 八、Orders:在下单后投递队列(完整闭环)

OrdersModule

src/orders/orders.module.ts

import{Module}from'@nestjs/common';import{OrdersService}from'./orders.service';import{OrdersController}from'./orders.controller';import{QueueModule}from'../queue/queue.module';@Module({imports:[QueueModule],providers:[OrdersService],controllers:[OrdersController],})exportclassOrdersModule{}

OrdersService:同步成功后投递任务

src/orders/orders.service.ts

import{Injectable}from'@nestjs/common';import{OrderJobs}from'../queue/jobs/order.jobs';// 如果你第13章用了 AsyncLocalStorage,这里可直接 getTraceId()// 我这里做成可选,避免你没接入时跑不起来functiongetTraceIdSafe():string|undefined{returnundefined;}@Injectable()exportclassOrdersService{constructor(privatereadonlyorderJobs:OrderJobs){}asynccreateOrder(productId:number){// 1️⃣ 核心逻辑:扣库存 + 写订单(本章用模拟)// 真实项目:这里会调用第16章的库存扣减 + 事务写订单constorderId=`ORDER_${Date.now()}`;constuserId=1;consttraceId=getTraceIdSafe();// 2️⃣ 非核心逻辑:投递队列(异步)awaitthis.orderJobs.sendNotification({orderId,userId,traceId});awaitthis.orderJobs.writeAuditLog({action:'CREATE_ORDER',orderId,traceId});awaitthis.orderJobs.autoCancelOrder({orderId,traceId});// 3️⃣ 主接口快速返回return{orderId,productId,userId,queued:true};}}

OrdersController

src/orders/orders.controller.ts

import{Controller,Post,Param,ParseIntPipe}from'@nestjs/common';import{OrdersService}from'./orders.service';@Controller('orders')exportclassOrdersController{constructor(privatereadonlyordersService:OrdersService){}@Post('create/:productId')create(@Param('productId',ParseIntPipe)productId:number){returnthis.ordersService.createOrder(productId);}}

🧪 九、测试方式

1)启动 Redis(如果没启动)

dockerps|grepredis-nest||dockerrun--nameredis-nest-p6379:6379-dredis:7

2)启动 Nest

npmrun start

3)请求下单接口

POST http://localhost:3000/orders/create/1

你会看到:

  • 接口立刻返回{ queued: true }

  • 控制台异步输出:

    • sendNotification(可能重试)
    • writeAuditLog
    • autoCancelOrder(30 分钟后触发)

🧠 十、生产级注意事项(非常重要)

1)任务投递时机

务必保证:

数据库事务提交成功后再 add job

否则会出现:

  • 队列执行了,但订单数据没落库(事务回滚)

2)失败任务处理策略

  • 重要任务(发货、扣款)必须可重试 + 告警
  • 非重要任务(埋点)失败可降级

3)队列拆分建议

不要把所有任务丢到一个队列里,推荐按职责拆:

  • orderQueue
  • notifyQueue
  • auditQueue

这样可控性更强,也更容易扩容。


✅ 本章小结

本篇你已经完成了一个“企业级队列闭环”:

  • Bull + Redis 集成
  • Producer(OrderJobs)封装投递
  • Processor(OrderProcessor)消费执行
  • 自动重试 + 退避
  • 延迟任务
  • 并发控制
  • 与订单主流程解耦,实现削峰填谷
http://www.jsqmd.com/news/611549/

相关文章:

  • MT5 Zero-Shot中文增强效果展示:儿童读物语言简化改写案例
  • 飞猫M7随身WiFi去云控,解限速,改后台,改壁纸
  • Qwen3.5-9B应用案例:用它做智能客服、分析图表,简单高效
  • EVA-02重建技术面试题:Java八股文的知识点梳理与重构
  • Xinference-v1.17.1视频内容审核系统实战
  • Java开发者必备:GME-Qwen2-VL-2B Spring Boot后端集成全攻略
  • mac上安装openclaw从入门到删除
  • Aloha 机械臂实战指南:基于ACT策略的sim_transfer_cube任务训练与调优
  • MediaPipe Hands彩虹骨骼版入门指南:从零开始学习手势识别技术
  • Dify前端Docker镜像瘦身与优化实战:告别网络依赖,提升构建速度
  • Beautiful Soup
  • 跨平台协同:Windows主机+Mac笔记本共享Qwen3-32B-Chat镜像方案
  • internlm2-chat-1.8b开源模型深度解析:SFT+RLHF对齐带来的指令遵循提升
  • 配电系统里充电站怎么报价才能既赚到钱又不被市场机制反噬?这问题最近折腾得我够呛。今天咱们就扒一扒这个两阶段投标策略的代码实现,保证您看完能自己动手写个简化版
  • Z-Image-Turbo-辉夜巫女实战教程:GPU算力弹性伸缩——按需加载LoRA模型
  • S2-Pro辅助3D建模与场景描述:连接自然语言与Blender脚本生成
  • 2026年知名的组合式中空锚杆/隧道支护中空锚杆稳定供应商推荐 - 品牌宣传支持者
  • 手把手教你用社区预编译轮子在 Windows 上快速安装 flash_attn(含常见错误解决方案)
  • 卡证检测模型固件升级:嵌入式设备模型OTA更新
  • NestJS 系列教程(十八):文件上传与对象存储架构(Multer + S3/OSS + 访问控制)
  • Vue实战:从零构建黑马后台管理系统全流程解析
  • [特殊字符] 第72课:杨辉三角
  • 2026年热门的隧道支护中空注浆锚杆/自钻式中空注浆锚杆/螺纹钢中空注浆锚杆/预应力中空注浆锚杆口碑好的厂家推荐 - 品牌宣传支持者
  • Sambert多情感语音合成保姆级教程:从部署到生成你的第一段语音
  • 逆变器核心技术解析:锁相环(PLL)在并网系统中的应用与优化
  • Verilog中pullup和pulldown的实战应用:从I2C到Open-Drain的完整指南
  • 基于PyTorch 2.8 的代码生成实践:使用Codex模型辅助编写深度学习脚本
  • 2026年知名的电渗析高盐水处理设备/垃圾渗滤液高盐水处理设备/冷冻法高盐水处理设备/撬装式高盐水处理设备源头厂家 - 品牌宣传支持者
  • 基于Simulink的无差拍(Deadbeat)电流控制高动态性能
  • Java 接入多家大模型 API 实战对比