当前位置: 首页 > news >正文

从零到一:基于Canal-Adapter 1.1.7构建MySQL实时数据同步链路

1. 开篇:为什么你需要一条实时数据同步链路?

大家好,我是老张,在数据领域摸爬滚打了十几年。今天想和大家聊聊一个非常实际的问题:当你的业务数据在MySQL里“唰唰”地增删改查时,如何能让另一个数据库、或者你的缓存、搜索引擎,甚至是数据分析平台,也能几乎实时地看到这些变化?

想象一下这个场景:用户在你的电商App下单了,订单数据写入了主库。但负责生成报表的数据库、或者商品库存缓存,如果还停留在“每隔一小时同步一次”的古老模式,那库存可能不准,报表永远是“过去式”。这种数据延迟,在今天的业务环境下,往往是不可接受的。我们需要的是“实时”或“准实时”的数据流动。

这就是实时数据同步要解决的问题。而今天我们要上手的,就是阿里巴巴开源的一款经典工具——Canal。特别是它的Canal-Adapter 1.1.7版本,它是一个“适配器”,能帮我们把Canal抓取到的MySQL数据变更,轻松地同步到各种目标数据源,比如另一个MySQL、Elasticsearch、HBase等。

为什么选1.1.7?就像原始文章里提到的,这是个比较稳定且对JDK环境要求(JDK8)更友好的版本,很多现有项目还在用这个版本,兼容性好,踩坑的资料也多。咱们今天的目标,就是抛开复杂的理论,手把手带你从零开始,搭起一条从MySQL到MySQL的实时同步链路。你不需要是架构师,只要会基本的Linux命令和MySQL操作,跟着我做,一定能成。

2. 核心原理:Canal是如何“监听”MySQL的?

在动手之前,咱们花几分钟搞明白Canal是怎么工作的。理解了原理,后面配置时遇到问题你才知道该往哪个方向排查。

你可以把Canal想象成一个非常聪明的“间谍”。它想要获取MySQL数据库的所有数据变更记录。那MySQL会把谁当成自己人呢?答案是:从库(Slave)。MySQL的主从复制原理,就是主库(Master)把数据变更写入二进制日志(Binary Log),从库连接主库,拉取这些日志,然后在自己身上重放一遍,从而实现数据同步。

Canal就巧妙地扮演了这个“从库”的角色。它实现了MySQL的复制协议,伪装成一个MySQL从库,向主库发送一个“dump”请求,说:“大哥,我是你的从库,把binlog发给我吧。”主库一看,协议都对,就会源源不断地把binlog推送给Canal。

Canal拿到这些二进制的日志流(binlog)后,它本身并不直接重放。它的核心工作是解析。它会把这些原始的字节流,解析成我们容易理解的结构化数据,比如:在哪个数据库(schema)、哪张表(table)、发生了哪种操作(INSERT、UPDATE、DELETE)、变更前和变更后的数据分别是什么。

解析完之后,数据变更事件就产生了。这时候,Canal-Deployer(服务端)会把这些事件暂存起来。而我们的主角Canal-Adapter(适配器)就登场了。Adapter会作为一个客户端,从Deployer那里订阅这些变更事件,然后根据我们的配置,把这些事件转换成目标数据源能理解的语句(比如SQL),最终写入到目标库中。

整个过程,对主库的压力极小,因为它只是多了一个“从库”在拉取日志,这是一种非常优雅的增量数据订阅消费模式。总结一下关键角色:

  • MySQL:数据源头,需开启binlog。
  • Canal-Deployer:服务端,伪装从库,拉取并解析binlog。
  • Canal-Adapter:客户端,订阅解析后的事件,并同步到目标数据源。

3. 环境准备:兵马未动,粮草先行

好了,原理清楚了,咱们开始动手。首先得把基础环境准备好。我假设你已经在本地或者一台Linux服务器上安装好了MySQL,并且有操作权限。

3.1 第一步:检查并开启MySQL的Binlog

这是整个流程的基石,如果MySQL没开binlog,后面的一切都白搭。咱们登录MySQL命令行来检查。

-- 查看binlog是否开启,ON表示已开启 SHOW VARIABLES LIKE 'log_bin'; -- 查看当前的binlog文件名称和位置(Position),这个信息后面配置Canal会用到 SHOW MASTER STATUS; -- 查看binlog的格式,**必须为ROW**,Canal依赖ROW格式的详细数据 SHOW VARIABLES LIKE 'binlog_format';

如果log_bin的值是OFF,或者binlog_format不是ROW,那就需要修改MySQL的配置文件,通常是my.cnf(Linux)或my.ini(Windows)。找到[mysqld]段落,添加或修改如下配置:

[mysqld] # 启用二进制日志,并指定基础名称 log-bin=mysql-bin # 设置binlog格式为ROW,这是最关键的一步 binlog-format=ROW # 设置服务器ID,这在主从复制环境中必须唯一,单机可随意设一个正数 server-id=1 # 可选:指定binlog过期时间,避免磁盘被占满 expire_logs_days=7

修改保存后,重启MySQL服务。再次执行上面的SQL命令,确认log_binONbinlog_formatROW

3.2 第二步:为Canal创建专属数据库用户

Canal需要连接MySQL来拉取binlog,我们不应该直接使用root账号。最好创建一个专属用户,并授予必要的权限。

-- 创建一个用户,用户名canal,密码你自己定一个复杂的,这里示例用 Canal@123456 CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456'; -- 授予必要的权限。`SELECT`用于读取表信息,`REPLICATION SLAVE, REPLICATION CLIENT`是复制所必需的。 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- 刷新权限,使授权立即生效 FLUSH PRIVILEGES;

这里解释一下权限:

  • SELECT:Canal在启动时需要获取数据库和表的元数据信息。
  • REPLICATION SLAVE:这是最核心的权限,允许Canal以从库身份连接主库,读取binlog。
  • REPLICATION CLIENT:允许使用SHOW MASTER STATUS,SHOW SLAVE STATUS等命令。

用户创建好后,可以用mysql -u canal -p测试一下是否能正常登录。

3.3 第三步:下载Canal组件

我们需要三个核心组件,去Canal的GitHub Release页面下载1.1.7版本:

  1. canal.deployer-1.1.7.tar.gz:服务端,负责binlog解析。
  2. canal.adapter-1.1.7.tar.gz:适配器,负责数据同步。
  3. canal.admin-1.1.7.tar.gz:管理端(Web界面),可选,但有了它管理和监控会更直观。咱们按“全套”来配置。

下载后,分别解压到三个独立的目录,比如/opt/canal/deployer/opt/canal/adapter/opt/canal/admin。记住这些路径,后面配置要用。

4. 配置与启动Canal-Deployer(服务端)

服务端是数据管道的“水泵”,它从MySQL抽水(binlog)。我们先来搞定它。

4.1 基础配置:canal.properties

进入canal.deployerconf目录,你会看到一堆配置文件。核心是canal.properties,它定义了服务端的全局参数。咱们先关注几个必须修改的项。

用文本编辑器打开canal.properties,找到并修改以下关键配置:

# canal server的运行端口,客户端(如Adapter)会连接这个端口。默认11111,一般不用改。 canal.port = 11111 # 服务模式,我们通过TCP连接,所以是tcp。如果要用Kafka等消息队列,可以改。 canal.serverMode = tcp # canal实例(instance)的列表,多个用逗号隔开。我们先配一个叫‘example’的实例。 canal.destinations = example # 如果你也安装了canal-admin,需要配置admin的连接信息,这样deployer能自动注册到admin canal.admin.manager = 127.0.0.1:8089 # canal-admin的地址和端口 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # 这是admin密码123456的密文

这里有个坑点:canal.admin.passwd填的不是明文密码,而是密文。这个密文在哪里找呢?如果你启动了canal-admin并初始化了数据库(后面会讲),密文存储在canal_manager数据库的canal_user表里。你可以先启动admin,从数据库里查出来,再填到这里。或者,如果你暂时不用admin,可以把这三行注释掉。

4.2 实例配置:instance.properties

canal.destinations = example指定了一个名为example的实例。那么,这个实例具体监听哪个MySQL、哪个库表,就在conf/example/目录下的instance.properties文件里配置。

打开conf/example/instance.properties,这是配置的重中之重:

# 主库的地址和端口 canal.instance.master.address = 127.0.0.1:3306 # 连接主库的账号密码,就是我们之前创建的canal用户 canal.instance.dbUsername = canal canal.instance.dbPassword = Canal@123456 # 字符集,保持UTF-8 canal.instance.connectionCharset = UTF-8 # 指定从哪个binlog文件开始拉取。如果不指定,Canal会从当前最新的binlog开始。 # canal.instance.master.journal.name = mysql-bin.000001 # 指定从binlog的哪个位置开始。如果不指定,就从文件开头。 # canal.instance.master.position = 154 # !!!非常重要的过滤规则:监听哪些库、哪些表 # .*\\..* 表示监听所有库的所有表。生产环境请务必按需配置,减少不必要的同步。 canal.instance.filter.regex = .*\\..* # 黑名单,匹配的表不同步 canal.instance.filter.black.regex =

关于filter.regex的规则,我多解释几句:

  • 库名\\.表名,点号需要转义。
  • test\\..*:同步test库下的所有表。
  • test\\.user:只同步test库下的user表。
  • .*\\..*:同步所有库所有表(慎用)。
  • 多个规则用逗号分隔:test\\..*, order\\.order_detail

强烈建议:在测试时,可以先配置成.*\\..*确保能跑通。但在生产环境,一定要精确配置到你真正需要同步的库和表,这对性能和资源管理至关重要。

4.3 启动与验证

配置保存后,就可以启动服务端了。进入bin目录。

  • Linux:sh startup.sh
  • Windows: 双击startup.bat

查看日志是最直接的验证方式:

tail -f ../logs/canal/canal.log # 查看主日志,关注是否有ERROR tail -f ../logs/example/example.log # 查看example实例的日志

如果看到类似com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......以及com.alibaba.otter.canal.instance.core.AbstractCanalInstance - start successful....的日志,说明服务端启动成功,并且已经连接上MySQL了。

5. 配置与启动Canal-Admin(管理端,可选但推荐)

Admin提供了一个Web界面,可以方便地管理多个Canal-Server和Instance,不用每次都去改配置文件。咱们把它也装上。

5.1 初始化数据库

Admin需要用一个数据库来存储配置信息。我们在MySQL里创建一个数据库,比如叫canal_manager,然后执行Admin包conf目录下的canal_manager.sql脚本,创建所需的表。

CREATE DATABASE `canal_manager` DEFAULT CHARACTER SET utf8; USE canal_manager; -- 然后执行 canal_manager.sql 文件中的SQL语句

5.2 修改Admin配置

打开canal.adminconf/application.yml文件,修改数据库连接信息:

spring: datasource: address: 127.0.0.1:3306 database: canal_manager username: root # 你的MySQL用户名 password: your_password # 你的MySQL密码 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

下面的canal.adminUsercanal.adminPasswd是登录Web控制台的账号密码,默认admin/123456,可以不改。

5.3 启动与访问

进入bin目录启动Admin。

  • Linux:sh startup.sh
  • Windows: 双击startup.bat

启动成功后,打开浏览器访问http://你的服务器IP:8089,用admin/123456登录。如果能看到管理界面,说明Admin启动成功。

关键一步:回到Canal-Deployer的canal.properties,确保canal.admin.managercanal.admin.usercanal.admin.passwd配置正确且Deployer在运行。稍等片刻,刷新Admin页面,在“Server管理”或“Instance管理”中,你应该能看到自动注册上来的example实例。如果没有,可以尝试在Admin界面手动创建并配置一个Instance,内容就和我们在instance.properties里配的一样。

6. 核心实战:配置Canal-Adapter(适配器)

前面Deployer已经把数据变更“抓”出来了,现在需要Adapter来“送”到目的地。我们以同步到另一个MySQL(目标库)为例。

6.1 基础配置:application.yml

进入canal.adapterconf目录,打开application.yml。这个文件是Adapter的总控配置。

server: port: 8081 # Adapter自身的服务端口 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 canal.conf: mode: tcp # 消费模式,我们Deployer是tcp模式,这里对应填tcp flatMessage: true # 建议true,消息格式更简单 syncBatchSize: 1000 # 每次同步的批量大小 retries: 10 # 失败重试次数 timeout: # 超时时间 srcDataSources: # !!!源数据源定义,这里定义的是“数据来源”的库,其实就是我们的业务主库 defaultDS: # 这是一个数据源Key,可以自定义,后面会引用 url: jdbc:mysql://127.0.0.1:3306/source_db?useUnicode=true&characterEncoding=UTF-8&useSSL=false username: root password: your_source_db_password canalAdapters: - instance: example # !!!对应Deployer中配置的destination名称,必须一致 groups: - groupId: g1 # 分组ID,可以自定义 outerAdapters: - name: rdb # 适配器类型,rdb表示关系型数据库 key: mysql1 # 这个目标数据源的Key,自定义 properties: jdbc.driverClassName: com.mysql.jdbc.Driver # !!!这是目标库的连接信息 jdbc.url: jdbc:mysql://127.0.0.1:3306/target_db?useUnicode=true&characterEncoding=UTF-8&useSSL=false jdbc.username: root jdbc.password: your_target_db_password

这里有两个url配置,新手特别容易混淆:

  1. srcDataSources.defaultDS.url:这是源库,也就是Canal-Deployer正在监听的那个MySQL业务库。Adapter需要连接它来执行一些数据初始化操作(ETL)。
  2. outerAdapters.properties.jdbc.url:这是目标库,你要把数据同步过去的那个库。

6.2 表映射配置:mytest_user.yml

Adapter支持为不同的同步任务创建独立的配置文件。我们在conf/rdb/目录下创建一个配置文件,例如my_product_sync.yml。文件名可以自定义,但必须在application.yml中通过key来引用(我们上面配的key: mysql1)。

conf/rdb/my_product_sync.yml内容如下:

dataSourceKey: defaultDS # 引用application.yml中定义的源数据源 destination: example # 对应Deployer的instance和application.yml中的instance groupId: g1 # 对应application.yml中的groupId outerAdapterKey: mysql1 # 对应application.yml中outerAdapters的key concurrent: true # 是否开启并发同步,提升性能 dbMapping: database: source_db # 源数据库名 table: product # 源表名 targetTable: target_db.product # 目标库名.目标表名(如果目标表名相同,可只写表名) targetPk: # 目标表的主键映射,如果主键名一致可省略 id: id mapAll: true # 为true时,所有字段自动映射(要求源表和目标表结构一致)。为false时,需用targetColumns手动指定 # targetColumns: # 当mapAll为false时,需手动配置字段映射 # id: id # name: product_name # price: price # etlCondition: "where create_time >='{}'" # 可选的ETL初始化条件 commitBatch: 3000 # 批量提交大小

重要提示mapAll: true非常方便,但前提是源表和目标表的字段名、类型必须完全一致,或者至少兼容。如果不一致,就需要设置mapAll: false,然后在targetColumns下一个个字段进行映射。etlCondition用于在首次启动时,进行历史数据的全量同步(ETL),{}是一个占位符,Adapter会处理。

6.3 启动Adapter并验证

保存所有配置,进入canal.adapterbin目录启动。

  • Linux:sh startup.sh
  • Windows: 双击startup.bat

查看Adapter日志:

tail -f ../logs/adapter/adapter.log

看到com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService - ## the canal adapter adapters are started now ......com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader - Load adapter: rdb succeed等字样,说明启动成功,并且加载了我们配置的rdb适配器。

7. 全链路测试与排坑指南

激动人心的时刻到了!我们来测试整个链路是否畅通。

  1. 准备表:确保源库(source_db)和目标库(target_db)都有product表,且结构一致。
  2. 在源库操作:用MySQL客户端或任何工具,对source_db.product表执行INSERT、UPDATE、DELETE操作。
  3. 观察日志
    • Deployer日志(logs/example/example.log):会看到解析出的SQL细节,例如DML: ....
    • Adapter日志(logs/adapter/adapter.log):会看到同步执行的SQL,例如SQL: INSERT INTO target_db.product ...,以及耗时:xx ms
  4. 检查目标库:立刻查询target_db.product表,数据应该已经同步过来了。

常见踩坑点

  • 防火墙/端口:确保11111(Deployer)、8081(Adapter)、8089(Admin)端口在服务器上已开放。
  • 权限问题:反复检查canal数据库用户的REPLICATION SLAVE权限,以及Adapter配置中源库和目标库连接用户的增删改查权限。
  • Binlog格式:必须是ROWSTATEMENTMIXED格式Canal无法正确解析数据变更。
  • 表名大小写:Linux下MySQL默认表名区分大小写,确保配置中的库名、表名大小写与实际完全一致。
  • Adapter连接失败:检查application.ymlcanal.conf.mode是否为tcp,以及canal.tcp.server.host是否正确指向了Deployer的地址和端口(11111)。
  • 数据不同步但无报错:首先检查instance.properties中的canal.instance.filter.regex是否包含了你要同步的表。然后检查Adapter的my_product_sync.yml中的databasetable配置是否正确。

8. 进阶与展望:不止于MySQL到MySQL

成功实现了MySQL到MySQL的同步,你已经掌握了Canal-Adapter最核心的用法。但这只是开始,Canal-Adapter的强大之处在于其“适配器”架构,可以轻松切换同步目标。

  • 同步到Elasticsearch:只需将application.ymlouterAdaptersname改为es,并配置ES的连接信息和索引映射文件。这样,数据库的变更就能实时反映到搜索引擎中,实现搜索结果的即时更新。
  • 同步到Kafka/RocketMQ:如果你需要将数据变更作为事件流发布出去,供多个下游系统消费,可以将Deployer的canal.serverMode改为kafkarocketMQ,并配置对应的MQ信息。这样,Canal解析出的数据会直接发到消息队列。
  • 同步到HBase/StarRocks等:社区或企业版提供了更多适配器,你可以将数据实时同步到大数据平台进行分析。
  • 集群与高可用:生产环境通常需要部署多个Canal-Server实例,并通过ZooKeeper进行协调,实现高可用。同时,Adapter也可以部署多个实例,通过groupId进行分组,实现负载均衡。

配置这些东西的细节更多,但核心思路和我们今天搭建的这条基础链路是一样的:理解数据流向(MySQL -> Deployer -> Adapter -> Target),然后正确配置每一环的连接和映射关系。

整个过程走下来,你可能觉得配置文件有点多,但每一步都有它的道理。一旦跑通,这种基于数据库日志的、对业务无侵入的实时同步方案,会为你的系统架构带来巨大的灵活性。我最早在项目里用Canal是为了解决跨业务系统的数据孤岛问题,当看到数据几乎无延迟地流动起来时,那种感觉真的很棒。希望这篇超详细的指南能帮你少走弯路,顺利搭起自己的数据同步管道。如果在操作中遇到问题,多看看日志,那里面藏着绝大部分答案。

http://www.jsqmd.com/news/456470/

相关文章:

  • Xinference-v1.17.1作品集:用开源模型生成高质量文案、代码、对话案例
  • Instagram:视觉叙事、滤镜魔法与「视觉图谱」的终极套利
  • 丹青幻境在电商设计中的应用:汉服店铺主图、节气海报、包装插画生成案例
  • iOSDeviceSupport:跨版本兼容与开发效率优化路径
  • Youtu-Parsing应用场景:学术论文截图解析,快速提取公式图表信息
  • Java国密SM2签名验签实战:从Bouncy Castle配置到在线工具验证
  • 立创开源磁带随身听DIY全攻略:基于LAG668与CXA1552M的杜比降噪与自动倒带实现
  • GLM-4.7-Flash入门指南:Ollama部署及基础对话测试
  • Keil5调试实战:如何通过map文件精准分析栈空间占用(附内存初始化技巧)
  • 春联生成模型操作系统兼容性测试:Windows与Linux部署对比
  • iOSDeviceSupport:解决Xcode设备兼容性问题的全版本方案
  • 百度飞桨OCR(PP-OCRv4_server_det|PP-OCRv4_server_rec_doc)在Java企业级文档处理中的实战应用
  • 如何突破B站评论采集限制?智能爬虫工具让全量数据获取效率提升300%
  • STM32H7总线架构与时钟系统深度解析
  • OpenCV调试版报错:libraryLoad失败?别慌,这可能是正常现象
  • 基于BERT的中文智能客服系统效率优化实战:从模型压缩到推理加速
  • 【ELRS实战】从开箱到首飞:遥控器与接收机快速配置全攻略
  • Qwen3与Git工作流结合:AI辅助代码审查与文档生成
  • 老笔记本起死回生指南:手把手教你用20元硅脂拯救自动关机故障
  • Windows平台CosyVoice开发入门指南:从环境搭建到第一个语音应用
  • 告别繁琐操作:这款轻量级Android管理工具让应用管理效率提升300%
  • Spring Boot Maven插件版本号避坑指南:为什么你的pom.xml总是爆红?
  • Fun-ASR-MLT-Nano语音识别模型识别准确率实测:93%的惊喜
  • 突破PT下载效率瓶颈:PT助手Plus的革新性工作流指南
  • 突破实时语音壁垒:多GPU部署与负载均衡策略全解析
  • 如何用WebAssembly技术实现音频自由:突破加密音乐格式限制的完整指南
  • 5个步骤教你实现极米投影仪智能家居设备集成
  • 突破传统!3步实现宝可梦数据自动化合法性验证
  • Nunchaku-FLUX.1-dev镜像免配置优势:预装Gradio1.0+Diffusers0.32+torch2.7
  • 图图的嗨丝造相-Z-Image-Turbo保姆级教程:Xinference日志分析定位启动失败原因