当前位置: 首页 > news >正文

【Outbox 事件驱动 + Canal Binlog 增量订阅】:用户关系模块架构实战详解

🔥你好我是fengxin_rou这是我的个人主页fengxin_rou的主页

❄️欢迎查看我的专栏我的专栏

《Java后端学习》、《JAVASE基础》、《JUC并发》、《redis》、《JVM虚拟机》、《MYSQL》、《黑马点评》、《rabbitmq》、《JavaWeb+AI的talis学习系统》、《苍穹外卖》

目录

前言

一、整体架构流程与核心原理

1.1 架构拓扑链路

1.2 关键基础概念解释

二、Canal→Kafka 桥接器核心实现

2.1 组件职责

2.2 核心源码实现

2.3 核心逻辑说明

三、Canal Outbox 消费者实现

3.1 组件职责

3.2 核心源码实现

3.3 设计亮点

四、关系事件处理器幂等与业务处理

4.1 核心职责

4.2 幂等与业务逻辑源码

4.3 关键设计解析

结语


前言

在社交类系统用户关注 / 粉丝关系场景中,直面数据强一致性、高并发解耦、缓存与数据库双写同步等痛点。传统同步落库方式耦合度高、吞吐瓶颈明显,而Outbox 事件驱动结合Canal Binlog 增量订阅架构,可实现业务解耦、数据异步同步、缓存自动维护,是中大型社交系统用户关系模块的最优落地方案之一。本文从架构原理、核心组件源码、事件处理流程、幂等设计四大维度完整拆解实战落地细节。

一、整体架构流程与核心原理

1.1 架构拓扑链路

整套架构采用MySQL 业务表 + Outbox 事件表 + Canal 监听 Binlog+Kafka 消息中转 + 消费业务处理的标准 CDC 事件驱动模型。 核心链路流转:

  1. 用户执行关注 / 取关操作,写入following关注业务表,同时写入Outbox 事件表生成业务事件;
  2. Canal实时监听 MySQL Binlog 日志,捕获 Outbox 表数据变更;
  3. Canal 桥接器过滤无关表事件,解析 Binlog 为标准 JSON 消息投递至 Kafka 指定 Topic;
  4. 消费者订阅 Kafka 消息,反序列化为关系事件,完成伪从表同步、Redis 缓存维护、用户计数更新。

整个架构最大优势是业务代码无侵入,通过 Binlog 增量订阅实现数据变更被动感知,异步化解耦核心业务与缓存、统计、冗余表同步逻辑。

1.2 关键基础概念解释

空批次(Empty Batch):Canal 拉取的消息条目列表为空,代表当前 MySQL 无任何数据变更事件,无业务消息需要处理。

心跳消息(Heartbeat):Canal 服务端推送batchId=-1的特殊消息,核心作用是维持客户端与服务端长连接,避免长时间无数据传输导致连接超时断开,同时做服务健康状态探活。

当检测到空批次或心跳消息时,架构采用间隔休眠轮询策略,减少无效 CPU 空转,保障服务资源合理利用。

二、Canal→Kafka 桥接器核心实现

2.1 组件职责

CanalKafkaBridge作为架构中转核心,负责连接 Canal 服务、监听 Binlog、过滤 Outbox 表事件、序列化为 JSON 并投递 Kafka。只监听配置过滤表达式指定的 Outbox 表,忽略其他业务表变更,减少消息冗余。

2.2 核心源码实现

/** * 启动桥接器:消费 Canal 并投递到 Kafka。 */ @Override public void start() { if (running) { log.info("Canal bridge start skipped: running={} enabled={} host={} port={} dest={} filter={}", running, enabled, host, port, destination, filter); return; } // 标记运行并使用全局线程池异步执行主循环 running = true; taskExecutor.execute(() -> { try { // 创建Canal单实例连接器并建立连接 connector = CanalConnectors.newSingleConnector( new InetSocketAddress(host, port), destination, username, password); log.info("Canal connecting to {}:{} dest={} user={} filter={}", host, port, destination, username, filter); connector.connect(); // 订阅过滤表达式,仅拉取关心的Outbox表 connector.subscribe(filter); // 回滚到上次确认位点,保证消息消费一致性 connector.rollback(); log.info("Canal connected and subscribed: host={} port={} dest={} filter={} batchSize={} intervalMs={}ms", host, port, destination, filter, batchSize, intervalMs); while (running) { // 拉取一批未确认消息,不自动ack Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); // 空批次或心跳消息,休眠后继续轮询 if (batchId == -1 || message.getEntries() == null || message.getEntries().isEmpty()) { try { Thread.sleep(intervalMs); } catch (InterruptedException ignored) {} continue; } // 遍历解析行级数据变更事件 for (CanalEntry.Entry entry : message.getEntries()) { // 只处理行数据变更,忽略事务、DDL等事件 if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { continue; } CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { continue; } // 仅转发新增、更新事件 CanalEntry.EventType eventType = rowChange.getEventType(); if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE) { continue; } // 封装消息并投递Kafka ArrayNode dataArray = objectMapper.createArrayNode(); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { ObjectNode rowNode = objectMapper.createObjectNode(); for (CanalEntry.Column col : rowData.getAfterColumnsList()) { // 提取Outbox核心payload事件内容 if ("payload".equalsIgnoreCase(col.getName())) { rowNode.put("payload", col.getValue()); } } dataArray.add(rowNode); } ObjectNode msgNode = objectMapper.createObjectNode(); msgNode.put("table", entry.getHeader().getTableName()); msgNode.put("type", eventType == CanalEntry.EventType.INSERT ? "INSERT" : "UPDATE"); msgNode.set("data", dataArray); // 发送至canal-outbox主题 String json = objectMapper.writeValueAsString(msgNode); kafka.send(OutboxTopics.CANAL_OUTBOX, json); } // 手动确认批次,推进消费位点,避免消息重放 connector.ack(batchId); } } catch (Exception e) { log.error("Canal bridge error", e); } finally { // 资源释放,断开Canal连接 if (connector != null) { try { connector.disconnect(); log.info("Canal disconnected: dest={}", destination); } catch (Exception ex) { log.warn("Canal disconnect failed: dest={} err={}", destination, ex.getMessage()); } } } }); }

2.3 核心逻辑说明

  1. 异步循环监听:通过线程池启动独立循环,阻塞式拉取 Canal 消息;
  2. 事件过滤:只保留行级 INSERT/UPDATE 事件,过滤 DDL、删除、事务事件;
  3. 位点保障:采用getWithoutAck手动拉取、处理完成后ack确认,保证消息至少一次投递;
  4. 消息封装:仅提取 Outbox 表payload核心字段,精简消息体积,提升下游消费效率。

三、Canal Outbox 消费者实现

3.1 组件职责

CanalOutboxConsumer订阅 Kafka 的canal-outbox主题,负责消费消息、解析 JSON 载荷、反序列化为业务事件,并调用事件处理器完成后续业务逻辑,采用手动 ACK保证消息消费可靠性。

3.2 核心源码实现

package com.tongji.relation.outbox; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.tongji.relation.event.RelationEvent; import com.tongji.relation.processor.RelationEventProcessor; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import java.util.List; import com.tongji.common.util.OutboxMessageUtil; /** * Canal Outbox 消费者 * 消费Canal桥接消息,解析payload为关系事件,交由处理器处理 */ @Service public class CanalOutboxConsumer { private final ObjectMapper objectMapper; private final RelationEventProcessor processor; public CanalOutboxConsumer(ObjectMapper objectMapper, RelationEventProcessor processor) { this.objectMapper = objectMapper; this.processor = processor; } @KafkaListener(topics = OutboxTopics.CANAL_OUTBOX, groupId = "relation-outbox-consumer") public void onMessage(String message, Acknowledgment ack) { try { // 工具类提取消息数据行 List<JsonNode> rows = OutboxMessageUtil.extractRows(objectMapper, message); if (rows.isEmpty()) { ack.acknowledge(); return; } // 遍历解析每一条关系事件 for (JsonNode row : rows) { JsonNode payloadNode = row.get("payload"); if (payloadNode == null) { continue; } // 反序列化为业务事件 RelationEvent evt = objectMapper.readValue(payloadNode.asText(), RelationEvent.class); processor.process(evt); } // 全部处理完成后手动提交位点 ack.acknowledge(); } catch (Exception ignored) {} } }

3.3 设计亮点

采用批量处理、统一手动 ACK机制,只有所有事件处理完成后才提交 Kafka 位点。 若中途异常,不会提交位点,重启后可重新消费,天然实现消息重试与业务幂等兜底。

四、关系事件处理器幂等与业务处理

4.1 核心职责

RelationEventProcessor是业务逻辑核心,负责事件幂等去重、粉丝伪从表同步、Redis ZSet 缓存维护、关注 / 粉丝计数原子更新,同时设置缓存 TTL 规避缓存数据陈旧问题。

4.2 幂等与业务逻辑源码

/** * 关系事件处理器 * 职责:事件去重防抖、幂等处理、落库同步、缓存维护、计数更新 */ @Service public class RelationEventProcessor { private final RelationMapper mapper; private final StringRedisTemplate redis; private final UserCounterService userCounterService; public RelationEventProcessor(RelationMapper mapper, StringRedisTemplate redis, UserCounterService userCounterService) { this.mapper = mapper; this.redis = redis; this.userCounterService = userCounterService; } public void process(RelationEvent evt) { // 构造幂等去重Key:事件类型+发起用户+目标用户+事件ID String dk = "dedup:rel:" + evt.type() + ":" + evt.fromUserId() + ":" + evt.toUserId() + ":" + (evt.id() == null ? "0" : String.valueOf(evt.id())); // 10分钟幂等锁,防止重复消费 Boolean first = redis.opsForValue().setIfAbsent(dk, "1", Duration.ofMinutes(10)); if (first == null || !first) { return; } // 关注创建事件 if ("FollowCreated".equals(evt.type())) { mapper.insertFollower(evt.id(), evt.toUserId(), evt.fromUserId(), 1); long now = System.currentTimeMillis(); // 维护关注、粉丝ZSet有序缓存 redis.opsForZSet().add("uf:flws:" + evt.fromUserId(), String.valueOf(evt.toUserId()), now); redis.opsForZSet().add("uf:fans:" + evt.toUserId(), String.valueOf(evt.fromUserId()), now); // 设置缓存2小时TTL redis.expire("uf:flws:" + evt.fromUserId(), Duration.ofHours(2)); redis.expire("uf:fans:" + evt.toUserId(), Duration.ofHours(2)); // 原子更新计数 userCounterService.incrementFollowings(evt.fromUserId(), 1); userCounterService.incrementFollowers(evt.toUserId(), 1); } // 取关取消事件 else if ("FollowCanceled".equals(evt.type())) { mapper.cancelFollower(evt.toUserId(), evt.fromUserId()); // 移除缓存中对应关系 redis.opsForZSet().remove("uf:flws:" + evt.fromUserId(), String.valueOf(evt.toUserId())); redis.opsForZSet().remove("uf:fans:" + evt.toUserId(), String.valueOf(evt.fromUserId())); redis.expire("uf:flws:" + evt.fromUserId(), Duration.ofHours(2)); redis.expire("uf:fans:" + evt.toUserId(), Duration.ofHours(2)); // 扣减关注、粉丝计数 userCounterService.incrementFollowings(evt.fromUserId(), -1); userCounterService.incrementFollowers(evt.toUserId(), -1); } } }

4.3 关键设计解析

  1. 幂等去重:基于 RedissetIfAbsent构造唯一去重键,10 分钟有效期,规避 Kafka 重复消费、消息重放导致的数据错乱;
  2. ZSet 缓存设计:采用时间戳作为分数,有序维护关注 / 粉丝列表,支持分页、排序查询;
  3. 缓存 TTL 策略:设置 2 小时过期,自动清理冷数据,减少内存占用,兜底缓存一致性问题;
  4. 计数原子更新:通过独立计数服务增减关注数、粉丝数,避免数据库频繁聚合查询,提升接口响应速度。

结语

本文完整拆解了Outbox 事件驱动 + Canal Binlog 增量订阅在用户关系模块的落地架构,涵盖 Canal 桥接、Kafka 消息中转、消费者解析、事件幂等处理四大核心环节。

该架构核心价值在于业务解耦、无侵入数据同步、天然支持高并发与削峰填谷,同时通过手动位点、Redis 幂等锁、缓存 TTL 三重机制保障数据一致性。适用于社交平台、社区 APP 等需要维护关注 / 粉丝关系、异步缓存更新、数据冗余表同步的业务场景。

进阶优化可从 Canal 集群化、Kafka 分区并行消费、本地消息表兜底、缓存预热等方向扩展,进一步提升架构可用性与吞吐能力。

http://www.jsqmd.com/news/860909/

相关文章:

  • 如何快速掌握《鸣潮》游戏模组开发:专业逆向工程与AES加密技术完整指南
  • DicomObjects COM -Release Date: 2026-05-18
  • minecraft-ondemand自动化运维:Watchdog容器原理与实现
  • 如何安全提取未知文件:unblob的5大安全防护机制实战指南
  • AALC自动化工具完整指南:如何用智能助手彻底优化《Limbus Company》游戏时间
  • 龙鱼缸设备怎么配不踩坑?灯光+水泵+滤材的搭配清单 - 华旭传媒
  • NCM文件转换终极指南:3步快速解密网易云音乐加密音频
  • 企业AI开发包含哪些内容:从需求分析到交付落地的完整指南 - 华旭传媒
  • MapReduce数据倾斜解决方案
  • gibMacOS终极指南:三步完成macOS组件下载与系统部署
  • 5分钟快速上手!网易云无损音乐下载完整指南:免费获取高品质音乐
  • Tunasync多数据库后端支持:Bolt、Badger、Redis、LevelDB对比分析
  • Magma高可用部署:如何构建企业级可靠网络基础设施
  • 重庆白发养黑理疗机构哪家好?黑奥秘牵头制定行业标准,专业服务更规范 - 美业信息观察
  • 【卷卷观察】Google I/O 2026 炸场:AI 不再跟你聊天了,它开始替你干活了
  • 3步搞定B站直播助手:新手主播的智能场控终极指南
  • 如何快速获取精准歌词?LDDC 跨平台歌词下载工具完整指南
  • TextShot快速入门:5分钟学会跨平台截图文字识别
  • Elog多平台支持对比:语雀、Notion、FlowUs、飞书哪个更适合你
  • 如何快速搭建家庭游戏串流服务器:Sunshine完整配置教程
  • Obsidian Full Calendar:在笔记中实现高效日程管理的完整指南
  • 瑞士ZuriQ研发新型彭宁离子阱处理器,大幅增强离子阱量子计算机计算能力
  • parse库自定义类型转换器开发指南:从简单函数到复杂模式匹配
  • Spark 安装与使用完全指南【保姆级教程】
  • 如何构建企业级无人机应用:DJI Android SDK V5架构设计与实战指南
  • 2026佛山搬家公司全攻略 大型工厂整体搬迁极简流程 - 从来都是英雄出少年
  • Navicat Premium Mac重置终极方案:3分钟恢复14天试用期
  • LLPlayer:终极语言学习视频播放器 - 用AI技术革新你的外语学习方式
  • 西安正规高三补习学校TOP5推荐:基于口碑与教学质量全解析 - 科技焦点
  • EditorConfig-Sublime高级技巧:Git集成与多项目配置管理终极指南