当前位置: 首页 > news >正文

详细介绍:用 Flink CDC 将 MySQL 实时同步到 StarRocks

1、前置条件与环境说明

  • 一台 Linux 或 macOS 电脑,已安装 Docker / docker-compose

  • 端口占用:

    • Flink Web UI 默认 8081
    • StarRocks FE:HTTP(8030)、MySQL 协议(9030)(个别 all-in-one 镜像会把 FE HTTP 暴露为 8080,见下方“端口对照”说明)
    • MySQL:3306

端口对照备注(重要)

  • StarRocks 官方默认 FE HTTP 端口是 8030
  • 若你使用的镜像将 FE HTTP 暴露为 8080,请将 load-url 与浏览器访问端口改为 8080
  • 下面示例以 8030 为主,同时在需要处提示如何替换为 8080

2、启动 Flink Standalone(开启 Checkpoint)

下载 Flink 1.20.1 并解压,进入目录:

cd flink-1.20.1

conf/flink-conf.yaml(注意不是 config.yaml)中开启周期性 Checkpoint(每 3 秒一次):

# conf/flink-conf.yaml
execution.checkpointing.interval: 3s

启动集群:

./bin/start-cluster.sh

访问 Flink Web UI:http://localhost:8081/。若需要更多 TaskManager,可重复执行 start-cluster.sh

3、用 Docker Compose 拉起 MySQL 与 StarRocks

创建 docker-compose.yml(建议服务名全部小写,避免兼容性问题):

version: '2.1'
services:
starrocks:
image: starrocks/allin1-ubuntu:3.2.6
ports:
- "8030:8030"   # 若你的镜像以 8080 暴露 FE HTTP,请改成 "8080:8080"
- "9030:9030"   # MySQL 协议端口
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw

启动容器:

docker-compose up -d
docker ps

验证 StarRocks:

  • 若使用 8030:浏览器打开 http://localhost:8030/
  • 若镜像是 8080:打开 http://localhost:8080/

debezium/example-mysql 镜像已为 CDC 配置好 Binlog(ROW),适合演示。

4、在 MySQL 造数(orders / shipments / products)

进入容器并创建库表与示例数据:

docker-compose exec mysql mysql -uroot -p123456

执行 SQL:

-- 1) 创建库
CREATE DATABASE app_db;
USE app_db;
-- 2) 订单表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);
INSERT INTO `orders` (`id`, `price`) VALUES
(1, 4.00),
(2, 100.00);
-- 3) 物流表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
INSERT INTO `shipments` (`id`, `city`) VALUES
(1, 'beijing'),
(2, 'xian');
-- 4) 商品表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
INSERT INTO `products` (`id`, `product`) VALUES
(1, 'Beer'),
(2, 'Cap'),
(3, 'Peanut');

5、准备 Flink CDC CLI 与连接器

  1. 解压 flink-cdc-3.5.0-bin.tar.gz → 得到 flink-cdc-3.5.0/{bin, lib, log, conf}

  2. 将以下 CDC 管道连接器 JAR 拷入 Flink CDC 的 lib/(注意:不是 Flink Home 的 lib/):

    • MySQL pipeline connector 3.5.0
    • StarRocks pipeline connector 3.5.0
  3. MySQL Connector/J(JDBC)不再随 CDC 连接器打包,你需要:

    • mysql-connector-j-*.jar 放入 Flink(非 CDC)lib/ 目录,
    • 或在提交作业时通过 --jar 传入。

稳定版本 JAR 可直接下载,SNAPSHOT 需自行从源码构建。

6、编写并提交整库同步 YAML

新建 mysql-to-starrocks.yaml

################################################################################
# Description: Sync MySQL all tables to StarRocks
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db\.\*              # 正则:整库 app_db
server-id: 5400-5404            # 与现有复制/采集避免冲突
server-time-zone: UTC           # 显式时区避免时间字段偏移
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030        # 若镜像用 8080 暴露 FE HTTP,则改为 127.0.0.1:8080
username: root
password: ""
table.create.properties.replication_num: 1  # 演示单副本。生产建议 >= 3
# 可选:若希望主键模型以支持 upsert,可在建表属性里声明 primary_key
# table.create.properties.duplicate_key: false
# table.create.properties.primary_key: "id"
pipeline:
name: Sync MySQL Database to StarRocks
parallelism: 2

提交(不同版本 CLI 写法略有差异,任选其一):

bash bin/flink-cdc.sh run -f mysql-to-starrocks.yaml
# 或
bash bin/flink-cdc.sh mysql-to-starrocks.yaml

输出示例:

Pipeline has been submitted to cluster.
Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
Job Description: Sync MySQL Database to StarRocks

在 Flink Web UI 可见名为 “Sync MySQL Database to StarRocks” 的作业运行中。

查看 StarRocks 数据:用数据库客户端(DBeaver / DataGrip / mysql CLI)连接 mysql://127.0.0.1:9030

7、在线验证:DML + DDL 的实时同步

进入 MySQL 容器:

docker-compose exec mysql mysql -uroot -p123456

逐步执行,观察 StarRocks 的 app_db.orders 变化:

-- 新增
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
-- Schema 演进:新增列
ALTER TABLE app_db.orders ADD amount VARCHAR(100) NULL;
-- 更新(含新列)
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
-- 删除
DELETE FROM app_db.orders WHERE id=2;

刷新客户端(或 SELECT * FROM app_db.orders),可见 StarRocks 实时更新。
同理修改 shipmentsproducts 也会同步变化。

8、路由与“分表并表”示例

Flink CDC 的 route 能把源端库表“改名/迁移”到目标端的其它库表名;也支持正则匹配把多张分表并入一张目标表。

8.1 逐表路由(跨库迁移/改名)

################################################################################
# Description: Sync MySQL all tables to StarRocks with Route
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db\.\*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: starrocks
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030    # 若你的镜像是 8080,请改为 127.0.0.1:8080
username: root
password: ""
table.create.properties.replication_num: 1
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to StarRocks
parallelism: 2

8.2 分表并表(正则聚合到单表)

route:
- source-table: app_db.order\..*   # 如 app_db.order01 / order02 / order03
sink-table: ods_db.ods_orders    # 并入同一目标表

⚠️ 现阶段 不支持“多个分表里存在相同主键”的数据并表(去重/冲突解决需在上游或 Transform 层处理;后续版本会增强)。

9、清理与回收

停止容器(在 docker-compose.yml 所在目录):

docker-compose down

停止 Flink 集群(在 flink-1.20.1 目录):

./bin/stop-cluster.sh

10、常见坑位与排障清单

  • 服务名大小写:Compose 服务名建议全小写(如 mysqlstarrocks),避免大小写导致的兼容问题;相应地 docker-compose exec mysql ...
  • FE HTTP 端口:默认是 8030;若你的 all-in-one 镜像暴露为 8080,请同步修改 load-url 与浏览器访问端口。
  • Flink 配置文件名:是 conf/flink-conf.yaml,不是 config.yaml
  • server-id 冲突:报错 “server-id in use” 时更换不冲突区间。
  • 时区错位:务必在 Source 指定 server-time-zone(如 UTC);否则 TIMESTAMP/DATETIME 可能偏移。
  • StarRocks 表模型:如需 Upsert 语义,建议使用 主键(PRIMARY KEY)模型,并确保有合理的主键;默认 Duplicate Key 也可运行,但与 Upsert 预期不同。
  • 连接器放置路径:CDC 连接器放在 Flink CDC Homelib/mysql-connector-j 放在 Flink Homelib/(或 --jar 传入)。
  • 整库白名单tables: app_db\.\* 会抓整库,生产建议白名单规则与命名规范,避免误采。
  • Exactly-Once 基线:合理设置 Checkpoint 间隔/超时;下游采用主键表 + 幂等/事务装载;重大变更前先做 Savepoint
http://www.jsqmd.com/news/298883/

相关文章:

  • 基数估计的黑魔法:HyperLogLog 原理与实现
  • IO模型有哪几种
  • 01-移植NXP官方的U-Boot
  • 让opencode+GLM-4.7+SKILL一起服务
  • CSS-选择器
  • 整理2026年淮南艺体高考培训学院排名,合肥东辰职业学校性价比高
  • 讲讲高质量铸造钢球特点,山东金池重工产品有哪些功能亮点?
  • 2026年十大靠谱的PVC塑胶地板供应商排名,新凯琳实力入围
  • 武汉德语培训公司哪家口碑好
  • 2026沐浴露专业品牌推荐,恋香花语精准护理产品值得拥有
  • 【第1章·第12节】MATLAB/C语言混合编程应用2——通过PSO粒子群算法实现网络节点最大覆盖率优化
  • 2025年国内知名的花灯加工厂排行榜单,庙会花灯/演绎花灯/马年花灯/大型花灯/传统花灯/春节国潮花灯,花灯定制厂家推荐
  • 2026.1.23 闲话:TopTree 维护仙人掌
  • C++趣味找错误,请找出这个C++程序到底有多少错误!
  • 市场评价高的流化床干燥机厂家,定制服务解析,干燥设备/干燥机/闪蒸干燥机/喷雾干燥机/废液干燥系统,干燥机批发厂家排行榜
  • 聊聊口碑好的电火花加工,汉霸数控为何受众多知名企业信赖?
  • 2025年成都火锅品牌排行,这10家回头客最多!美食/烧菜火锅/火锅/特色美食/社区火锅成都火锅品牌推荐榜单
  • 心灵的三重疆域:弗洛伊德意识三层结构理论解析
  • 43578344
  • 8768756
  • 靠谱的安全阀在线检测仪加工厂技术对比,江西选哪家好?
  • 2026年推荐一下优质的PVC运动地板制造商,新凯琳
  • 揭秘靠谱的金属带材电镀厂家排名,鼎亚电子上榜了吗?
  • SAM exSAM 学习笔记
  • Matlab去除CT扫描图像环形伪影的实现方法
  • 《把脉行业与技术趋势》-100-电动机——永不落幕的能源转换艺术
  • 麻省理工学院人工智能领域有影响力人物
  • 幽冥大陆(一百11)酒店智能门锁系统Larkdll接口函数——东方仙盟筑基期
  • 未来之窗昭和仙君(六十三)可编程子窗口操作功能—东方仙盟练气期
  • 基于开源AI大模型S2B2C商城系统的无人店铺售卖难点解决方案研究