Say Goodbye to Scripts! Three Real-World MySQL Multi-Table Sync Scenarios with Apache SeaTunnel (Complete Config Files Included)
Do you still dread writing tedious ETL scripts every time a complex multi-table sync requirement lands on your desk? Apache SeaTunnel's declarative configuration approach is changing that. As an efficient data integration tool, it handles everything from simple to complex multi-table sync scenarios through concise configuration files, significantly boosting data engineers' productivity.
1. Hands-On: Archiving User Data into Sharded Tables
User tables on e-commerce platforms often hold tens of millions of rows, so querying them directly is painfully slow. Sharding the table by registration year is a common optimization, but traditional scripts force you to hand-roll the sharding logic. SeaTunnel's SQL transform plugin makes it straightforward.
Assume the original user table has the following structure:
```sql
CREATE TABLE `t_user` (
  `id` bigint PRIMARY KEY,
  `username` varchar(50),
  `reg_year` int COMMENT 'registration year',
  `gender` varchar(10),
  `last_login` datetime
);
```

We need to archive rows into per-year child tables. Here is the complete configuration file:
```hocon
env {
  execution.parallelism = 4
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://prod-db:3306/ecommerce"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "etl_user"
    password = "secure_password"
    query = "SELECT * FROM t_user WHERE reg_year BETWEEN 2018 AND 2023"
    result_table_name = "source_users"
  }
}

transform {
  Sql {
    source_table_name = "source_users"
    result_table_name = "users_2018"
    query = "SELECT * FROM source_users WHERE reg_year = 2018"
  }
  Sql {
    source_table_name = "source_users"
    result_table_name = "users_2019"
    query = "SELECT * FROM source_users WHERE reg_year = 2019"
  }
  # Similar transforms for 2020-2023 omitted...
}

sink {
  Jdbc {
    url = "jdbc:mysql://archive-db:3306/user_archive"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "etl_user"
    password = "secure_password"
    source_table_name = "users_2018"
    query = "INSERT INTO t_user_2018 VALUES(?,?,?,?,?)"
  }
  Jdbc {
    url = "jdbc:mysql://archive-db:3306/user_archive"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "etl_user"
    password = "secure_password"
    source_table_name = "users_2019"
    query = "INSERT INTO t_user_2019 VALUES(?,?,?,?,?)"
  }
  # Sink configs for the remaining years omitted...
}
```

Key parameter notes:
- `execution.parallelism`: a sensible degree of parallelism significantly speeds up archiving
- `result_table_name`: each transform stage must name its result table so later stages can reference it
- the `WHERE` condition in each `query` is the heart of the sharding logic
For real deployments, consider splitting the per-year configurations into separate jobs so each can be scheduled and retried independently.
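Following that suggestion, a standalone per-year job can be sketched as below. This is a minimal sketch rather than an official template: pushing the year filter into the source query removes the need for a transform stage, the file name `archive_2018.conf` is hypothetical, and the hosts and credentials are carried over from the example above.

```hocon
# archive_2018.conf -- one self-contained job per archive year (illustrative)
env {
  execution.parallelism = 4
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://prod-db:3306/ecommerce"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "etl_user"
    password = "secure_password"
    # The year filter lives in the source query, so no transform stage is needed
    query = "SELECT * FROM t_user WHERE reg_year = 2018"
    result_table_name = "users_2018"
  }
}

sink {
  Jdbc {
    url = "jdbc:mysql://archive-db:3306/user_archive"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "etl_user"
    password = "secure_password"
    source_table_name = "users_2018"
    query = "INSERT INTO t_user_2018 VALUES(?,?,?,?,?)"
  }
}
```

A scheduler can then run one such job per year and retry only the year that failed.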
2. Merging Orders from Multiple Sources for Reporting
Retailers often need to merge order data from several business systems for analysis. The traditional approach extracts each source separately and merges afterwards; SeaTunnel can do it all in a single pipeline.
Assume the following two order tables:
ERP system order table:

```sql
CREATE TABLE erp_orders (
  order_id varchar(32),
  customer_id int,
  order_amount decimal(12,2),
  order_date date,
  payment_type varchar(20)
);
```

E-commerce system order table:

```sql
CREATE TABLE ec_orders (
  id varchar(32),
  user_id int,
  total_price decimal(12,2),
  created_at datetime,
  pay_method varchar(20)
);
```

Example configuration for merging them into the data warehouse:
```hocon
env {
  job.mode = "BATCH"
  job.name = "order_consolidation"
}

source {
  Jdbc {
    url = "jdbc:mysql://erp-db:3306/erp_system"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "erp_reader"
    password = "erp_pwd"
    query = "SELECT order_id, customer_id, order_amount, order_date, payment_type FROM erp_orders"
    result_table_name = "erp_orders"
  }
  Jdbc {
    url = "jdbc:mysql://ec-db:3306/ecommerce"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "ec_reader"
    password = "ec_pwd"
    query = "SELECT id, user_id, total_price, created_at, pay_method FROM ec_orders"
    result_table_name = "ec_orders"
  }
}

transform {
  Sql {
    source_table_name = "erp_orders"
    result_table_name = "erp_standard"
    query = """
      SELECT order_id AS unified_id,
             customer_id AS user_id,
             order_amount AS amount,
             order_date AS order_time,
             payment_type AS payment_method,
             'ERP' AS source_system
      FROM erp_orders
    """
  }
  Sql {
    source_table_name = "ec_orders"
    result_table_name = "ec_standard"
    query = """
      SELECT id AS unified_id,
             user_id,
             total_price AS amount,
             created_at AS order_time,
             pay_method AS payment_method,
             'E-Commerce' AS source_system
      FROM ec_orders
    """
  }
  Sql {
    source_table_name = ["erp_standard", "ec_standard"]
    result_table_name = "combined_orders"
    query = """
      SELECT * FROM erp_standard
      UNION ALL
      SELECT * FROM ec_standard
    """
  }
}

sink {
  Jdbc {
    url = "jdbc:mysql://dwh:3306/data_warehouse"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "dwh_writer"
    password = "dwh_pwd"
    source_table_name = "combined_orders"
    query = """
      INSERT INTO consolidated_orders
        (order_id, user_id, amount, order_time, payment_method, source_system)
      VALUES (?,?,?,?,?,?)
    """
  }
}
```

Technical points:
- Multiple sources: each data source carries its own connection settings
- Field standardization: SQL transforms unify the differing column names across systems
- Merging: `UNION ALL` combines the standardized datasets
- Data lineage: an added `source_system` column records where each row came from
3. Routing Device Data into Per-Type Tables
In IoT scenarios, different device types usually need to land in different analytics tables. Below is a typical configuration that routes sensor data into per-type tables.
Raw device data table structure:
```sql
CREATE TABLE raw_device_data (
  device_id varchar(32),
  device_type varchar(20),
  metrics json,
  timestamp datetime
);
```

Target tables include a temperature-sensor table, a humidity-sensor table, and so on. The SeaTunnel configuration:
```hocon
env {
  execution.parallelism = 8
  job.mode = "STREAMING"
  checkpoint.interval = 60000
}

source {
  Jdbc {
    url = "jdbc:mysql://iot-db:3306/iot_platform"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "iot_reader"
    password = "iot_pwd"
    query = "SELECT device_id, device_type, metrics, timestamp FROM raw_device_data WHERE timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)"
    result_table_name = "raw_devices"
  }
}

transform {
  Sql {
    source_table_name = "raw_devices"
    result_table_name = "temp_sensors"
    query = """
      SELECT device_id,
             JSON_EXTRACT(metrics, '$.temperature') AS temp_value,
             JSON_EXTRACT(metrics, '$.unit') AS temp_unit,
             timestamp
      FROM raw_devices
      WHERE device_type = 'temperature'
    """
  }
  Sql {
    source_table_name = "raw_devices"
    result_table_name = "humidity_sensors"
    query = """
      SELECT device_id,
             JSON_EXTRACT(metrics, '$.humidity') AS humidity_value,
             timestamp
      FROM raw_devices
      WHERE device_type = 'humidity'
    """
  }
}

sink {
  Jdbc {
    url = "jdbc:mysql://analytics-db:3306/iot_analytics"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "analytics_writer"
    password = "analytics_pwd"
    source_table_name = "temp_sensors"
    query = """
      INSERT INTO temperature_readings (device_id, temp_value, temp_unit, read_time)
      VALUES (?,?,?,?)
    """
  }
  Jdbc {
    url = "jdbc:mysql://analytics-db:3306/iot_analytics"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "analytics_writer"
    password = "analytics_pwd"
    source_table_name = "humidity_sensors"
    query = """
      INSERT INTO humidity_readings (device_id, humidity_value, read_time)
      VALUES (?,?,?)
    """
  }
}
```

Key points for streaming:
- `job.mode = "STREAMING"`: enables streaming mode
- `checkpoint.interval`: checkpoint interval, ensuring recovery after failures
- the `WHERE` clause restricts reads to recent data and avoids full-table scans
- JSON extraction: `JSON_EXTRACT` pulls individual values out of the `metrics` column
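It is worth noting that a plain Jdbc source polls with a query rather than capturing row-level changes; for a true change stream, SeaTunnel ships a MySQL-CDC connector. The sketch below is illustrative only: option names vary between SeaTunnel versions, so treat the keys shown here (`base-url`, `table-names`, and so on) as assumptions to verify against the connector documentation for your release.

```hocon
# Illustrative alternative: a CDC source instead of a polling Jdbc query.
# Verify option names against the MySQL-CDC docs for your SeaTunnel version.
env {
  job.mode = "STREAMING"
  checkpoint.interval = 60000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://iot-db:3306/iot_platform"
    username = "iot_reader"
    password = "iot_pwd"
    table-names = ["iot_platform.raw_device_data"]
    result_table_name = "raw_devices"
  }
}

# The transform and sink stages remain the same as in the example above.
```

With CDC, the one-hour `WHERE` window becomes unnecessary, since only changed rows flow through the pipeline.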
4. Advanced Configuration and Performance Tuning
When data volumes are large, the right configuration is critical for performance. The following tuning measures have been validated in production:
Connection pool configuration example:
```hocon
source {
  Jdbc {
    url = "jdbc:mysql://db-host:3306/prod_db"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    query = "SELECT * FROM large_table"
    # Connection pool parameters
    connection_check_timeout_sec = 30
    connection_max_active = 10
    connection_max_idle = 5
    connection_min_idle = 2
  }
}
```

Batch-write tuning parameters compared:
| Parameter | Default | Recommended | Purpose |
|---|---|---|---|
| batch_size | 100 | 1000-5000 | records per batch write |
| batch_interval_ms | 1000 | 200 | interval between batch writes (ms) |
| max_retries | 3 | 5 | retries on write failure |
| retry_backoff_multiplier_ms | 100 | 200 | retry backoff multiplier |
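The parameters above go directly inside a Jdbc sink block. A hedged sketch follows, reusing the warehouse sink from the merge scenario; the values are the recommended ones from the table, but whether every key is honored depends on the connector version, so check the JDBC sink docs for your release:

```hocon
sink {
  Jdbc {
    url = "jdbc:mysql://dwh:3306/data_warehouse"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "dwh_writer"
    password = "dwh_pwd"
    source_table_name = "combined_orders"
    query = "INSERT INTO consolidated_orders VALUES (?,?,?,?,?,?)"
    # Batch-write tuning (recommended values from the table above)
    batch_size = 2000
    batch_interval_ms = 200
    max_retries = 5
    retry_backoff_multiplier_ms = 200
  }
}
```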
A rule-of-thumb formula for parallelism:
```
ideal parallelism = min(source table partitions, target DB connection pool size, CPU cores × 2)
```

For example, with 16 source partitions, a connection pool of 10, and 8 CPU cores, this gives min(16, 10, 16) = 10.

Common errors and how to handle them:
- Character set issues: add `characterEncoding=utf8` to the JDBC URL
- Time zone mismatches: add `serverTimezone=Asia/Shanghai`
- Large transaction timeouts: tune `wait_timeout` and `interactive_timeout`
- Out-of-memory errors: increase `job.memory` and adjust `execution.parallelism`
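Put together, the character-set and time-zone fixes both land in the JDBC URL as query parameters (`characterEncoding` and `serverTimezone` are standard MySQL Connector/J properties). A sketch reusing the hosts and credentials from the first scenario:

```hocon
source {
  Jdbc {
    # characterEncoding and serverTimezone appended as URL query parameters
    url = "jdbc:mysql://prod-db:3306/ecommerce?characterEncoding=utf8&serverTimezone=Asia/Shanghai"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "etl_user"
    password = "secure_password"
    query = "SELECT * FROM t_user"
    result_table_name = "source_users"
  }
}
```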
In production, pair SeaTunnel with a monitoring system. Prometheus configuration example:
```hocon
metrics.reporters = "prometheus"
metrics.reporter.prometheus.class = "org.apache.seatunnel.metrics.prometheus.PrometheusReporter"
metrics.reporter.prometheus.port = "9091"
```

Validated across multiple projects, these settings can improve sync throughput by 3-5x. Especially at the tens-of-millions-of-rows scale, sensible batch parameters and parallelism keep the database connection pool from being exhausted.
