当前位置: 首页 > news >正文

MongoDB实现发布订阅机制

一、MongoDB Pub/Sub 的实现原理
MongoDB 的发布订阅不是像 Redis 那样的原生 “频道式” Pub/Sub,而是基于变更流(Change Streams)(MongoDB 3.6+ 推荐)或早期的tailable cursor(可尾游标)实现:
Change Streams:监听集合 / 数据库 / 集群的实时数据变更(插入、更新、删除等),订阅者通过监听这些变更事件来接收 “消息”,发布者则通过向集合插入文档来 “发布消息”。
核心逻辑:把 “消息” 封装成文档插入 MongoDB 集合,订阅者监听该集合的插入事件,从而实现消息的发布与订阅。

const path = require('path'); const paths = require('../paths'); const dbManager = require(path.join(paths.manager, 'dbManager')) const COLLECTION_NAME = 'messages'; // 存储消息的集合名 // 并发 / 吞吐量能力(参考值,普通服务器硬件) // 指标 MongoDB(tailable cursor) Redis Pub/Sub MongoDB(Change Streams) // 单频道订阅者数量 支持 100+(无明显卡顿) 支持 1000+ 支持 50+ // 消息发布 QPS 1 万 - 5 万 / 秒 10 万 + / 秒 5000 - 2 万 / 秒 // 消息延迟(发布→接收) 1-10ms <1ms 5-20ms /** * 初始化:创建固定集合(仅需执行一次) * 固定集合是tailable cursor的前提,大小限制100MB,自动覆盖旧文档 */ async function initCappedCollection() { try { // 检查集合是否存在,不存在则创建固定集合 const collections = await dbManager.getDB().listCollections({ name: COLLECTION_NAME }).toArray(); if (collections.length === 0) { await dbManager.getDB().createCollection(COLLECTION_NAME, { capped: true, // 启用固定集合 size: 100 * 1024 * 1024, // 集合最大大小100MB max: 10000 // 最多存储10000条文档(二选一,满足其一即触发覆盖) }); console.log('固定集合创建成功'); } } catch (err) { console.error('初始化集合失败:', err); } } /** * 发布者:向集合插入消息文档(模拟发布) * @param {string} channel 频道名(区分不同类型的消息) * @param {any} data 要发布的消息内容 */ async function publish(channel, data) { try { const collection = dbManager.getDB().collection(COLLECTION_NAME); // 插入消息文档(包含频道、内容、时间戳) await collection.insertOne({ channel: channel, data: data, timestamp: new Date(), }); console.log(`[发布] 频道${channel}:`, data); } catch (err) { console.error('发布失败:', err); } } /** * 订阅者:监听指定频道的消息(模拟订阅) * @param {string} channel 要订阅的频道名 */ async function subscribe(channel) { try { const collection = dbManager.getDB().collection(COLLECTION_NAME); console.log(`[订阅] 开始监听频道${channel}...`); let lastId = null; const lastDoc = await collection.findOne( { channel: channel }, { sort: { $natural: -1 } } // 仅查询最后一条时用倒序,tailable cursor本身不用 ); if (lastDoc) { lastId = lastDoc._id; } // 创建tailable cursor(持续监听最新文档) const cursor = collection.find( lastId ? { channel: channel, _id: { $gt: lastId } } : { channel: channel }, // 只读新文档 { tailable: true, // 启用可尾游标 awaitData: true, // 等待新数据(阻塞查询) noCursorTimeout: true, // 禁用游标超时 sort: { $natural: 1 } // 仅支持正序 } ); // 持续遍历游标,接收新消息 while (true) { if (await cursor.hasNext()) { // 有新文档时触发 const message = await cursor.next(); console.log(`[接收] 频道${channel}:`, message.data); } else { // 无新数据时短暂等待,避免CPU空转 await new Promise(resolve => setTimeout(resolve, 100)); } } } catch (err) { console.error('订阅失败:', err); } } // -------------------------- 示例调用 -------------------------- async function runDemo() { await initCappedCollection() // 启动订阅者(监听"chat"频道) subscribe('chat'); // 延迟1秒后发布3条测试消息 setTimeout(() => publish('chat', { user: '张三', msg: '你好!' }), 1000); setTimeout(() => publish('chat', { user: '李四', msg: 'MongoDB Pub/Sub测试' }), 2000); setTimeout(() => publish('notice', { content: '这是通知频道,订阅者收不到' }), 3000); // 其他频道不接收 } module.exports = { runDemo, };

这个方案的性能消耗属于轻到中等级别,远低于 Redis Pub/Sub(Redis 是纯内存轻量级),但高于 MongoDB Change Streams(副本集模式),适合中小规模的消息场景(如单服务 / 低并发通知),不适合高并发、低延迟的核心业务场景。

1. CPU 消耗
客户端(Node.js 服务):
核心消耗点:cursor.hasNext() 循环 + 消息处理回调。
关键优化:开启 awaitData: true(代码中已配置)时,MongoDB 驱动会采用阻塞式 IO 等待新数据,而非空轮询,此时 CPU 占用极低(单订阅频道 CPU 使用率 < 1%);若关闭 awaitData 改用主动轮询(如每 100ms 查一次),CPU 会飙升至 10%+(空转消耗)。
多频道订阅:每增加一个频道,CPU 消耗线性增加,但单进程支持 50+ 频道订阅时,CPU 仍可控制在 5% 以内。
服务端(MongoDB):
固定集合(capped collection)的插入 / 查询都是顺序 IO(而非普通集合的随机 IO),CPU 消耗极低;
维护 tailable cursor 的开销可忽略(单节点下,100 个 cursor 仅占用 < 0.5 核 CPU)。
2. 内存消耗
客户端(Node.js 服务):
每个订阅频道对应一个 cursor 对象,内存占用仅几 KB;即使 100 个频道订阅,总内存增加也不足 1MB。
仅需缓存少量消息元数据(如最后一条消息的 _id),无额外内存压力。
服务端(MongoDB):
固定集合的元数据(索引、游标信息)占用内存约几十 MB;
固定集合本身会按 size 配置(如 100MB)加载部分数据到内存(MongoDB 缓存机制),但可通过 wiredTiger 缓存配置限制,避免内存溢出。
3. 磁盘 IO 消耗
固定集合是 MongoDB 中磁盘 IO 效率最高的集合类型:
插入消息时是顺序写(磁盘磁头无需频繁移动),比普通集合的随机写快 5-10 倍;
tailable cursor 读取消息是顺序读,几乎无随机 IO,磁盘 IO 利用率 > 90%;
对比普通集合:普通集合的随机写 IO 消耗是固定集合的 3-5 倍。
4. 网络消耗
连接层面:tailable cursor 会保持长连接(无频繁重连),无额外心跳包开销;
数据层面:仅传输新插入的消息数据(含 _id/channel/data/timestamp 等元数据),单条消息的网络包比 Redis Pub/Sub 大(Redis 仅传输纯消息内容,MongoDB 多文档元数据),但整体网络消耗仍属低级别;
并发订阅:多订阅者监听同一频道时,MongoDB 会为每个订阅者推送消息,网络消耗随订阅者数量线性增加(Redis 同理)。

http://www.jsqmd.com/news/289907/

相关文章:

  • conda下安装cuda11.8和cudnn
  • 2026最新conda镜像源
  • GVHMR输出的.pt文件最全面分析
  • 《知行合一的价值革命:评〈AI元人文:悟空而行〉的思想、方法与伦理突破》
  • 【微服务注册与管理开源框架】从选型到实战(Nacos/Eureka/Consul/etcd/Zookeeper) - 实践
  • 高增长科技股投资法 核心内容深度拆解
  • 特殊符号绕过-ctfshow-web40
  • 国产Jira方案哪家强?2026年Jira替代工具测评指南
  • Halcon彩图阈值分割、腐蚀和膨胀、顶帽和底帽处理、求图像边界轮廓
  • Halcon图像滤波:均值滤波、中值滤波、高斯滤波、高通滤波、标准差滤波
  • 【必藏】零代码实现!告别AI幻觉,搭建专属知识库的RAG实战教程
  • 【强烈收藏】AI Agent全栈开发之路(15):RAG技术详解与向量模型实战
  • 【爆点实战】Spring AI电商客服RAG系统,双Advisor精准解答退换货、物流问题!代码收藏必学!
  • 云平台一键部署【Tencent-YouTu-Research/Youtu-LLM-2B】具备原生智能体能力
  • RAG系统效果差?真正决定成败的竟然是检索与生成之间的隐藏层!收藏这篇深度解析
  • 救命神器10个AI论文写作软件,本科生搞定毕业论文!
  • Java基于Spring Boot+Vue的走散儿童救助信息管理系统的设计与实现
  • Java基于Spring Boot+Vue的学生宿舍管理系统的设计于实现
  • 2026年GEO优化服务商数据监测能力对比:谁才是真正的_数据驱动_?
  • 2026主管药师考试备考资料推荐:3大数据维度测评+全阶段资料对比排行
  • AI Agent从零搭建全流程:手把手教你构建智能助手(附完整代码+避坑指南,建议收藏)
  • 2026主管药师考试备考资料测评:3家主流机构8类资料实测对比排行
  • 【建议收藏】RAG工程化实践:六大模块详解,解决效果/成本/稳定性难题
  • 专业企业心理测评系统推荐:2026这个平台如何破解企业EAP传统困境?
  • 2026年1月成都清洁用品、清洁工具、拖把、尘推、垃圾袋厂商深度测评与选型推荐报告
  • 【必藏】AI智能体全攻略:从架构设计到实战应用,一篇读懂Agent核心技术与未来趋势
  • 2026 年,GEO 优化如何选?风信子传媒:以“内容生态+智能分发”重塑品牌 AI 认知
  • 2026年十大外贸ERP软件深度测评与选型白皮书
  • 2026必备!MBA论文写作痛点全解析:TOP9一键生成论文工具深度测评
  • 2026年电线电缆厂家推荐排行榜:高温/低烟无卤/铁氟龙/硅胶/PVC/XLPE辐照/医疗/AI/无人机/机器人/线束加工/定制电线电缆,精选耐用高质品牌!