当前位置: 首页 > news >正文

TDengine TMQ 消费流程 — 从 Subscribe 到 Commit 的完整链路

分类:6.数据订阅 TMQ |篇章:02 消费流程

适用版本:TDengine v3.x(v3.3.x / v3.4.x) | 最后更新:2026-06-30

理解 TMQ 消费的内部链路有助于诊断性能问题、设计可靠的消费应用。本文按 Subscribe → Assign → Poll → Process → Commit 五个阶段拆解。

核心概念速查表

概念说明
Subscribe订阅 Topic
Assign分区分配
Fetch从服务端拉取消息
Poll客户端循环调用拉取
Process业务处理
Commit提交位点
Heartbeat心跳保活

详细解析

1. Subscribe 阶段

Subscribe 流程: ① 客户端调用 subscribe(["topic1", "topic2"]) ② 客户端向 MNode 注册 Consumer: - 上报 group_id, consumer_id, 订阅的 Topic 列表 ③ MNode 加入 Consumer 到对应 Group ④ 触发 Rebalance: - 重新计算分区分配 - 通知所有组员 ⑤ Consumer 收到分配方案: - 知道自己负责哪些 VGroup ⑥ 准备从各 VGroup 的对应 Offset 开始消费

2. Assign 与 Rebalance

分区分配算法(Range / Round-Robin 等): 示例: Topic 在 6 个 VGroup 有数据 Group 有 3 个 Consumer 分配: Consumer-1: VG1, VG2 Consumer-2: VG3, VG4 Consumer-3: VG5, VG6 当 Consumer 加入/离开: ① MNode 检测到变化 ② 发起 Rebalance: - 收回所有分区 - 按新成员重新分配 - 通知所有 Consumer ③ Consumer 暂停 Poll ④ 提交未提交 Offset(可配置) ⑤ 接收新分区 ⑥ 恢复 Poll Rebalance 期间: - 消费暂停(短暂) - 可能重复消费(未 Commit 部分) - 应用应设计幂等

3. Fetch 阶段

Fetch 从服务端拉取数据: Client VNode │ │ │── Fetch (offset=N) ───→│ │ │── 从 WAL 读取 N 之后的数据 ─┐ │ │← ────────── 返回 ─────────┘ │ │── 应用 Topic SQL 过滤 ────┐ │ │← ────── 过滤后数据 ─────────┘ │ │ │← ── Messages ──────────│ │ │ Fetch 特性: - 长轮询(无数据时短暂等待) - 批量返回(多条消息打包) - 每个 VGroup 独立 Fetch - 多 VGroup 并发拉取

4. Poll 循环

应用层 Poll 循环: while running: msgs = consumer.poll(timeout=1.0) if msgs is None: continue // 超时无消息 for msg in msgs: process(msg) if should_commit: consumer.commit() Poll 内部: ① 从各 VGroup 缓冲队列取消息 ② 缓冲不足 → 异步 Fetch 补充 ③ 返回当前可用的消息批 ④ 超时返回 None

5. Topic SQL 过滤

Topic 中的 SQL 在哪里执行? CREATE TOPIC topic_high_current AS SELECT * FROM meters WHERE current > 100; 执行位置: ① 写入时:数据写入 WAL(未过滤) ② Fetch 时:VNode 读取 WAL → 应用 SQL 过滤 → 返回过滤后数据 优势: - 网络传输已过滤数据(节省带宽) - Consumer 端处理逻辑简化 - 多 Consumer 共享过滤计算

6. Commit 与位点持久化

Commit 流程: ① Consumer 调用 commit(offsets) ② 客户端组装 Commit 请求 ③ 发送到 MNode ④ MNode 持久化位点: - 写入元数据库 - 多副本同步 ⑤ 返回成功 Commit 类型: - Sync Commit:等待服务端确认 - Async Commit:发送后立即返回 - Auto Commit:定期自动 Commit 位点存储: - 按 (group_id, topic, vgroup_id) 维度存储 - 仅存储最新位点(不存历史) - Group 删除时位点清理

7. 心跳保活

Consumer 心跳: Consumer 定期向 MNode 发送 Heartbeat: - 报告存活 - 上报消费进度(可选) MNode 维护 Consumer 状态: - 收到 Heartbeat → 标记活跃 - 超过 session.timeout → 标记失联 - 失联触发 Rebalance 关键参数: heartbeat.interval.ms // 心跳间隔 session.timeout.ms // 失联阈值

8. 异常处理

常见异常及处理: ① 网络断开: - 客户端 SDK 自动重连 - 重连成功后继续 Poll ② Rebalance: - Poll 短暂阻塞 - 注册 RebalanceCallback 处理状态 ③ 消息处理失败: - 不 Commit → 下次 Poll 重发 - 业务必须幂等 ④ Consumer 崩溃: - 心跳超时 → 触发 Rebalance - 其他 Consumer 接管分区 - 从最近 Commit 位置继续 ⑤ 服务端 WAL 滚动后丢失早期数据: - Consumer Offset 落后过多 - 错误:offset out of range - 处理:重置为 earliest 或调大 WAL 保留期

代码示例

完整消费骨架(Python)

fromtaos.tmqimportConsumerdefprocess(msg):forblockinmsg:forrowinblock:handle_row(row)consumer=Consumer({"group.id":"worker_group","auto.offset.reset":"earliest","enable.auto.commit":"false","session.timeout.ms":"30000",})consumer.subscribe(["topic_meters"])try:whileTrue:msg=consumer.poll(timeout=1.0)ifmsgisNone:continuetry:process(msg)consumer.commit()exceptExceptionase:log.error(f"Process failed:{e}")# 不 commit → 下次重试finally:consumer.close()

Rebalance 监听

defon_assign(consumer,partitions):print(f"Assigned:{partitions}")defon_revoke(consumer,partitions):print(f"Revoked:{partitions}")consumer.commit()# 失去分区前提交consumer.subscribe(["topic_meters"],on_assign=on_assign,on_revoke=on_revoke)

性能考量

消费延迟分析

阶段典型延迟
写入到 WAL 可读< 10ms
Fetch 网络往返1~10ms
过滤计算取决于 SQL 复杂度
客户端处理业务决定
Commit 持久化5~20ms

高吞吐配置

高吞吐场景: - 大批量 Fetch(max.poll.records 调大) - 异步 Commit - 增加 Consumer 数(不超过 VGroup 数) - 处理逻辑异步化(线程池) 低延迟场景: - 短 timeout - 频繁 Poll - 同步 Commit 保证有序

FAQ

Q1: 一次 Poll 返回多少消息?

由服务端 batch 决定,通常几十~几千行。客户端可通过参数限制最大批量。

Q2: Commit 失败怎么办?

捕获异常重试。重复 Commit 是幂等的(多次 Commit 同一 Offset 无副作用)。

Q3: 同一消息能被多次 Commit 吗?

可以。Commit 只是更新位点最大值,不影响消息本身。

Q4: Consumer 长时间不 Commit 会怎样?

  • 占用大量 WAL 空间(无法清理)
  • Rebalance 后会重复消费很多数据
  • 建议至少定期 Commit

Q5: 删除 Consumer Group 怎么做?

DROPCONSUMERGROUPgroup_idONtopic_name;

参考

系统构架篇

  • 01-《TDengine 整体架构全景》
  • 02-《集群拓扑深度解析》
  • 03-《MNode 内部机制深度解析》
  • 04-《RPC 通信层深度解析》
  • 05-《VNode 生命周期》
  • 06-《RAFT 共识协议》
  • 07-《端到端的消息流》

数据模型

  • 01-《数据库创建与参数详解》
  • 02-《超级表/子表/普通表》
  • 03-《支持数据类型深度解析》
  • 04-《TDengine Tag 设计哲学与 Schema 变更机制》
  • 05-《TDengine 虚拟表实现原理》

存储引擎

  • 01-《TDengine 存储引擎概览》
  • 02-《TDengine MemTable 深度解析》
  • 03-《TDengine WAL 预写日志机制》
  • 04-《TDengine 数据文件格式》
  • 05-《TDengine Commit 与 Flush 机制 》
  • 06-《TDengine Compaction 合并策略 》
  • 07-《TDengine 数据保留与 TTL》
  • 08-《TDengine 压缩编码机制》
  • 09-《TDengine Cache 与 Last 查询加速》
  • 10-《TDengine 逻辑计划生成》

查询引擎

  • 01-《TDengine 查询引擎概览》
  • 02-《TDengine SQL 解析与词法分析》
  • 03-《TDengine 语义分析与 AST 重写》
  • 04-《TDengine 逻辑计划生成》
  • 05-《TDengine 物理计划生成》
  • 06-《TDengine 扫描算子》
  • 07-《TDengine 聚合算子》
  • 08-《TDengine 聚合算子》
  • 09-《TDengine 连接算子》
  • 10-《TDengine 排序、填充与投影》
  • 11-《TDengine 分布式查询执行》
  • 12-《TDengine EXPLAIN 与查询优化》

数据写入

  • 01-《TDengine SQL INSERT》
  • 02-《TDengine 无模式写入》
  • 03-《TDengine STMT 写入》
  • 04-《TDengine 写入内部流程》
  • 05-《TDengine 数据更新删除》

数据订阅

  • 01-《TDengine 数据订阅》
  • 02-《TDengine 订阅 vs Kafka》

关于 TDengine

TDengine 专为物联网IoT平台、工业大数据平台设计。其中,TDengine TSDB 是一款高性能、分布式的时序数据库(Time Series Database),同时它还带有内建的缓存、流式计算、数据订阅等系统功能;TDengine IDMP 是一款AI原生工业数据管理平台,它通过树状层次结构建立数据目录,对数据进行标准化、情景化,并通过 AI 提供实时分析、可视化、事件管理与报警等功能。

http://www.jsqmd.com/news/1099786/

相关文章:

  • RedisDesktopManager Windows版:Windows平台终极Redis数据库管理工具完整指南
  • 计算机Java毕设实战-基于 SpringBoot 的二次元游戏周边购物商城系统的设计与实现 基于 SpringBoot 的游戏周边商品买卖管理【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 从声学参数看入门吉他选择——法雅特梵高日记与雅马哈FS系列实测对比
  • 2026年买口碑好的TPU薄膜,这些销售厂家值得重点关注!
  • 原始字面量 _
  • 6款论文降AI率软件横评:AI率直降安全线,学生党必入平价款
  • Bubble Tea:用 Go 写终端 UI,这事没那么难
  • GPT-5.6全面公开与Cerebras 750 t/s上线:从受限预览到开发者普惠
  • 第9篇:《AMS1117输出振荡排查:输出电容用陶瓷替代钽电容的稳定性问题》
  • MiniMax Code Plan 限时 9 折!分享我的订阅体验和优惠领取方式
  • 孟获MengHuo——一站式智能直播信息采集分析工具
  • 泰戈尔的诗歌
  • 【毕业设计】基于 SpringBoot 的动漫游戏周边线上交易服务系统的设计与实现 基于 SpringBoot 的游戏手办周边销售管理系统(源码+文档+远程调试,全bao定制等)
  • ChatGPT Pro 200美元付款失败怎么办?国内用户没有海外卡怎么开通更稳妥
  • 第十章 结构体与共用体 结构体仿真测试
  • 计算机Java毕设实战-基于 SpringBoot 的高校心理咨询服务管理系统的设计与实现 基于 SpringBoot 的学生心理健康档案管理系【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 开源多Agent投资研究框架ai-berkshire:从架构到部署实战
  • AIGC 应用上线前安全能力清单:模型、内容、账号、业务与合规
  • 强强联手赴慕展!中国星坤 × 立创商城,一站式解锁互连方案 + 全链条研发采购
  • 计算机毕业设计之二手书回收平台设计与实现
  • Web渗透测试课程学习心得:零基础入门Web安全攻防实战总结
  • 覆盖 190 国、400 品牌:中国 TV OS 如何撬开全球智慧家庭市场
  • Python学习笔记·第25天:Pandas高级技巧——用最通俗的话讲懂重采样、多索引和数据合并
  • Java毕设选题推荐:基于 SpringBoot 的潮流游戏周边网购交易平台的设计与实现 基于 SpringBoot 的游戏周边商品订单管理系统【附源码、mysql、文档、调试+代码讲解+全bao等】
  • VSCode Remote SSH 中 Codex 连接超时的排查与解决记录
  • 一句指令完成电脑操作!腾讯的AI助手Marvis让我电脑会干活了:改设置、查文件、整理文档
  • 请问微信小程序域名迁移有懂行的吗?有偿咨询
  • ChatGPTPlus和Pro怎么选?普通用户别再乱花钱了
  • SeaweedFS:33000 Star 的分布式文件系统,小文件读写做到了极致
  • Java毕设项目:基于 SpringBoot 的高校心理健康普查管理系统的设计与实现 基于 SpringBoot 的学生心理测评统计分析系统 (源码+文档,讲解、调试运行,定制等)