TDengine TMQ 消费流程 — 从 Subscribe 到 Commit 的完整链路
分类:6.数据订阅 TMQ |篇章:02 消费流程
适用版本:TDengine v3.x(v3.3.x / v3.4.x) | 最后更新:2026-06-30
理解 TMQ 消费的内部链路有助于诊断性能问题、设计可靠的消费应用。本文按 Subscribe → Assign → Poll → Process → Commit 五个阶段拆解。
核心概念速查表
| 概念 | 说明 |
|---|---|
| Subscribe | 订阅 Topic |
| Assign | 分区分配 |
| Fetch | 从服务端拉取消息 |
| Poll | 客户端循环调用拉取 |
| Process | 业务处理 |
| Commit | 提交位点 |
| Heartbeat | 心跳保活 |
详细解析
1. Subscribe 阶段
Subscribe 流程: ① 客户端调用 subscribe(["topic1", "topic2"]) ② 客户端向 MNode 注册 Consumer: - 上报 group_id, consumer_id, 订阅的 Topic 列表 ③ MNode 加入 Consumer 到对应 Group ④ 触发 Rebalance: - 重新计算分区分配 - 通知所有组员 ⑤ Consumer 收到分配方案: - 知道自己负责哪些 VGroup ⑥ 准备从各 VGroup 的对应 Offset 开始消费2. Assign 与 Rebalance
分区分配算法(Range / Round-Robin 等): 示例: Topic 在 6 个 VGroup 有数据 Group 有 3 个 Consumer 分配: Consumer-1: VG1, VG2 Consumer-2: VG3, VG4 Consumer-3: VG5, VG6 当 Consumer 加入/离开: ① MNode 检测到变化 ② 发起 Rebalance: - 收回所有分区 - 按新成员重新分配 - 通知所有 Consumer ③ Consumer 暂停 Poll ④ 提交未提交 Offset(可配置) ⑤ 接收新分区 ⑥ 恢复 Poll Rebalance 期间: - 消费暂停(短暂) - 可能重复消费(未 Commit 部分) - 应用应设计幂等3. Fetch 阶段
Fetch 从服务端拉取数据: Client VNode │ │ │── Fetch (offset=N) ───→│ │ │── 从 WAL 读取 N 之后的数据 ─┐ │ │← ────────── 返回 ─────────┘ │ │── 应用 Topic SQL 过滤 ────┐ │ │← ────── 过滤后数据 ─────────┘ │ │ │← ── Messages ──────────│ │ │ Fetch 特性: - 长轮询(无数据时短暂等待) - 批量返回(多条消息打包) - 每个 VGroup 独立 Fetch - 多 VGroup 并发拉取4. Poll 循环
应用层 Poll 循环: while running: msgs = consumer.poll(timeout=1.0) if msgs is None: continue // 超时无消息 for msg in msgs: process(msg) if should_commit: consumer.commit() Poll 内部: ① 从各 VGroup 缓冲队列取消息 ② 缓冲不足 → 异步 Fetch 补充 ③ 返回当前可用的消息批 ④ 超时返回 None5. Topic SQL 过滤
Topic 中的 SQL 在哪里执行? CREATE TOPIC topic_high_current AS SELECT * FROM meters WHERE current > 100; 执行位置: ① 写入时:数据写入 WAL(未过滤) ② Fetch 时:VNode 读取 WAL → 应用 SQL 过滤 → 返回过滤后数据 优势: - 网络传输已过滤数据(节省带宽) - Consumer 端处理逻辑简化 - 多 Consumer 共享过滤计算6. Commit 与位点持久化
Commit 流程: ① Consumer 调用 commit(offsets) ② 客户端组装 Commit 请求 ③ 发送到 MNode ④ MNode 持久化位点: - 写入元数据库 - 多副本同步 ⑤ 返回成功 Commit 类型: - Sync Commit:等待服务端确认 - Async Commit:发送后立即返回 - Auto Commit:定期自动 Commit 位点存储: - 按 (group_id, topic, vgroup_id) 维度存储 - 仅存储最新位点(不存历史) - Group 删除时位点清理7. 心跳保活
Consumer 心跳: Consumer 定期向 MNode 发送 Heartbeat: - 报告存活 - 上报消费进度(可选) MNode 维护 Consumer 状态: - 收到 Heartbeat → 标记活跃 - 超过 session.timeout → 标记失联 - 失联触发 Rebalance 关键参数: heartbeat.interval.ms // 心跳间隔 session.timeout.ms // 失联阈值8. 异常处理
常见异常及处理: ① 网络断开: - 客户端 SDK 自动重连 - 重连成功后继续 Poll ② Rebalance: - Poll 短暂阻塞 - 注册 RebalanceCallback 处理状态 ③ 消息处理失败: - 不 Commit → 下次 Poll 重发 - 业务必须幂等 ④ Consumer 崩溃: - 心跳超时 → 触发 Rebalance - 其他 Consumer 接管分区 - 从最近 Commit 位置继续 ⑤ 服务端 WAL 滚动后丢失早期数据: - Consumer Offset 落后过多 - 错误:offset out of range - 处理:重置为 earliest 或调大 WAL 保留期代码示例
完整消费骨架(Python)
fromtaos.tmqimportConsumerdefprocess(msg):forblockinmsg:forrowinblock:handle_row(row)consumer=Consumer({"group.id":"worker_group","auto.offset.reset":"earliest","enable.auto.commit":"false","session.timeout.ms":"30000",})consumer.subscribe(["topic_meters"])try:whileTrue:msg=consumer.poll(timeout=1.0)ifmsgisNone:continuetry:process(msg)consumer.commit()exceptExceptionase:log.error(f"Process failed:{e}")# 不 commit → 下次重试finally:consumer.close()Rebalance 监听
defon_assign(consumer,partitions):print(f"Assigned:{partitions}")defon_revoke(consumer,partitions):print(f"Revoked:{partitions}")consumer.commit()# 失去分区前提交consumer.subscribe(["topic_meters"],on_assign=on_assign,on_revoke=on_revoke)性能考量
消费延迟分析
| 阶段 | 典型延迟 |
|---|---|
| 写入到 WAL 可读 | < 10ms |
| Fetch 网络往返 | 1~10ms |
| 过滤计算 | 取决于 SQL 复杂度 |
| 客户端处理 | 业务决定 |
| Commit 持久化 | 5~20ms |
高吞吐配置
高吞吐场景: - 大批量 Fetch(max.poll.records 调大) - 异步 Commit - 增加 Consumer 数(不超过 VGroup 数) - 处理逻辑异步化(线程池) 低延迟场景: - 短 timeout - 频繁 Poll - 同步 Commit 保证有序FAQ
Q1: 一次 Poll 返回多少消息?
由服务端 batch 决定,通常几十~几千行。客户端可通过参数限制最大批量。
Q2: Commit 失败怎么办?
捕获异常重试。重复 Commit 是幂等的(多次 Commit 同一 Offset 无副作用)。
Q3: 同一消息能被多次 Commit 吗?
可以。Commit 只是更新位点最大值,不影响消息本身。
Q4: Consumer 长时间不 Commit 会怎样?
- 占用大量 WAL 空间(无法清理)
- Rebalance 后会重复消费很多数据
- 建议至少定期 Commit
Q5: 删除 Consumer Group 怎么做?
DROPCONSUMERGROUPgroup_idONtopic_name;参考
系统构架篇
- 01-《TDengine 整体架构全景》
- 02-《集群拓扑深度解析》
- 03-《MNode 内部机制深度解析》
- 04-《RPC 通信层深度解析》
- 05-《VNode 生命周期》
- 06-《RAFT 共识协议》
- 07-《端到端的消息流》
数据模型
- 01-《数据库创建与参数详解》
- 02-《超级表/子表/普通表》
- 03-《支持数据类型深度解析》
- 04-《TDengine Tag 设计哲学与 Schema 变更机制》
- 05-《TDengine 虚拟表实现原理》
存储引擎
- 01-《TDengine 存储引擎概览》
- 02-《TDengine MemTable 深度解析》
- 03-《TDengine WAL 预写日志机制》
- 04-《TDengine 数据文件格式》
- 05-《TDengine Commit 与 Flush 机制 》
- 06-《TDengine Compaction 合并策略 》
- 07-《TDengine 数据保留与 TTL》
- 08-《TDengine 压缩编码机制》
- 09-《TDengine Cache 与 Last 查询加速》
- 10-《TDengine 逻辑计划生成》
查询引擎
- 01-《TDengine 查询引擎概览》
- 02-《TDengine SQL 解析与词法分析》
- 03-《TDengine 语义分析与 AST 重写》
- 04-《TDengine 逻辑计划生成》
- 05-《TDengine 物理计划生成》
- 06-《TDengine 扫描算子》
- 07-《TDengine 聚合算子》
- 08-《TDengine 聚合算子》
- 09-《TDengine 连接算子》
- 10-《TDengine 排序、填充与投影》
- 11-《TDengine 分布式查询执行》
- 12-《TDengine EXPLAIN 与查询优化》
数据写入
- 01-《TDengine SQL INSERT》
- 02-《TDengine 无模式写入》
- 03-《TDengine STMT 写入》
- 04-《TDengine 写入内部流程》
- 05-《TDengine 数据更新删除》
数据订阅
- 01-《TDengine 数据订阅》
- 02-《TDengine 订阅 vs Kafka》
关于 TDengine
TDengine 专为物联网IoT平台、工业大数据平台设计。其中,TDengine TSDB 是一款高性能、分布式的时序数据库(Time Series Database),同时它还带有内建的缓存、流式计算、数据订阅等系统功能;TDengine IDMP 是一款AI原生工业数据管理平台,它通过树状层次结构建立数据目录,对数据进行标准化、情景化,并通过 AI 提供实时分析、可视化、事件管理与报警等功能。
