本文记录了一个完整的端到端 CDC(Change Data Capture)数据管道的构建过程:从 PostgreSQL 数据库的变更捕获,到 Kafka 消息队列的实时传输,再到 Flink SQL 的流式处理,最终写入 Paimon 数据湖表。整个过程涵盖架构设计、Docker 容器编排、依赖问题排查、Flink SQL 编写与调试等核心环节。无论你是初次接触 CDC 技术,还是正在寻找生产级别的实时数据同步方案,这份实战指南都能提供从零到一的完整参考。
CDC概念和原理
CDC(Change Data Capture)是一种数据库设计模式,用于实时捕获和跟踪数据库中的数据变更(INSERT、UPDATE、DELETE),并将这些变更以事件流的形式传递给下游系统。
CDC 有三种主流实现方式:
- 基于查询的 CDC:定期执行
SELECT语句轮询数据库变更。优点是简单、无需特殊权限;缺点是延迟高、无法捕获中间变更(如同一条记录被多次 UPDATE)、性能开销大。 - 基于触发器的 CDC:在表上创建
INSERT/UPDATE/DELETE触发器,变更时实时推送。优点是实时性好;缺点是侵入性强、每条变更都触发额外逻辑、影响源库性能。 - 基于日志的 CDC:读取数据库的 WAL(PostgreSQL)或 Binlog(MySQL)。优点是低延迟、无侵入、完整的变更历史;缺点是需要数据库开启特殊配置(如
wal_level=logical)。
本方案采用基于日志的 CDC,通过 Debezium 读取 PostgreSQL 的 WAL。
PostgreSQL 逻辑复制原理
PostgreSQL 通过 WAL(Write-Ahead Logging) 实现数据持久化和复制。数据流向为Debezium 主动拉取,而非 PostgreSQL 推送
PostgreSQL 逻辑复制涉及四个核心组件:
- WAL(Write-Ahead Log):PostgreSQL 的预写日志。所有数据变更在写入数据文件之前,先写入 WAL,确保数据持久性。
- 复制槽(Replication Slot):记录消费者已读取的 WAL 位置。只要复制槽存在,PostgreSQL 就不会清理未消费的 WAL,保证数据不丢失。
- pgoutput 插件:PostgreSQL 内置的逻辑解码插件,将二进制 WAL 格式转换为结构化的变更事件。
- Debezium:作为主动消费者,通过 PostgreSQL 的流式复制协议持续拉取变更。
数据流有两种基本模式:
- 推送模式:数据库主动将变更推送给消费者。例如 PostgreSQL 的 LISTEN/NOTIFY、Webhook 等。优点是延迟低,缺点是背压控制困难——如果消费者处理速度跟不上,数据库可能需要缓存或丢弃数据。
- 拉取模式:消费者主动从数据库拉取变更。例如 Debezium、Flink CDC、AWS DMS 等。优点是消费者可以控制速率,支持精确一次语义。
注意:如果 Debezium 长时间停止,复制槽会保留 WAL,可能导致磁盘空间耗尽。需监控
pg_replication_slots。
pgsql的关键配置如下
wal_level = logical:启用逻辑复制,WAL 包含足够信息用于逻辑解码max_replication_slots:复制槽数量,每个 CDC 连接器需要一个max_wal_senders:WAL 发送进程数REPLICA IDENTITY FULL:表级别配置,UPDATE/DELETE 时记录完整旧行数据
Debezium 使用 PostgreSQL 的流式复制协议:
- 建立连接:Debezium 作为"伪 Standby"连接到 PostgreSQL
- 请求流式数据:使用
START_REPLICATION命令 - 持续拉取:PostgreSQL 保持连接开放,持续发送变更
- 确认消费:Debezium 定期发送
standby_status_update确认 LSN 位置
复制槽(Replication Slot)确保Debezium 未消费的 WAL 不会被删除,Debezium 重启后可以从断点继续
SELECT slot_name, slot_type, active, restart_lsn FROM pg_replication_slots;slot_name | slot_type | active | restart_lsn
------------------+-----------+--------+--------------debezium_slot | logical | t | 0/1A5B3C0
Kafka KRaft 模式
KRaft(Kafka Raft)是 Kafka 2.8+ 引入的新共识协议,用于替代 Zookeeper 进行集群元数据管理。
KRaft 模式引入了以下核心概念:
- Node ID:每个节点的唯一标识符,在集群中不能重复。
- Process Roles:节点承担的角色。可以是
broker(处理消息)、controller(管理元数据)或两者兼具。 - Controller:负责管理集群元数据,包括 Topic 创建、Partition 分配、ISR(In-Sync Replicas)管理等。
- Quorum Voters:参与 Raft 选举投票的节点列表,格式为
ID@host:port。 - Cluster ID:集群唯一标识符,必须是 Base64 编码的 UUID,用于区分不同集群。
单节点 KRaft 配置解析
KAFKA_NODE_ID: 1 # 节点 ID
KAFKA_PROCESS_ROLES: broker,controller # 同时承担 Broker 和 Controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:9093 # 投票者列表:ID@host:port
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk" # 集群唯一 ID(Base64 编码)
Debezium CDC 原理
Debezium 基于 Kafka Connect 框架运行,通过 Connector 连接源数据库。
Debezium 发送的 CDC 消息采用 JSON 格式,包含 schema 和 payload 两部分:
{"schema": { ... }, // 字段类型定义(可选)"payload": {"before": { ... }, // 变更前的数据(UPDATE/DELETE 时有值)"after": { ... }, // 变更后的数据(INSERT/UPDATE 时有值)"source": { // 变更来源元数据"version": "2.7.3.Final","connector": "postgresql","name": "cdc", // topic.prefix"ts_ms": 1779612336252, // 变更时间戳(毫秒)"db": "cdc_demo","table": "orders","lsn": 27553184 // WAL 日志序列号},"op": "c", // 操作类型"ts_ms": 1779612336477 // Debezium 处理时间戳}
}
Debezium 消息中的 op 字段标识变更类型:
r(Read):快照读取,表示初始全量同步时读取的历史数据。before为 null,after包含完整数据。c(Create):插入操作。before为 null,after包含新插入的数据。u(Update):更新操作。before包含更新前的数据,after包含更新后的数据。d(Delete):删除操作。before包含被删除的数据,after为 null。
Apache Paimon
Apache Paimon(原 Flink Table Store)是 Apache 顶级项目,定位为流批一体的数据湖表格式。与 Iceberg、Hudi 不同,Paimon 从设计之初就考虑了流式写入场景。Paimon 提供两种表类型:
- Primary Key 表:支持 INSERT/UPDATE/DELETE,自动处理主键冲突。这是 CDC 场景的首选,Paimon 会根据主键自动进行 Upsert 操作,遇到相同主键的记录,UPDATE 操作会覆盖旧值,DELETE 操作会删除记录。
- Append Only 表:不支持更新和删除,只支持追加写入。适合日志类数据,写入吞吐量更高。
Paimon 的其他核心特性:
- 自动 Compaction:写入时自动合并小文件,无需手动维护 Optimize 作业。
- Snapshot 隔离:MVCC 机制,读写互不阻塞,支持 Time Travel(查询历史快照)。
- Schema Evolution:支持无停机变更表结构。
Paimon 和 Iceberg 都是数据湖表格式,但设计目标不同:
- CDC 支持:Paimon 的 Primary Key 表原生支持 UPDATE/DELETE,Iceberg 需要通过 MERGE INTO 语句或 Iceberg Actions 实现类似功能。
- 小文件处理:Paimon 在写入时自动 Compaction,Iceberg 需要手动触发 Optimize 作业。
- 写入延迟:Paimon 支持毫秒级延迟的流式写入,Iceberg 的 Commit 机制通常带来分钟级延迟。
- 流批一体:Paimon 从设计之初就支持流批一体,Iceberg 主要面向批处理场景。
Paimon 文件组织结构如下
s3://paimon-warehouse/
├── cdc/ # 数据库名
│ └── orders/ # 表名
│ ├── snapshot/ # 快照目录
│ │ ├── snapshot-1 # 快照元数据
│ │ └── snapshot-2
│ ├── manifest/ # 清单目录
│ │ ├── manifest-list-1 # 清单列表
│ │ └── manifest-1 # 数据文件清单
│ ├── data/ # 数据文件目录
│ │ ├── bucket-0/ # 分桶目录
│ │ │ ├── data-1.orc # 数据文件(ORC 格式)
│ │ │ └── data-2.orc
│ │ └── bucket-1/
│ └── schema/ # Schema 目录
│ └── schema-0
Primary Key 表写入机制
Paimon Primary Key 表的核心优势是自动 Upsert,写入时自动处理 INSERT/UPDATE/DELETE,无需用户编写去重逻辑。
当 Flink 向 Paimon Primary Key 表写入数据时:
- 每条记录根据主键哈希分配到固定的 bucket(由
bucket参数控制) - 同一个 bucket 内,相同主键的记录可能存在多个版本(存放在不同的 SST 文件中)
- 写入时不立即合并,而是追加写入,保持高吞吐
异步 Compaction 由 Paimon 内置的 Compaction Coordinator 实现。默认模式下Compaction 逻辑嵌入在 Flink Sink 算子中(生产推荐环境可以考虑独立 Compaction Job)。合并时,相同主键的多条记录只保留最新版本(基于写入时间戳),实现 Upsert 语义。
- Flink TaskManager 的线程负责执行 Compaction
- 与写入操作共享同一个 slot,但通过异步线程池隔离
- 由 Flink 的 checkpoint 机制触发 Compaction
Compaction 在以下条件触发:
- 文件数量触发:当 Level 0 文件数达到
compaction.min.file-num(默认 5),触发合并 - 文件大小触发:当 Level 0 总大小超过阈值,触发合并
- Checkpoint 触发:Flink checkpoint 完成后,触发 Compaction(确保数据一致性)
读取时,Paimon 会合并所有相关的 SST 文件,对相同主键的记录只返回最新版本。这保证了即使写入时未立即合并,查询结果始终正确。即使 Compaction 尚未完成,读取时也能正确获取最新数据。
Flink SQL Streaming
Flink 采用事件驱动架构,每条记录触发一次处理:
Flink 和 Spark Streaming 的核心区别在于处理模型:
- 处理模型:Flink 是事件驱动架构,每条记录到达后立即处理,延迟可达毫秒级。Spark Streaming 是微批处理架构,收集一批记录后再处理,延迟通常在秒级。
- 时间语义:Flink 支持事件时间(Event Time)、处理时间(Processing Time)、摄入时间(Ingestion Time)三种时间语义,可以正确处理乱序事件。Spark Streaming 主要使用处理时间。
- 状态管理:Flink 内置 State Backend,支持 Keyed State 和 Operator State,可以轻松实现有状态计算。Spark Streaming 需要借助外部存储(如 Redis、HBase)管理状态。
- Exactly-Once 语义:Flink 通过 Checkpoint 机制原生支持端到端的精确一次语义。Spark Streaming 需要 Enable Checkpoint 才能实现类似保证。
Exactly-Once 语义
Flink 通过 Checkpoint 实现 Exactly-Once 语义,Checkpoint Barrier 是特殊标记,随数据流流动:
- JobManager 触发 Checkpoint:向所有 Source 算子注入 Barrier
- Barrier 随数据流动:Barrier 像一条分隔线,将数据分为两个部分
- 算子收到 Barrier
- 将当前状态持久化(本地状态 + source offset)
- 向下游发送 Barrier
- 所有算子完成:Checkpoint 成功
每个算子的状态都会保存:
- Source:Kafka offset
- Map/FlatMap:聚合状态、累加器
- Sink:写入状态、事务 ID
但是仅 Checkpoint 不够,还需要 Sink 端配合才能实现端到端 Exactly-Once(Paimon Sink 实现了 TwoPhaseCommitSinkFunction)
环境配置
服务编排文件
version: '3.8'services:postgres:build:context: .dockerfile: docker/postgres/Dockerfilecontainer_name: cdc-postgresports:- "5433:5432"environment:POSTGRES_USER: ${POSTGRES_USER:-postgres}POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres}POSTGRES_DB: ${POSTGRES_DB:-cdc_demo}volumes:- ./init-scripts:/docker-entrypoint-initdb.dhealthcheck:test: ["CMD-SHELL", "pg_isready -U postgres"]interval: 10stimeout: 5sretries: 5start_period: 10skafka:image: confluentinc/cp-kafka:7.5.0container_name: cdc-kafkaports:- "9095:9092"- "29095:29092"environment:KAFKA_NODE_ID: 1KAFKA_PROCESS_ROLES: broker,controllerKAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cdc-kafka:9092,PLAINTEXT_HOST://localhost:29094KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLERKAFKA_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:9093KAFKA_CONTROLLER_LISTENERS: CONTROLLER://0.0.0.0:9093CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"healthcheck:test: ["CMD", "bash", "-c", "kafka-broker-api-versions --bootstrap-server localhost:9092 | head -1"]interval: 10stimeout: 10sretries: 10start_period: 30sdebezium:image: debezium/connect:2.7container_name: cdc-debeziumdepends_on:kafka:condition: service_healthyports:- "8084:8083"environment:BOOTSTRAP_SERVERS: cdc-kafka:9092GROUP_ID: cdc-connect-clusterCONFIG_STORAGE_TOPIC: connect-configsOFFSET_STORAGE_TOPIC: connect-offsetsSTATUS_STORAGE_TOPIC: connect-statusCONFIG_STORAGE_REPLICATION_FACTOR: 1OFFSET_STORAGE_REPLICATION_FACTOR: 1STATUS_STORAGE_REPLICATION_FACTOR: 1healthcheck:test: ["CMD", "curl", "-f", "http://localhost:8083/"]interval: 15stimeout: 10sretries: 10start_period: 30sminio:image: minio/minio:latestcontainer_name: cdc-miniocommand: server /data --console-address ":9001"ports:- "9002:9000"- "9003:9001"environment:MINIO_ROOT_USER: minioadminMINIO_ROOT_PASSWORD: minioadminhealthcheck:test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]interval: 10stimeout: 5sretries: 5start_period: 10sminio-init:image: minio/mc:latestcontainer_name: cdc-minio-initdepends_on:minio:condition: service_healthyentrypoint: >/bin/sh -c "mc alias set myminio http://minio:9000 minioadmin minioadmin;mc mb myminio/paimon-warehouse --ignore-existing;exit 0;"jobmanager:build:context: .dockerfile: docker/flink/Dockerfileimage: cdc-flink-paimon:1.20container_name: cdc-flink-jobmanagercommand: jobmanagerports:- "8085:8081"- "6123:6123"environment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagerstate.backend.type: hashmapstate.checkpoints.dir: s3://paimon-warehouse/checkpointss3.endpoint: http://minio:9000s3.access.key: minioadmins3.secret.key: minioadmins3.path.style.access: truevolumes:- ./flink-sql-job:/opt/flink/usrlibdepends_on:kafka:condition: service_healthyminio:condition: service_healthytaskmanager:image: cdc-flink-paimon:1.20container_name: cdc-flink-taskmanagercommand: taskmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagertaskmanager.numberOfTaskSlots: 2state.backend.type: hashmapstate.checkpoints.dir: s3://paimon-warehouse/checkpointss3.endpoint: http://minio:9000s3.access.key: minioadmins3.secret.key: minioadmins3.path.style.access: truevolumes:- ./flink-sql-job:/opt/flink/usrlibdepends_on:- jobmanager
关键配置
Kafka KRaft 配置
KAFKA_NODE_ID: 1 # 节点唯一标识,单节点为 1
KAFKA_PROCESS_ROLES: broker,controller # 角色:broker 处理消息,controller 管理元数据
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,... # 监听地址:0.0.0.0 表示接受所有网卡
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cdc-kafka:9092 # 广播地址:容器内使用容器名
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:9093 # 选举投票者:ID@host:port
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk" # 集群 ID,必须是 Base64 编码的 UUID
Debezium 环境变量
BOOTSTRAP_SERVERS: cdc-kafka:9092 # Kafka 连接地址
GROUP_ID: cdc-connect-cluster # Connect 集群 ID
CONFIG_STORAGE_TOPIC: connect-configs # 存储Connector配置的Topic
OFFSET_STORAGE_TOPIC: connect-offsets # 存储消费偏移量的Topic
STATUS_STORAGE_TOPIC: connect-status # 存储Connector状态的Topic
Flink 环境变量
jobmanager.rpc.address: jobmanager # JobManager 地址
taskmanager.numberOfTaskSlots: 2 # TaskManager 槽位数
state.backend.type: hashmap # 状态后端类型
state.checkpoints.dir: s3://paimon-warehouse/checkpoints # Checkpoint 目录
s3.endpoint: http://minio:9000 # S3 端点(MinIO)
s3.path.style.access: true # 使用路径风格访问(MinIO 必需)
PostgreSQL Dockerfile
FROM postgres:15
RUN echo "wal_level = logical" >> /usr/share/postgresql/postgresql.conf.sample && \echo "max_replication_slots = 4" >> /usr/share/postgresql/postgresql.conf.sample && \echo "max_wal_senders = 4" >> /usr/share/postgresql/postgresql.conf.sample
Paimon 镜像构建
Dockerfile 完整内容
FROM flink:1.20-java17# ========================================
# 创建 S3 文件系统插件目录
# Flink 插件机制:每个插件必须在独立子目录中
# ========================================
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \cd /opt/flink/plugins/s3-fs-hadoop && \curl -sL "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.4/flink-s3-fs-hadoop-1.20.4.jar" -o flink-s3-fs-hadoop.jar# ========================================
# 下载 Paimon 和依赖到 lib 目录
# ========================================
RUN mkdir -p /opt/flink/lib && \cd /opt/flink/lib && \# Paimon 核心curl -sL "https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.20/1.0.0/paimon-flink-1.20-1.0.0.jar" -o paimon-flink.jar && \# Kafka SQL Connectorcurl -sL "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.20/flink-sql-connector-kafka-3.3.0-1.20.jar" -o flink-sql-connector-kafka.jar && \# Hadoop 客户端(包含 HdfsConfiguration)curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.4/hadoop-client-api-3.3.4.jar" -o hadoop-client-api.jar && \curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.4/hadoop-client-runtime-3.3.4.jar" -o hadoop-client-runtime.jar && \# commons-logging(Hadoop 依赖)curl -sL "https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar" -o commons-logging.jar && \# Paimon S3 支持curl -sL "https://repo1.maven.org/maven2/org/apache/paimon/paimon-s3/1.0.0/paimon-s3-1.0.0.jar" -o paimon-s3.jar# 验证下载
RUN ls -la /opt/flink/lib/*.jar && ls -la /opt/flink/plugins/s3-fs-hadoop/
Dockerfile 下载了以下 JAR 文件:
- paimon-flink.jar(48MB):Paimon 核心 API,提供 Catalog、Table 等核心类。
- paimon-s3.jar(32MB):Paimon S3 文件系统实现,支持访问 MinIO 和 AWS S3。
- hadoop-client-api.jar(19MB):Hadoop 客户端 API,包含
Configuration、HdfsConfiguration等核心类。 - hadoop-client-runtime.jar(30MB):Hadoop 运行时依赖,包含 Guava、Protobuf 等传递依赖。
- commons-logging.jar(60KB):日志门面,Hadoop 的必需依赖。
- flink-sql-connector-kafka.jar(5.6MB):Flink Kafka SQL Connector,支持读取 Debezium JSON 格式。
- flink-s3-fs-hadoop.jar:Flink S3 文件系统插件,必须放在
plugins/目录。
Flink 有两个加载 JAR 的目录,区别在于加载方式:
/opt/flink/lib/:存放通用依赖 JAR。Flink 使用系统类加载器加载这些 JAR,所有 Job 共享。适合放置 Paimon、Hadoop、Kafka Connector 等必需依赖。
/opt/flink/plugins/<name>/:存放可选插件。每个插件使用独立的类加载器加载,避免依赖冲突。适合放置 S3 文件系统等可选功能。S3 文件系统插件必须放在 plugins/ 目录,因为它是可选的文件系统实现。
问题和解决
本节记录在实现 Flink SQL + Paimon 过程中遇到的依赖问题,以及每个问题的详细分析和解决方案。理解这些问题的根本原因,有助于在未来快速定位类似错误。
缺少 Hadoop Configuration 类
执行 CREATE CATALOG paimon 时报错:
org.apache.flink.table.api.ValidationException: Unable to create catalog 'paimon'.
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurationat org.apache.paimon.catalog.CatalogContext.<init>(CatalogContext.java:53)at org.apache.paimon.flink.FlinkCatalogFactory.createCatalog(...)
Paimon 在初始化 Catalog 时需要读取 Hadoop 配置(如 S3 endpoint、access key 等),这依赖于 org.apache.hadoop.conf.Configuration 类。然而 Flink 官方镜像 flink:1.20-java17 是精简镜像,不包含任何 Hadoop 相关 JAR。
在 Dockerfile 中添加 Hadoop 客户端依赖:
curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.4/hadoop-client-api-3.3.4.jar" -o hadoop-client-api.jar
curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.4/hadoop-client-runtime-3.3.4.jar" -o hadoop-client-runtime.jar
HdfsConfiguration 类找不到
错误内容
java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfigurationat org.apache.paimon.catalog.CatalogContext.<init>(CatalogContext.java:53)
最初尝试下载 hadoop-common.jar,它应该包含 Hadoop 的通用类。但 HdfsConfiguration 类实际上不在 hadoop-common 中,而是属于 HDFS 模块。Paimon 需要这个类来检测和配置 HDFS/S3 文件系统。正确的做法是使用 hadoop-client-api 这个聚合 JAR,它包含了客户端所需的所有核心类。
使用 hadoop-client-api-3.3.4.jar 替代分散的 Hadoop JAR。这个 JAR 是一个聚合包,包含:
- Hadoop Common(通用工具类)
- Hadoop HDFS(HDFS 客户端)
- Hadoop MapReduce Client(MapReduce 客户端 API)
S3 协议不支持
错误日志
org.apache.paimon.fs.UnsupportedSchemeException: Could not find a file io implementation for scheme 's3' in the classpath.
FlinkFileIOLoader also cannot access this path.
Hadoop FileSystem also cannot access this path 's3://paimon-warehouse'.
即使配置了 warehouse = 's3://paimon-warehouse',Paimon 默认也不知道如何处理 S3 协议。Paimon 支持多种文件系统(本地文件系统、HDFS、S3 等),但 S3 支持需要单独的依赖包。
添加 Paimon S3 模块,这个 JAR 包含了 Paimon 对 S3 协议的实现,包括 AWS SDK 和 S3 客户端。
curl -sL "https://repo1.maven.org/maven2/org/apache/paimon/paimon-s3/1.0.0/paimon-s3-1.0.0.jar" -o paimon-s3.jar
MinIO Bucket 不存在
这个错误说明 Paimon 已经能够连接到 MinIO,但找不到指定的 bucket。与 AWS S3 不同,MinIO 不会自动创建 bucket必须显式创建。
com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket)
有两种方式创建 bucket:
手动创建(适合测试)
docker exec cdc-minio sh -c "mc alias set myminio http://localhost:9000 minioadmin minioadmin && mc mb myminio/paimon-warehouse"
自动创建(推荐用于生产)
# docker-compose.yml
minio-init:image: minio/mc:latestdepends_on:minio:condition: service_healthyentrypoint: >/bin/sh -c "mc alias set myminio http://minio:9000 minioadmin minioadmin;mc mb myminio/paimon-warehouse --ignore-existing;exit 0;"
Flink S3 文件系统插件未加载
Flink 需要 S3 文件系统插件,但插件必须放在正确位置。
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3'.
The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop.
Please ensure that each plugin resides within its own subfolder within the plugins directory.
Flink 的插件机制有一个重要规则:每个插件必须放在独立的子目录中。例如:
- 正确:
/opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop.jar - 错误:
/opt/flink/lib/flink-s3-fs-hadoop.jar
这是因为 Flink 使用独立的类加载器加载每个插件,避免依赖冲突。如果插件 JAR 直接放在 lib/ 目录,它会被系统类加载器加载,可能导致类冲突。
在 Dockerfile 中正确配置插件目录:
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \cd /opt/flink/plugins/s3-fs-hadoop && \curl -sL "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.4/flink-s3-fs-hadoop-1.20.4.jar" \-o flink-s3-fs-hadoop.jar
运行与验证
构建镜像并启动服务
# 进入项目目录
cd cdc-pipeline# 构建 Flink + Paimon 镜像
docker build -t cdc-flink-paimon:1.20 -f docker/flink/Dockerfile .# 启动所有服务
docker-compose up -d# 查看服务状态
docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"
预期输出
NAMES STATUS PORTS
cdc-postgres Up (healthy) 0.0.0.0:5433->5432/tcp
cdc-kafka Up (healthy) 0.0.0.0:9095->9092/tcp
cdc-debezium Up (healthy) 0.0.0.0:8084->8083/tcp
cdc-minio Up (healthy) 0.0.0.0:9002->9000/tcp
cdc-flink-jobmanager Up 0.0.0.0:8085->8081/tcp
cdc-flink-taskmanager Up 6123/tcp, 8081/tcp
验证 PostgreSQL 数据
docker exec cdc-postgres psql -U postgres -d cdc_demo -c "SELECT * FROM orders;"order_id | customer_id | product_name | quantity | amount | status
----------+-------------+---------------------+----------+---------+---------- 1 | 1001 | Wireless Headphones | 2 | 89.99 | pending 2 | 1002 | Mechanical Keyboard | 1 | 149.99 | shipped3 | 1003 | USB-C Hub | 3 | 45.50 | delivered
(3 rows)
注册 Debezium Connector
topic.prefix:Topic 前缀。最终 Topic 格式为<prefix>.<schema>.<table>,例如cdc.public.orders。plugin.name:PostgreSQL 逻辑复制插件。pgoutput是 PostgreSQL 原生插件(推荐),wal2json是第三方插件。slot.name:PostgreSQL 复制槽名称。Debezium 会自动创建这个复制槽。snapshot.mode:快照模式。initial表示首次全量同步后增量,never表示仅增量(不读取历史数据)。
curl -X POST http://localhost:8084/connectors \-H "Content-Type: application/json" \-d '{"name": "postgres-connector","config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector","database.hostname": "postgres","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "cdc_demo","topic.prefix": "cdc","table.include.list": "public.orders","plugin.name": "pgoutput","slot.name": "debezium_slot","snapshot.mode": "initial"}}'
验证 Kafka CDC 消息
docker exec cdc-kafka kafka-console-consumer \--bootstrap-server cdc-kafka:9092 \--topic cdc.public.orders \--from-beginning --max-messages 1 --timeout-ms 5000
测试 Paimon Catalog
# 创建测试 SQL 文件
cat > flink-sql-job/test-paimon.sql << 'EOF'
CREATE CATALOG paimon WITH ('type' = 'paimon','warehouse' = 's3://paimon-warehouse/','s3.endpoint' = 'http://minio:9000','s3.access-key' = 'minioadmin','s3.secret-key' = 'minioadmin','s3.path.style.access' = 'true'
);
SHOW CATALOGS;
EOF# 执行 SQL
docker exec cdc-flink-jobmanager /opt/flink/bin/sql-client.sh -f /opt/flink/usrlib/test-paimon.sql
输出结果表明catalog注册
[INFO] Execute statement succeeded.+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| paimon |
+-----------------+
2 rows in set
Flink SQL CDC Job
以下是 flink-sql-job/cdc-to-paimon.sql 完整内容
-- Step 1: Create Paimon Catalog with MinIO (S3-compatible storage)
CREATE CATALOG paimon WITH ('type' = 'paimon','warehouse' = 's3://paimon-warehouse','s3.endpoint' = 'http://minio:9000','s3.access-key' = 'minioadmin','s3.secret-key' = 'minioadmin','s3.path.style.access' = 'true'
);USE CATALOG paimon;-- Step 2: Create database and table
CREATE DATABASE IF NOT EXISTS cdc;
USE cdc;-- Step 3: Create Paimon table with Primary Key for CDC support
CREATE TABLE IF NOT EXISTS orders (order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at TIMESTAMP,updated_at TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket' = '4','compaction.min.file-num' = '5','compaction.max.file-num' = '50','compaction.target-file-size' = '128 mb'
);-- Step 4: Create Kafka source table using Debezium JSON format
CREATE TEMPORARY TABLE kafka_cdc (op STRING,ts_ms BIGINT,`after` ROW<order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at BIGINT,updated_at BIGINT>
) WITH ('connector' = 'kafka','topic' = 'cdc.public.orders','properties.bootstrap.servers' = 'cdc-kafka:9092','properties.group.id' = 'flink-cdc-consumer','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json','debezium-json.schema-include' = 'true'
);-- Step 5: Insert CDC data into Paimon table
INSERT INTO orders
SELECT `after`.order_id,`after`.customer_id,`after`.product_name,`after`.quantity,`after`.amount,`after`.status,TO_TIMESTAMP_LTZ(`after`.created_at / 1000, 3),TO_TIMESTAMP_LTZ(`after`.updated_at / 1000, 3)
FROM kafka_cdc
WHERE op IN ('c', 'r', 'u');
创建 Paimon Catalog。这个语句创建一个 Paimon Catalog,用于管理 Paimon 表。
warehouse:Paimon 表的存储位置。使用 S3 协议访问 MinIO。s3.endpoint:MinIO 的 S3 API 端点。注意使用容器名minio而非localhost。s3.path.style.access:设为true表示使用路径风格 URL(http://minio:9000/bucket/key),而非虚拟主机风格(http://bucket.minio:9000/key)。MinIO 必须使用路径风格。
CREATE CATALOG paimon WITH ('type' = 'paimon','warehouse' = 's3://paimon-warehouse','s3.endpoint' = 'http://minio:9000','s3.access-key' = 'minioadmin','s3.secret-key' = 'minioadmin','s3.path.style.access' = 'true'
);
创建 Paimon Primary Key 表,Paimon Primary Key 表的关键配置:
-
PRIMARY KEY (order_id) NOT ENFORCED:定义主键。NOT ENFORFORCED表示 Flink 不强制执行主键约束(由 Paimon 处理)。Primary Key 表支持 INSERT、UPDATE、DELETE 操作,Paimon 会根据主键自动进行 Upsert。 -
bucket:分桶数。数据按主键哈希分配到不同桶中,影响并发写入能力。建议设为并发度的 2-4 倍。 -
compaction.min.file-num:触发 Compaction 的最小文件数。当文件数达到 5 个时,自动合并。 -
compaction.max.file-num:最大文件数阈值。超过 50 个文件时强制 Compaction。 -
compaction.target-file-size:Compaction 目标文件大小。合并后生成 128MB 的大文件。
CREATE TABLE IF NOT EXISTS orders (order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at TIMESTAMP,updated_at TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket' = '4','compaction.min.file-num' = '5','compaction.max.file-num' = '50','compaction.target-file-size' = '128 mb'
);
创建 Kafka Source 表。Kafka Source 表使用 Debezium JSON 格式解析 Kafka 消息:
connector:使用 Kafka 连接器。topic:Debezium 创建的 CDC Topic,格式为<prefix>.<schema>.<table>。properties.group.id:Kafka Consumer Group ID。Flink 通过这个 ID 管理 offset。scan.startup.mode:earliest-offset表示从最早的消息开始消费。其他选项:latest-offset(仅消费新消息)、specific-offsets(从指定 offset 开始)。format:debezium-json表示消息格式为 Debezium JSON。debezium-json.schema-include:true表示 Kafka 消息包含 schema 信息。
Schema 定义直接映射 Debezium JSON 的 payload 结构:
op:操作类型(c、r、u、d)ts_ms:Debezium 处理时间戳after:变更后的数据,使用ROW类型定义嵌套结构
CREATE TEMPORARY TABLE kafka_cdc (op STRING,ts_ms BIGINT,`after` ROW<order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at BIGINT,updated_at BIGINT>
) WITH ('connector' = 'kafka','topic' = 'cdc.public.orders','properties.bootstrap.servers' = 'cdc-kafka:9092','properties.group.id' = 'flink-cdc-consumer','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json','debezium-json.schema-include' = 'true'
);
INSERT INTO 语句。这个语句将从 Kafka 读取的 CDC 数据写入 Paimon 表:
-
op IN ('c', 'r', 'u'):过滤操作类型。c是插入,r是快照读取,u是更新。不处理d(删除)操作,因为演示场景不需要。 -
TO_TIMESTAMP_LTZ(value / 1000, 3):Debezium 的时间戳是微秒,需要除以 1000 转换为毫秒,然后用 3 位精度解析。 -
after.field:从after结构中提取字段值。
INSERT INTO orders
SELECT `after`.order_id,`after`.customer_id,`after`.product_name,`after`.quantity,`after`.amount,`after`.status,TO_TIMESTAMP_LTZ(`after`.created_at / 1000, 3),TO_TIMESTAMP_LTZ(`after`.updated_at / 1000, 3)
FROM kafka_cdc
WHERE op IN ('c', 'r', 'u');
如何提交 Flink SQL Job
有两种方式提交 Flink SQL Job:
使用 SQL Client
# 进入 Flink JobManager 容器
docker exec -it cdc-flink-jobmanager bash# 使用 SQL Client 执行 SQL 文件
./bin/sql-client.sh -f /opt/flink/usrlib/cdc-to-paimon.sql
使用 SQL Gateway
# 启动 SQL Gateway(需要额外配置)
./bin/sql-gateway.sh start# 通过 REST API 提交 SQL
curl -X POST http://localhost:8083/v1/sessions \-H "Content-Type: application/json" \-d '{"session_name": "cdc-job"}'
文件路径验证
MinIO Bucket 内容
# 列出 bucket
docker exec cdc-minio mc alias set myminio http://localhost:9000 minioadmin minioadmin
docker exec cdc-minio mc ls myminio/# 输出
# [2026-05-24 09:32:46 UTC] 0B paimon-warehouse/
Paimon 表文件结构
# 列出表目录
docker exec cdc-minio mc ls myminio/paimon-warehouse/cdc.db/orders/# 预期结构
# schema/ - Schema 定义文件
# snapshot/ - 快照文件(如果已插入数据)
# manifest/ - 清单文件(如果已插入数据)
# data/ - 数据文件(如果已插入数据)
Schema 文件内容
# 查看 schema 文件
docker exec cdc-minio mc cat myminio/paimon-warehouse/cdc.db/orders/schema/schema-0# 输出(JSON 格式的表结构)
{"id": 0,"fields": [{"id": 0, "name": "order_id", "type": "BIGINT"},{"id": 1, "name": "customer_id", "type": "BIGINT"},...],"primaryKeys": ["order_id"],...
}
