
SeaTunnel Data Collection in Practice (K8s, Docker)


Overview

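This document is a complete guide to deploying and using SeaTunnel (v2.3.8) for data collection. It covers synchronizing data from MongoDB and from MySQL (RDS) into MySQL, and walks through building an automated collection system that runs the sync jobs once a day.
Other versions work much the same way. For the exact sync configuration options, treat the official documentation as authoritative.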

1. Environment Preparation

1.1 System Requirements

| Component | Requirement |
|---|---|
| Operating system | Linux (x86_64 / ARM64) |
| Docker | 20.10+ |
| Kubernetes | 1.20+ (optional) |
| SeaTunnel | 2.3.8 |

1.2 Directory Layout

```text
seatunnel/
├── bin/
│   ├── mongo_start.sh            # MongoDB job start script
│   ├── rds_start.sh              # RDS job start script
│   ├── mongoshell-linux-amd64
│   └── mongoshell-linux-arm64
├── config/
│   ├── mongo_dynamic.template    # MongoDB dynamic job config
│   ├── mongo_static.template     # MongoDB static job config
│   └── rds.template              # RDS job config
├── seatunnel_mongo.yaml          # K8s MongoDB deployment file
└── seatunnel_rds.yaml            # K8s RDS deployment file
```

2. Configuration Files Explained

2.1 Configuration Template Mechanism

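The configuration templates use `$(VARIABLE_NAME)` as placeholders. When a start script runs, it replaces each placeholder with the value of the environment variable of the same name. The job configurations are written in HOCON.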

Supported placeholders:

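| Placeholder | Description |
|---|---|
| `$(MONGODB_URI)` | MongoDB connection string |
| `$(MONGODB_DATABASE)` | MongoDB database name |
| `$(RDS_URI)` | MySQL JDBC URL prefix without a database name, e.g. `jdbc:mysql://mysql-host:3306` |
| `$(RDS_USERNAME)` | MySQL username |
| `$(RDS_PASSWORD)` | MySQL password |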
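The start scripts in section 3 perform this substitution with one `sed -i "s#\$(NAME)#${NAME}#g"` call per placeholder. That works only while no value contains a character that is special in a sed replacement: a `#` (the delimiter), `&` (the whole match) or `\` in a password would corrupt the rendered file. Below is a minimal sketch of a more defensive renderer, assuming GNU `sed -i` as in the original scripts. `render_template` and `sed_escape_replacement` are hypothetical helper names, not part of SeaTunnel, and values containing newlines are still not supported.

```shell
# sed_escape_replacement VALUE
# Prints VALUE with "\", "&" and "#" backslash-escaped so it can be used
# safely on the right-hand side of an s#...#...# command.
sed_escape_replacement() {
  printf '%s' "$1" | sed -e 's/[\\&#]/\\&/g'
}

# render_template TEMPLATE OUTPUT NAME...
# Copies TEMPLATE to OUTPUT, then replaces every literal $(NAME) with the
# value of environment variable NAME. NAME must be a plain variable name.
# An unset or empty variable aborts the shell, like the original ${VAR:?} checks.
render_template() {
  template="$1"; output="$2"; shift 2
  cp -f "$template" "$output"
  for name in "$@"; do
    eval "value=\${$name:?$name is not set}"
    escaped=$(sed_escape_replacement "$value")
    # In sed, "$" not at the end of the pattern and "(" are both literal
    sed -i "s#\$($name)#${escaped}#g" "$output"
  done
}
```

For example, `render_template /sea/config/rds.template /sea/config/rds.conf RDS_URI RDS_USERNAME RDS_PASSWORD` would stand in for the three separate `sed` calls in the RDS script.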

2.2 MongoDB Static Job Configuration

Synchronizes a fixed MongoDB collection:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  MongoDB {
    uri = "$(MONGODB_URI)"
    database = "$(MONGODB_DATABASE)"
    collection = "skyladder_flowline_logs"
    result_table_name = "skyladder_flowline_logs"
    schema = {
      columns = [
        { name = "_id", type = STRING, nullable = true },
        { name = "projectId", type = STRING, nullable = true }
      ]
    }
  }
}

transform {
  sql {
    source_table_name = ["skyladder_flowline_logs"]
    result_table_name = "sub_skyladder_flowline_logs"
    query = "select projectId as project_id, _id as _id from skyladder_flowline_logs;"
  }
}

sink {
  jdbc {
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
    password = "$(RDS_PASSWORD)"
    source_table_name = ["sub_skyladder_flowline_logs"]
    generate_sink_sql = true
    database = "metric"
    table = "metric.sub_skyladder_flowline_logs"
    primary_keys = ["_id"]
  }
}
```

2.3 MongoDB Dynamic Job Configuration

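Iterates dynamically over multiple collections by project ID. In the template below, the collection name is prefixed with a sample project ID (`<projectId>.npm_allItemData`):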

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  MongoDB {
    uri = "$(MONGODB_URI)"
    database = "$(MONGODB_DATABASE)"
    collection = "3e9d762f34d944c782876ef07723e3ac.npm_allItemData"
    result_table_name = "npm_allItemData"
    schema = {
      columns = [
        { name = "_id", type = STRING, nullable = true },
        { name = "projectId", type = STRING, nullable = true }
      ]
    }
  }
}

transform {
  sql {
    source_table_name = ["npm_allItemData"]
    result_table_name = "sub_kb_workbench"
    query = "select _id as _id, projectId as project_id from npm_allItemData;"
  }
}

sink {
  jdbc {
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
    password = "$(RDS_PASSWORD)"
    source_table_name = ["sub_kb_workbench"]
    generate_sink_sql = true
    database = "metric"
    table = "metric.sub_kb_workbench"
    primary_keys = ["_id"]
  }
}
```
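The template above hard-codes a single sample project ID as the collection prefix, and the article does not show how the list of project IDs is obtained or iterated: the MongoDB start script in section 3.1 selects a `mongoshell` binary and renders `mongo_dynamic.conf`, but only runs the static job. The following is a hypothetical sketch of one way to fan the rendered config out per project. It assumes the project IDs are already known (for example, queried with the bundled mongo shell) and that the sample ID appears in the config only as the collection prefix. `expand_dynamic_conf`, `run_dynamic_jobs` and `SEATUNNEL_BIN` are illustrative names.

```shell
# Sample project ID used as the collection prefix in mongo_dynamic.template
SAMPLE_PROJECT_ID="3e9d762f34d944c782876ef07723e3ac"

# expand_dynamic_conf CONF OUT_DIR PROJECT_ID...   (hypothetical helper)
# Writes OUT_DIR/mongo_dynamic_<PROJECT_ID>.conf for every valid project ID,
# replacing the sample ID so the job reads "<PROJECT_ID>.npm_allItemData".
expand_dynamic_conf() {
  conf="$1"; out_dir="$2"; shift 2
  mkdir -p "$out_dir"
  for pid in "$@"; do
    # Accept only hex IDs (like the sample) so the value is safe inside sed
    case "$pid" in
      ''|*[!0-9a-fA-F]*) echo "Skipping invalid project ID: '$pid'" >&2; continue ;;
    esac
    sed "s#${SAMPLE_PROJECT_ID}#${pid}#g" "$conf" > "${out_dir}/mongo_dynamic_${pid}.conf"
  done
}

# run_dynamic_jobs OUT_DIR   (hypothetical helper)
# Runs each generated config one after another with the SeaTunnel launcher.
# SEATUNNEL_BIN is an illustrative override; the default is the path from section 3.
run_dynamic_jobs() {
  for f in "$1"/mongo_dynamic_*.conf; do
    [ -f "$f" ] || continue
    "${SEATUNNEL_BIN:-/opt/seatunnel/bin/seatunnel.sh}" --config "$f" -e local
  done
}
```

The generated jobs run sequentially rather than in parallel, since in this template they all write to the same sink table `metric.sub_kb_workbench`.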

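2.4 RDS Job Configuration

Copies `portal.pms_unit_info` into `metric.a_pms_unit_info`. With `schema_save_mode = IGNORE` the target table must already exist, and `data_save_mode = CUSTOM_PROCESSING` executes `custom_sql` (here a `truncate`) before the data is written, so every run is a full refresh: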

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    result_table_name = "pms_unit_info"
    table_path = "portal.pms_unit_info"
    url = "$(RDS_URI)/portal?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "$(RDS_USERNAME)"
    password = "$(RDS_PASSWORD)"
  }
}

# No transform is needed: the sink reads the source table pms_unit_info directly.

sink {
  Jdbc {
    source_table_name = ["pms_unit_info"]
    generate_sink_sql = true
    database = "metric"
    table = "a_pms_unit_info"
    user = "$(RDS_USERNAME)"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "$(RDS_URI)/metric?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
    password = "$(RDS_PASSWORD)"
    # The target table must already exist; it is truncated before each load
    schema_save_mode = IGNORE
    data_save_mode = CUSTOM_PROCESSING
    custom_sql = "truncate table a_pms_unit_info"
  }
}
```

3. Writing the Start Scripts

3.1 MongoDB Start Script

```bash
#!/bin/bash
# Fail fast if a required environment variable is missing
: "${ARCH:?Error: ARCH not set, use 'x86' or 'arm'}"
: "${MONGODB_URI:?Error: MONGODB_URI not set}"
: "${MONGODB_DATABASE:?Error: MONGODB_DATABASE not set}"
: "${RDS_URI:?Error: RDS_URI not set}"
: "${RDS_USERNAME:?Error: RDS_USERNAME not set}"
: "${RDS_PASSWORD:?Error: RDS_PASSWORD not set}"

home="/sea"

# Select the mongo shell binary for the CPU architecture
case "$ARCH" in
  x86) mongoshell="${home}/bin/mongoshell-linux-amd64" ;;
  arm) mongoshell="${home}/bin/mongoshell-linux-arm64" ;;
  *)   echo "Error: Unsupported ARCH: $ARCH"; exit 1 ;;
esac

# Render both templates: copy each one, then replace its $(NAME) placeholders.
# Values must not contain '#', '&' or '\', which are special in a sed replacement.
# Note: $mongoshell and mongo_dynamic.conf are prepared here, but the loop
# below only runs the static job.
for conf in mongo_dynamic mongo_static; do
  file="${home}/config/${conf}.conf"
  \cp "${home}/config/${conf}.template" "$file"
  sed -i "s#\$(MONGODB_URI)#${MONGODB_URI}#g" "$file"
  sed -i "s#\$(MONGODB_DATABASE)#${MONGODB_DATABASE}#g" "$file"
  sed -i "s#\$(RDS_URI)#${RDS_URI}#g" "$file"
  sed -i "s#\$(RDS_USERNAME)#${RDS_USERNAME}#g" "$file"
  sed -i "s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g" "$file"
done

mkdir -p "${home}/logs"

# Run once at startup, then once per day right after local midnight
while true; do
  echo "Starting daily task: $(date)"
  /opt/seatunnel/bin/seatunnel.sh --config "${home}/config/mongo_static.conf" -e local \
    >> "${home}/logs/mongo_static-$(date +%Y%m%d).log" 2>&1

  # Compute the wait after the job finishes so a long run does not overshoot midnight:
  # GNU date first, then BSD date, then UTC midnight as a last resort
  now=$(date +%s)
  tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
  [ -z "$tomorrow" ] && tomorrow=$((now - now % 86400 + 86400))
  sleep_seconds=$((tomorrow - now))
  echo "Waiting $sleep_seconds seconds for next run..."
  sleep "$sleep_seconds"
done
```

3.2 RDS Start Script

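```bash
#!/bin/bash
: "${RDS_URI:?Error: environment variable RDS_URI is not set}"
: "${RDS_USERNAME:?Error: environment variable RDS_USERNAME is not set}"
: "${RDS_PASSWORD:?Error: environment variable RDS_PASSWORD is not set}"

# Paths; the root directory is the volume mounted at /sea
home="/sea"
rds_config_file="${home}/config/rds.conf"
log_dir="${home}/logs"

# Render the template: copy it, then replace the $(NAME) placeholders
\cp "${home}/config/rds.template" "$rds_config_file"
sed -i "s#\$(RDS_URI)#${RDS_URI}#g" "$rds_config_file"
sed -i "s#\$(RDS_USERNAME)#${RDS_USERNAME}#g" "$rds_config_file"
sed -i "s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g" "$rds_config_file"

# Make sure the log directory exists
mkdir -p "$log_dir"

# Note: chmod +x /config/* in the original script probably used the wrong path; corrected to ${home}/config/*
chmod +x "${home}"/config/* 2>/dev/null

# Infinite loop: run once now, then once a day at local midnight
while true; do
  echo "Starting daily task: $(date)"

  # One log file per day, named after the current date
  today=$(date +%Y%m%d)
  static_log="${log_dir}/rds-${today}.log"

  # Record this run's start time in the log (append)
  echo "===== Task started: $(date) =====" >> "$static_log"

  # Run the RDS job once, appending SeaTunnel output to today's log file
  echo "Running job with config: $rds_config_file"
  /opt/seatunnel/bin/seatunnel.sh --config "$rds_config_file" -e local >> "$static_log" 2>&1
  echo "Daily task finished: $(date)"

  # Seconds until the next midnight, computed after the job so a long run does not overshoot
  now=$(date +%s)
  tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
  if [ -z "$tomorrow" ]; then
    # Fallback when neither GNU nor BSD date works: next UTC midnight
    seconds_today=$((now % 86400))
    sleep_seconds=$((86400 - seconds_today))
  else
    sleep_seconds=$((tomorrow - now))
  fi

  echo "Current time: $(date), waiting $sleep_seconds seconds until the next midnight..."
  sleep "$sleep_seconds"
done
```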
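Both start scripts duplicate the same scheduling logic. As a sketch, assuming POSIX `sh` with GNU or BSD `date`, it could be factored into two functions (the names are hypothetical). Unlike `date -d "tomorrow 00:00:00"`, this helper accepts an optional epoch argument so it can be checked against fixed timestamps, at the cost of ignoring DST transitions.

```shell
# seconds_until_next_midnight [EPOCH]
# Seconds from EPOCH (default: now) to the next local midnight, based on the
# local wall-clock time, so on DST-change days the result can be off by an hour.
# The body runs in a subshell so its variables do not leak into the caller.
seconds_until_next_midnight() (
  now="${1:-$(date +%s)}"
  # GNU date takes -d @EPOCH; BSD/macOS date takes -r EPOCH
  hms=$(date -d "@$now" '+%H %M %S' 2>/dev/null || date -r "$now" '+%H %M %S')
  set -- $hms
  # Drop one leading zero so "08" and "09" are not parsed as invalid octal
  h=${1#0}; m=${2#0}; s=${3#0}
  echo $((86400 - (h * 3600 + m * 60 + s)))
)

# run_daily CMD [ARGS...]
# Runs CMD immediately, then again after each local midnight. The wait is
# computed after CMD finishes, so a long run does not push the next start
# past midnight.
run_daily() {
  while true; do
    echo "Starting daily task: $(date)"
    "$@" || echo "Task exited with status $?" >&2
    wait_seconds=$(seconds_until_next_midnight)
    echo "Waiting ${wait_seconds}s for the next run..."
    sleep "$wait_seconds"
  done
}

# Example (keeps the per-day log file of rds_start.sh):
#   run_daily sh -c '/opt/seatunnel/bin/seatunnel.sh --config /sea/config/rds.conf -e local >> "/sea/logs/rds-$(date +%Y%m%d).log" 2>&1'
```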

4. Docker Deployment

4.1 Prepare the Directories

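```bash
mkdir -p /opt/data/seatunnel/{bin,config}
```

Copy the scripts and templates from the directory layout in section 1.2 into `bin/` and `config/`; the whole directory is mounted into the container at `/sea`.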
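Scripts edited on Windows often end up with CRLF line endings, a common cause of startup failures (see the FAQ). Below is a minimal sketch of populating the mounted directory in one step while normalizing line endings, assuming a local copy of the tree from section 1.2 and GNU `sed`. `prepare_seatunnel_dir` is a hypothetical helper name.

```shell
# prepare_seatunnel_dir SRC DEST   (hypothetical helper)
# Copies SRC/bin/* and SRC/config/*.template into DEST, strips CRLF line
# endings from the scripts and templates, and marks everything in bin/ executable.
prepare_seatunnel_dir() {
  src="$1"; dest="$2"
  mkdir -p "$dest/bin" "$dest/config" "$dest/logs"
  cp -f "$src"/bin/* "$dest/bin/"
  cp -f "$src"/config/*.template "$dest/config/"
  # Same fix as in the FAQ, applied to every script and template
  for f in "$dest"/bin/*.sh "$dest"/config/*.template; do
    [ -f "$f" ] && sed -i 's/\r$//' "$f"
  done
  chmod +x "$dest"/bin/*
}
```

For example, `prepare_seatunnel_dir ./seatunnel /opt/data/seatunnel` fills the directory that the `docker run` commands mount at `/sea`.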

4.2 Start the RDS Job

```bash
docker run --rm -d \
  -v /opt/data/seatunnel/:/sea \
  -e RDS_URI="jdbc:mysql://mysql-host:3306" \
  -e RDS_USERNAME="user" \
  -e RDS_PASSWORD="password" \
  apache/seatunnel:2.3.8 \
  sh /sea/bin/rds_start.sh
```

4.3 Start the MongoDB Job

```bash
docker run --rm -d \
  -v /opt/data/seatunnel/:/sea \
  -e ARCH="x86" \
  -e MONGODB_URI="mongodb://user:pass@mongo-host:27017" \
  -e MONGODB_DATABASE="dbname" \
  -e RDS_URI="jdbc:mysql://mysql-host:3306" \
  -e RDS_USERNAME="user" \
  -e RDS_PASSWORD="password" \
  apache/seatunnel:2.3.8 \
  sh /sea/bin/mongo_start.sh
```
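Passing `RDS_PASSWORD` with `-e` puts it in shell history and on the `docker run` command line. Docker's `--env-file` option reads `NAME=value` lines from a file instead, although the values remain visible to anyone who can run `docker inspect` on the container. Below is a sketch of writing such a file with owner-only permissions, assuming the variables are already exported in the current shell. `write_env_file` and the file path in the example are hypothetical. Docker does not strip quotes from env-file values, so values are written raw, and values containing newlines cannot be represented.

```shell
# write_env_file FILE NAME...   (hypothetical helper)
# Writes NAME=value for each listed environment variable to FILE, readable by
# the owner only. The body runs in a subshell so the umask change stays local;
# an unset or empty variable makes the function fail.
write_env_file() (
  env_file="$1"; shift
  umask 077
  : > "$env_file"
  chmod 600 "$env_file"   # also tightens a pre-existing file
  for name in "$@"; do
    eval "value=\${$name:?$name is not set}"
    printf '%s=%s\n' "$name" "$value" >> "$env_file"
  done
)

# Example (hypothetical path):
#   write_env_file /opt/data/rds.env RDS_URI RDS_USERNAME RDS_PASSWORD
#   docker run --rm -d -v /opt/data/seatunnel/:/sea --env-file /opt/data/rds.env \
#     apache/seatunnel:2.3.8 sh /sea/bin/rds_start.sh
```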

5. Kubernetes Deployment

5.1 Deployment Example: MongoDB

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-mongo
  namespace: seatunnel
spec:
  # Keep at 1: each replica runs its own daily loop, so more replicas would run the job repeatedly
  replicas: 1
  selector:
    matchLabels:
      app: seatunnel-mongo
  template:
    metadata:
      labels:
        app: seatunnel-mongo
    spec:
      volumes:
        - name: seatunnel-config
          # The scripts and templates must exist under /data/seatunnel on the node running the pod
          hostPath:
            path: /data/seatunnel
            type: DirectoryOrCreate
      containers:
        - name: seatunnel
          image: apache/seatunnel:2.3.8
          env:
            - name: ARCH
              value: "x86"
            - name: MONGODB_URI
              value: "mongodb://user:pass@mongo-host:27017"
            - name: MONGODB_DATABASE
              value: "dbname"
            - name: RDS_URI
              value: "jdbc:mysql://mysql-host:3306"
            - name: RDS_USERNAME
              value: "user"
            - name: RDS_PASSWORD
              value: "password"
          command:
            - /bin/sh
            - -c
            - |
              chmod +x /sea/bin/mongo_start.sh && bash /sea/bin/mongo_start.sh
          volumeMounts:
            - name: seatunnel-config
              mountPath: /sea
```

5.2 Deployment Example: RDS

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seatunnel-rds
  namespace: seatunnel
spec:
  # Keep at 1: each replica runs its own daily loop, so more replicas would run the job repeatedly
  replicas: 1
  selector:
    matchLabels:
      app: seatunnel-rds
  template:
    metadata:
      labels:
        app: seatunnel-rds
    spec:
      volumes:
        - name: seatunnel-config
          # The scripts and templates must exist under /data/seatunnel on the node running the pod
          hostPath:
            path: /data/seatunnel
            type: DirectoryOrCreate
      containers:
        - name: seatunnel
          image: apache/seatunnel:2.3.8
          env:
            - name: RDS_URI
              value: "jdbc:mysql://mysql-host:3306"
            - name: RDS_USERNAME
              value: "user"
            - name: RDS_PASSWORD
              value: "password"
          command:
            - /bin/sh
            - -c
            - |
              chmod +x /sea/bin/rds_start.sh && bash /sea/bin/rds_start.sh
          volumeMounts:
            - name: seatunnel-config
              mountPath: /sea
```

6. FAQ

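| Problem | Solution |
|---|---|
| Script fails because of Windows (CRLF) line endings | `sed -i 's/\r$//' script.sh` |
| Connection failure | Check network connectivity and credentials |
| Placeholders not replaced | Confirm that the environment variables are passed to the container correctly |
| mongo client | The bundled mongoshell binaries can be replaced with a locally installed client |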
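The line-ending and placeholder problems can both be caught before a job runs. Below is a minimal sketch of two checks; the function names are hypothetical. `check_no_crlf` is meant to run on the host before deploying, since a script with CRLF endings may not get far enough to check itself. `check_rendered_conf` would run inside a start script after the `sed` calls.

```shell
# check_rendered_conf FILE   (hypothetical helper)
# Fails, printing the offending lines, if FILE still contains a $(NAME) placeholder.
check_rendered_conf() {
  if grep -n '\$([A-Z_][A-Z0-9_]*)' "$1"; then
    echo "ERROR: unresolved placeholders in $1" >&2
    return 1
  fi
}

# check_no_crlf FILE   (hypothetical helper)
# Fails if FILE contains a carriage return (Windows CRLF line endings).
check_no_crlf() {
  if grep -q "$(printf '\r')" "$1"; then
    echo "ERROR: $1 has CRLF line endings; run: sed -i 's/\r\$//' $1" >&2
    return 1
  fi
}

# Example inside rds_start.sh, right after rendering:
#   check_rendered_conf "$rds_config_file" || exit 1
```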

7. References

  • SeaTunnel official documentation
  • SeaTunnel GitHub