当前位置: 首页 > news >正文

BullMQ:AI系统缺失的队列层

你是否曾经花了大量时间只是在等待?等待 API 调用完成。看着请求超时。盯着加载中的转圈动画。在某个时刻你会意识到:问题不在于代码,而在于架构。我们不能只是调用一个慢的东西然后期望一切顺利。这就是人们发明任务队列的原因。

1、什么是"任务队列"?

这是一种朴素的方法——在完美世界中可行的方法:

用户上传文档 → 服务器调用 AI API → 服务器等待 → 服务器响应用户

很简单,对吧?直到 AI API 需要 30 秒。或者你的 HTTP 网关在 25 秒时超时。或者第三方服务在第 50 次请求后对你限流。或者三个用户同时上传,你同时冲击同一个端点。

同步方法假设一切都是快速和可靠的。没有什么是快速和可靠的

任务队列通过解耦请求和实际工作来解决这个问题。与其立即执行工作,你将一个便签放入队列,上面写着"嘿,有人需要做这件事"。一个独立的进程——一个Worker——拾取这个便签并在自己的时间以可控的速度完成实际工作。

把它想象成邮局。你不需要在柜台等待邮递员亲自把包裹送到目的地。你交出包裹,拿到一个追踪号,然后回家。投递在后台进行。你可以稍后查看追踪号以确认是否到达。那个追踪号?就是你的任务 ID。

2、如果你来自 RabbitMQ 或 Kafka 的快速说明

在这个入门教程中,我们将使用 BullMQ。如果你之前用过 RabbitMQ、Apache Kafka,甚至 Amazon Simple Queue Service,BullMQ 可能会感觉太简单了。

那是因为 BullMQ 并不试图成为通用的消息代理或事件流平台。它是一个任务队列:一个用来表达"请稍后做这项工作"的工具。

使用 RabbitMQ 时,你可能会想到交换机、路由键、发布者、消费者,以及多个服务订阅同一条消息。

使用 Kafka 时,你可能会想到主题、分区、偏移量和可重放的事件流。

BullMQ 并不做这些事情。没有交换机。没有分区。没有扇出式发布/订阅。一个任务通常有一个生产者、一个队列和一个最终处理它的 Worker。这是有意为之的。

对于本文所讨论的问题——后台任务,例如:

  • 文档分块
  • 生成嵌入向量
  • 发送电子邮件
  • 调整图片大小
  • 调用慢速 AI API
    ——BullMQ 通常更容易理解,也更快上手。特别是如果你的技术栈已经使用了 Node.js 和 Redis。

如果 RabbitMQ 是一个邮件分拣中心,Kafka 是整个货运铁路网络,那么 BullMQ 就是熟食店柜台上的取号机:取一个号码,排队等候,最终轮到你时有人会叫你。

权衡之处在于 BullMQ 的范围更窄。如果你以后需要多个独立的服务消费同一个事件、高级路由规则或重放历史事件,那通常就是你升级到 RabbitMQ 或 Kafka 的时候。

3、 BullMQ的三个主要角色

让我们先建立正确的思维模型。系统中有三个角色:

生产者(Producer)

你的应用中创建任务的部分。通常是你的 API 处理程序或控制器。当用户触发一些可能很耗资源的操作时,你不会立即执行工作,而是将其入队。

队列(Queue)

等待区域。它是一个数据结构(在 BullMQ 中:由 Redis 支持),用于存储任务并跟踪它们的状态。任务在这里等待,直到有 Worker 准备好。

Worker

实际执行工作的进程。它监视队列,逐个(或批量)拾取任务,运行你的处理逻辑,并将任务标记为完成或失败。

生产者和 Worker 不需要知道彼此。它们只是都与队列通信。这就是全部的诀窍。

4、前后对比:队列实际改变了什么

让我用一个真实的例子来说明——将文档导入 RAG 系统。这意味着从 PDF 中提取文本、将其分成块、评估质量、生成嵌入向量,并将所有内容写入向量数据库。

没有队列时,这在你的 HTTP 请求处理程序中同步运行:

这是消息队列系统(在我们的例子中:任务队列)能极大改善用户体验的典型案例。使用队列后,HTTP 处理程序只做一件事:入队。然后立即返回,如下面的时序图所示:

差异不仅仅是外观上的。有了队列:

  • API 保持快速,无论后台工作需要多长时间
  • 系统可以处理 100 个并发上传,而不会有 100 个被阻塞的线程
  • 嵌入 API 的速率限制由 Worker 遵守,而不是由 100 个竞争请求管理
  • 如果嵌入 API 宕机,任务会在队列中排队并自动重试。不会丢失任何工作。

5、代码示例

下面是一些代码示例,如果你想尝试使用 BullMQ 创建任务队列。

5.1 设置 Redis

BullMQ 将所有内容存储在 Redis 中:任务负载、状态、重试计数,所有东西。Redis 很快,能在重启后存活,而且 BullMQ 专门围绕其数据结构构建。

在本地运行 Redis 最快的方式是 Docker:

docker run -d -p 6379:6379 redis:7

就这样。Redis 现在运行在localhost:6379。本地开发无需额外配置。

5.2 安装 BullMQ

如果你已经在使用 NodeJS,BullMQ 非常适合。我们需要安装这两个库:bullmq队列库;和ioredis它底层使用的 Redis 客户端。

npm install bullmq ioredis

5.3 你的第一个队列和生产者

假设我们正在构建一个文档处理功能。当用户上传文件时,我们想要启动一个后台任务,而不是阻塞 HTTP 响应。

首先,设置一个共享的 Redis 连接并创建一个队列:

import { Queue } from 'bullmq'; import IORedis from 'ioredis'; // create a connection to Redis const connection = new IORedis({ maxRetriesPerRequest: null }); // create a queue with the name 'document-processing' const document_queue = new Queue('document-processing', { connection });

现在,在我们的 API 处理程序中(通常我们会同步执行工作的地方),我们只需入队:

// create a new job and // enqueue it in the 'document-processing' queue const job = await document_queue.add( 'process-document', { documentId: 'doc_abc123', userId: 'user_xyz', fileUrl: 'https://storage.example.com/uploads/abc123.pdf', } ); console.log(`Job enqueued with ID: ${job.id}`); // return job.id to the client so they can poll for status later

queue.add()接受一个任务名称和一个负载。负载是我们的 Worker 需要的任何数据。该方法返回一个带有 ID 的任务对象;请保存好它。

5.4 我们的第一个 Worker

现在我们需要一些东西来实际拾取这些任务。下面是一个基本的 Worker:

import { Worker } from 'bullmq'; import IORedis from 'ioredis'; // create a connection to Redis const connection = new IORedis({ maxRetriesPerRequest: null }); // create the worker for // the 'document-processing' queue we have created previously const worker = new Worker( 'document-processing', // <- the name of the queue async (job) => { console.log(`Processing job ${job.id}`); console.log(`Document ID: ${job.data.documentId}`); // This is where your actual processing logic goes // e.g., download file, call AI API, save results. // Before using job queue, an API handler would just // immediately call this function. await process_document(job.data); return { status: 'done', processedAt: new Date().toISOString() }; }, { connection } ); // event listener 1 worker.on('completed', (job) => { console.log(`Job ${job.id} finished.`); }); // event listener 2 worker.on('failed', (job, err) => { console.error(`Job ${job?.id} failed:`, err.message); });

Worker 通过名称连接到同一个队列。BullMQ 处理所有的轮询——你的处理函数只在有任务需要处理时运行。你从处理器中return的任何内容都会被保存为任务的结果数据。

6、任务状态:入队后会发生什么

每个任务都会经历一个生命周期。了解这一点会让调试容易得多。

还有waiting-children(用于任务依赖)和paused状态,但这些是高级主题。

对于上面的例子:waiting → active → completed/failed是核心循环。

6.1 重试和退避:因为事情会失败

外部 API 会失败。网络会出问题。你的 AI 提供商在凌晨 3 点返回 503 错误。我们希望任务能在不需要人工干预的情况下自动重试。

我们可以在入队时配置重试。前面的示例代码可以这样写:

const job = await document_queue.add( 'process-document', { documentId: 'doc_abc123', userId: 'user_xyz', fileUrl: 'https://storage.example.com/uploads/abc123.pdf', }, // retry configuration { attempts: 5, backoff: { type: 'exponential', delay: 2000, // start with 2 seconds }, } );

指数退避意味着:等待 2 秒,然后 4 秒,然后 8 秒,然后 16 秒……每次重试大约等待上一次的两倍时间。这很重要,因为如果一个服务正在挣扎中,每秒都去冲击它只会让情况更糟。给它恢复的空间。

使用attempts: 5,BullMQ 在第一次失败后最多会再重试四次,之后才会最终将任务标记为failed

7、限流

这是我最初在 RAG 系统中使用队列的原因之一。假设我们正在调用一个允许每分钟 100 次请求的外部 API。在并发用户足够多的情况下,我们会超出限制并开始到处收到429错误。

BullMQ 有一个内置的限流器。我们在 Worker 上配置它:

// adjusting the previous worker by adding a limiter configuration const worker = new Worker( 'document-processing', ... { connection, // adding limiter limiter: { max: 100, // max jobs processed duration: 60000 // per 60 seconds (in ms) }, } );

现在 BullMQ 自动将处理速度限制为每分钟 100 个任务,无论队列中有多少任务或有多少 Worker 实例在运行。任务只是停留在delayed状态,直到有处理能力。没有429错误,不需要手动限流逻辑。仅这一点就值得采用队列系统。

8、 检查任务状态(轮询)

还记得我们从queue.add()获得的那个任务 ID 吗?以下是我们如何使用它。这就是我们的前端在显示"处理中…"状态时轮询的内容:

import { Queue } from 'bullmq'; const document_queue = new Queue('document-processing', { connection }); // check job status async function getJobStatus(job_id: string) { const job = await document_queue.getJob(job_id); if (!job) { return { status: 'not_found' }; } const state = await job.getState(); const result = job.returnvalue; // only populated when completed return { id: job.id, state, // 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' result, failedReason: job.failedReason, }; }

我们的 API 端点暴露了这个功能,我们的前端每隔几秒轮询一次,我们在不保持开放连接的情况下显示进度。干净简洁。

9、常见问题解答

为什么选 BullMQ 而不是 RabbitMQ?

这个问题经常被问到。两者都是合法的队列系统,但它们解决的是略有不同的问题。

RabbitMQ 是一个完整的消息代理;一个独立的服务,有自己的协议(AMQP)、自己的概念(交换机、绑定、路由键)和自己的运维开销。它功能强大且久经考验,特别适用于需要根据复杂规则在许多不同消费者之间路由消息的微服务。

BullMQ 是一个运行在 Redis 之上的任务队列库,而 Redis 我们可能已经有了。它专注于一个用例:将任务入队、可靠地处理它们、跟踪它们的状态。

简而言之:BullMQ 最适合后台任务和任务队列。RabbitMQ 最适合事件流和微服务消息传递。

何时切换到 RabbitMQ 或 Kafka

在某些场景下,你确实会想要迁移到 RabbitMQ 或 Kafka。

切换到 RabbitMQ 的时机:

  • 你需要扇出——一个事件需要同时触发多个独立的消费者(例如,"文档已上传"应该同时触发导入流水线、通知服务和分析事件)

  • 你正在构建一个微服务网格,其中许多服务需要通过消息以复杂的路由规则进行通信

  • 你需要协议级别的互操作性——你的一些服务不是 Node.js,需要使用 AMQP 协议
    切换到 Kafka 的时机:

  • 你需要一个事件日志——你想要重放过去的事件,而不仅仅是处理一次

  • 你正在处理极高的吞吐量——想想每秒数百万事件(社交媒体信息流、IoT 遥测、点击流)

  • 你需要流处理——实时聚合、转换或连接事件流

  • 你的保留要求意味着你需要将事件保存数天或数周,以便下游消费者赶上

10、结束语

我们涵盖了以下内容:为什么同步 API 调用在现实条件下会崩溃;生产者/队列/Worker的思维模型;一个可以处理重试、限流和任务状态跟踪的 BullMQ 工作设置;以及最后,什么时候坚持使用 BullMQ,什么时候该使用更重的工具。

BullMQ 确实足以处理大多数用例的后台处理。从你的 API 处理程序中投入任务,让 Worker 以可控的速度处理它们,然后从客户端轮询状态。

后续方向:

  • Webhook 替代轮询——不再每隔几秒轮询一次检查状态,我们可以让 Worker 在任务完成时调用一个 webhook URL。更高效,更少噪音。
  • 优先级队列——有些任务比其他任务更紧急。BullMQ 支持queue.add()上的priority字段。
  • BullMQ 仪表盘——bull-board是一个用于监控队列、检查失败任务和手动重试的仪表盘 UI。在生产环境中非常有用。
  • 任务流——BullMQ 有父/子任务的概念,父任务会等待所有子任务完成。适用于多步骤流水线。
    如果你是消息队列系统的新手,队列模式可能需要一些思维上的开销来设置。但一旦它在那里,添加新的后台任务类型就是小事一桩。而且你将为未来更复杂的消息队列系统做好准备。添加队列是那些能快速获得回报的架构决策之一。

一如既往,持续学习,快乐编码!


原文链接:BullMQ:AI系统缺失的队列层 - 汇智网

http://www.jsqmd.com/news/736636/

相关文章:

  • Anything to RealCharacters 2.5D转真人引擎部署教程:四重显存防爆优化详解
  • 写于“AI元人文”思想体系初步完成之际
  • glutin社区贡献指南:从问题报告到代码提交的完整流程
  • 【大数据存储与管理】NoSQL数据库:06 从NoSQL到NewSQL数据库
  • 开源社区自动化协作:基于事件驱动的GitHub机器人开发实践
  • Steam成就管理工具完整指南:3步轻松解锁游戏成就
  • Android开发工程师职位聚焦蓝牙技术开发指南
  • Windows 11安卓子系统深度解析:开发者实战指南与技术决策框架
  • 操作系统(四)
  • 《UNIX环境高级编程》读书笔记05: 文件和目录
  • nli-MiniLM2-L6-H768详细步骤:supervisor日志轮转配置防止/workspace日志爆满
  • ToastFish:如何在工作间隙悄无声息地提升英语词汇量?
  • 手机千问 文心 元宝 Kimi怎么导出pdf
  • 【金融级容器安全合规白皮书】:Docker 27等保2.0三级适配全栈落地指南(含央行《金融科技产品认证规则》映射表)
  • Conductor微服务编排引擎:5步掌握分布式工作流管理
  • 2026年3月知名的保温被品牌推荐,温室大棚遮阳网/散射幕布/内遮阳保温幕/保温被/黑白遮阳网,保温被品牌口碑推荐 - 品牌推荐师
  • C++初阶:入门基础
  • StructBERT中文large模型效果展示:句式变换(主动/被动)、同义词替换高鲁棒性案例
  • 【踩坑】你以为在过人机验证,实际上正亲手把木马装进电脑 | ClickFix攻击
  • JSON 小传:从 JavaScript 捡来的“数据网红”
  • 必知必会:大模型对齐数据构造与PPO算法详解
  • 2026五一出行运动扭伤,五种常用止痛药怎么选?
  • 2026变频互感器测试仪技术解析:互感器励磁特性综合测试仪/互感器特性测试仪/充气式试验变压器/变压器综合特性测试仪/选择指南 - 优质品牌商家
  • Android蓝牙开发深度解析:从技术基础到面试准备
  • 如何快速掌握AssetRipper:Unity资源逆向工程的完整指南
  • CMOS与双极型运算放大器特性对比与应用设计
  • 收藏!2026年大模型红利爆发|程序员+小白必看,阿里跳槽案例+薪资表
  • 2026年郑州博亚财务服务有限公司性价比高吗? - myqiye
  • Phi-3-mini-4k-instruct-gguf部署教程:多模型并行服务配置与端口路由策略
  • 必知必会:奖励模型训练与PPO稳定训练方法详解