当前位置: 首页 > news >正文

使用Debezium读取CDC事件并通过Flink任务写入Paimon表来构建实时数据管道的实践

本文记录了一个完整的端到端 CDC(Change Data Capture)数据管道的构建过程:从 PostgreSQL 数据库的变更捕获,到 Kafka 消息队列的实时传输,再到 Flink SQL 的流式处理,最终写入 Paimon 数据湖表。整个过程涵盖架构设计、Docker 容器编排、依赖问题排查、Flink SQL 编写与调试等核心环节。无论你是初次接触 CDC 技术,还是正在寻找生产级别的实时数据同步方案,这份实战指南都能提供从零到一的完整参考。

graph LRsubgraph 源端PG["PostgreSQL 15<br/>WAL (logical)"]endsubgraph 采集层DBZ["Debezium Connect 2.7<br/>pgoutput 插件"]KFK["Kafka 7.5 (KRaft)<br/>topic: cdc.public.orders"]endsubgraph 处理层FLINK["Flink 1.20<br/>SQL CDC Job"]endsubgraph 存储层MINIO["MinIO (S3)"]PAIMON["Paimon 表<br/>Primary Key"]endPG -->|"WAL 逻辑复制"| DBZDBZ -->|"CDC JSON"| KFKKFK -->|"消费"| FLINKFLINK -->|"Upsert"| PAIMONPAIMON -->|"ORC/Parquet"| MINIO

CDC概念和原理

CDC(Change Data Capture)是一种数据库设计模式,用于实时捕获和跟踪数据库中的数据变更(INSERT、UPDATE、DELETE),并将这些变更以事件流的形式传递给下游系统。

CDC 有三种主流实现方式:

  • 基于查询的 CDC:定期执行 SELECT 语句轮询数据库变更。优点是简单、无需特殊权限;缺点是延迟高、无法捕获中间变更(如同一条记录被多次 UPDATE)、性能开销大。
  • 基于触发器的 CDC:在表上创建 INSERT/UPDATE/DELETE 触发器,变更时实时推送。优点是实时性好;缺点是侵入性强、每条变更都触发额外逻辑、影响源库性能。
  • 基于日志的 CDC:读取数据库的 WAL(PostgreSQL)或 Binlog(MySQL)。优点是低延迟、无侵入、完整的变更历史;缺点是需要数据库开启特殊配置(如 wal_level=logical)。

本方案采用基于日志的 CDC,通过 Debezium 读取 PostgreSQL 的 WAL。

PostgreSQL 逻辑复制原理

PostgreSQL 通过 WAL(Write-Ahead Logging) 实现数据持久化和复制。数据流向为Debezium 主动拉取,而非 PostgreSQL 推送

sequenceDiagramparticipant App as 应用程序participant PG as PostgreSQLparticipant WAL as WAL 日志participant Slot as 复制槽participant DBZ as DebeziumApp->>PG: INSERT/UPDATE/DELETEPG->>WAL: 写入变更记录(二进制)Note over WAL: WAL 是顺序写入的日志文件PG->>Slot: 更新复制槽位置loop Debezium 轮询DBZ->>PG: 发送 replication 协议请求PG->>WAL: 从复制槽位置读取PG->>DBZ: 返回变更数据流DBZ->>DBZ: 解析并转换为 JSONDBZ->>Kafka: 发送 CDC 事件end

PostgreSQL 逻辑复制涉及四个核心组件:

  • WAL(Write-Ahead Log):PostgreSQL 的预写日志。所有数据变更在写入数据文件之前,先写入 WAL,确保数据持久性。
  • 复制槽(Replication Slot):记录消费者已读取的 WAL 位置。只要复制槽存在,PostgreSQL 就不会清理未消费的 WAL,保证数据不丢失。
  • pgoutput 插件:PostgreSQL 内置的逻辑解码插件,将二进制 WAL 格式转换为结构化的变更事件。
  • Debezium:作为主动消费者,通过 PostgreSQL 的流式复制协议持续拉取变更。

数据流有两种基本模式:

  • 推送模式:数据库主动将变更推送给消费者。例如 PostgreSQL 的 LISTEN/NOTIFY、Webhook 等。优点是延迟低,缺点是背压控制困难——如果消费者处理速度跟不上,数据库可能需要缓存或丢弃数据。
  • 拉取模式:消费者主动从数据库拉取变更。例如 Debezium、Flink CDC、AWS DMS 等。优点是消费者可以控制速率,支持精确一次语义。

注意:如果 Debezium 长时间停止,复制槽会保留 WAL,可能导致磁盘空间耗尽。需监控 pg_replication_slots

pgsql的关键配置如下

  • wal_level = logical:启用逻辑复制,WAL 包含足够信息用于逻辑解码
  • max_replication_slots:复制槽数量,每个 CDC 连接器需要一个
  • max_wal_senders:WAL 发送进程数
  • REPLICA IDENTITY FULL:表级别配置,UPDATE/DELETE 时记录完整旧行数据

Debezium 使用 PostgreSQL 的流式复制协议

  1. 建立连接:Debezium 作为"伪 Standby"连接到 PostgreSQL
  2. 请求流式数据:使用 START_REPLICATION 命令
  3. 持续拉取:PostgreSQL 保持连接开放,持续发送变更
  4. 确认消费:Debezium 定期发送 standby_status_update 确认 LSN 位置

复制槽(Replication Slot)确保Debezium 未消费的 WAL 不会被删除,Debezium 重启后可以从断点继续

SELECT slot_name, slot_type, active, restart_lsn FROM pg_replication_slots;slot_name     | slot_type | active | restart_lsn
------------------+-----------+--------+--------------debezium_slot    | logical   | t      | 0/1A5B3C0

Kafka KRaft 模式

KRaft(Kafka Raft)是 Kafka 2.8+ 引入的新共识协议,用于替代 Zookeeper 进行集群元数据管理。

graph TBsubgraph "传统模式 (Zookeeper)"ZK[Zookeeper 集群]K1[Kafka Broker]K2[Kafka Broker]K3[Kafka Broker]K1 --> ZKK2 --> ZKK3 --> ZKendsubgraph "KRaft 模式"C1[Controller<br/>+ Broker]C2[Broker]C3[Broker]C1 -.->|Raft 协议| C2C2 -.-> C3C3 -.-> C1end

KRaft 模式引入了以下核心概念:

  • Node ID:每个节点的唯一标识符,在集群中不能重复。
  • Process Roles:节点承担的角色。可以是 broker(处理消息)、controller(管理元数据)或两者兼具。
  • Controller:负责管理集群元数据,包括 Topic 创建、Partition 分配、ISR(In-Sync Replicas)管理等。
  • Quorum Voters:参与 Raft 选举投票的节点列表,格式为 ID@host:port
  • Cluster ID:集群唯一标识符,必须是 Base64 编码的 UUID,用于区分不同集群。

单节点 KRaft 配置解析

KAFKA_NODE_ID: 1                                    # 节点 ID
KAFKA_PROCESS_ROLES: broker,controller              # 同时承担 Broker 和 Controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:9093    # 投票者列表:ID@host:port
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"                # 集群唯一 ID(Base64 编码)

Debezium CDC 原理

Debezium 基于 Kafka Connect 框架运行,通过 Connector 连接源数据库。

graph LRsubgraph 源数据库PG[(PostgreSQL)]endsubgraph Debezium ConnectC[Connector] --> T[Transformation]T --> H[Heartbeat]endsubgraph KafkaTOP[cdc.public.orders Topic]endPG -->|WAL| CH --> TOP

Debezium 发送的 CDC 消息采用 JSON 格式,包含 schemapayload 两部分:

{"schema": { ... },              // 字段类型定义(可选)"payload": {"before": { ... },            // 变更前的数据(UPDATE/DELETE 时有值)"after": { ... },             // 变更后的数据(INSERT/UPDATE 时有值)"source": {                   // 变更来源元数据"version": "2.7.3.Final","connector": "postgresql","name": "cdc",              // topic.prefix"ts_ms": 1779612336252,     // 变更时间戳(毫秒)"db": "cdc_demo","table": "orders","lsn": 27553184             // WAL 日志序列号},"op": "c",                    // 操作类型"ts_ms": 1779612336477        // Debezium 处理时间戳}
}

Debezium 消息中的 op 字段标识变更类型:

  • r(Read):快照读取,表示初始全量同步时读取的历史数据。before 为 null,after 包含完整数据。
  • c(Create):插入操作。before 为 null,after 包含新插入的数据。
  • u(Update):更新操作。before 包含更新前的数据,after 包含更新后的数据。
  • d(Delete):删除操作。before 包含被删除的数据,after 为 null。

Apache Paimon

Apache Paimon(原 Flink Table Store)是 Apache 顶级项目,定位为流批一体的数据湖表格式。与 Iceberg、Hudi 不同,Paimon 从设计之初就考虑了流式写入场景。Paimon 提供两种表类型:

  • Primary Key 表:支持 INSERT/UPDATE/DELETE,自动处理主键冲突。这是 CDC 场景的首选,Paimon 会根据主键自动进行 Upsert 操作,遇到相同主键的记录,UPDATE 操作会覆盖旧值,DELETE 操作会删除记录。
  • Append Only 表:不支持更新和删除,只支持追加写入。适合日志类数据,写入吞吐量更高。

Paimon 的其他核心特性:

  • 自动 Compaction:写入时自动合并小文件,无需手动维护 Optimize 作业。
  • Snapshot 隔离:MVCC 机制,读写互不阻塞,支持 Time Travel(查询历史快照)。
  • Schema Evolution:支持无停机变更表结构。

Paimon 和 Iceberg 都是数据湖表格式,但设计目标不同:

  • CDC 支持:Paimon 的 Primary Key 表原生支持 UPDATE/DELETE,Iceberg 需要通过 MERGE INTO 语句或 Iceberg Actions 实现类似功能。
  • 小文件处理:Paimon 在写入时自动 Compaction,Iceberg 需要手动触发 Optimize 作业。
  • 写入延迟:Paimon 支持毫秒级延迟的流式写入,Iceberg 的 Commit 机制通常带来分钟级延迟。
  • 流批一体:Paimon 从设计之初就支持流批一体,Iceberg 主要面向批处理场景。

Paimon 文件组织结构如下

s3://paimon-warehouse/
├── cdc/                              # 数据库名
│   └── orders/                       # 表名
│       ├── snapshot/                 # 快照目录
│       │   ├── snapshot-1            # 快照元数据
│       │   └── snapshot-2
│       ├── manifest/                 # 清单目录
│       │   ├── manifest-list-1       # 清单列表
│       │   └── manifest-1            # 数据文件清单
│       ├── data/                     # 数据文件目录
│       │   ├── bucket-0/             # 分桶目录
│       │   │   ├── data-1.orc        # 数据文件(ORC 格式)
│       │   │   └── data-2.orc
│       │   └── bucket-1/
│       └── schema/                   # Schema 目录
│           └── schema-0

Primary Key 表写入机制

Paimon Primary Key 表的核心优势是自动 Upsert,写入时自动处理 INSERT/UPDATE/DELETE,无需用户编写去重逻辑。

当 Flink 向 Paimon Primary Key 表写入数据时:

  • 每条记录根据主键哈希分配到固定的 bucket(由 bucket 参数控制)
  • 同一个 bucket 内,相同主键的记录可能存在多个版本(存放在不同的 SST 文件中)
  • 写入时不立即合并,而是追加写入,保持高吞吐

异步 Compaction 由 Paimon 内置的 Compaction Coordinator 实现。默认模式下Compaction 逻辑嵌入在 Flink Sink 算子中(生产推荐环境可以考虑独立 Compaction Job)。合并时,相同主键的多条记录只保留最新版本(基于写入时间戳),实现 Upsert 语义。

  • Flink TaskManager 的线程负责执行 Compaction
  • 与写入操作共享同一个 slot,但通过异步线程池隔离
  • 由 Flink 的 checkpoint 机制触发 Compaction

Compaction 在以下条件触发:

  • 文件数量触发:当 Level 0 文件数达到 compaction.min.file-num(默认 5),触发合并
  • 文件大小触发:当 Level 0 总大小超过阈值,触发合并
  • Checkpoint 触发:Flink checkpoint 完成后,触发 Compaction(确保数据一致性)

读取时,Paimon 会合并所有相关的 SST 文件,对相同主键的记录只返回最新版本。这保证了即使写入时未立即合并,查询结果始终正确。即使 Compaction 尚未完成,读取时也能正确获取最新数据。

sequenceDiagramparticipant Flink as Flink SQLparticipant Paimon as Paimon Tableparticipant MemTable as MemTableparticipant LSM as LSM Treeparticipant S3 as MinIO/S3Flink->>Paimon: INSERT (id=1, ...)Paimon->>MemTable: 写入内存Note over MemTable: 内存中按主键排序<br/>支持去重和更新MemTable->>LSM: 内存满,刷写到磁盘Note over LSM: LSM-Tree 结构<br/>Level 0 → Level 1 → Level 2LSM->>S3: 异步 Compaction 合并文件Note over S3: 最终生成 128MB 大文件

Flink 采用事件驱动架构,每条记录触发一次处理:

graph LRS[Source] -->|记录1| OP1[Operator]OP1 -->|记录1| OP2[Operator]OP2 -->|记录1| SINK[Sink]S -.->|记录2| OP1OP1 -.->|记录2| OP2OP2 -.->|记录2| SINK

Flink 和 Spark Streaming 的核心区别在于处理模型:

  • 处理模型:Flink 是事件驱动架构,每条记录到达后立即处理,延迟可达毫秒级。Spark Streaming 是微批处理架构,收集一批记录后再处理,延迟通常在秒级。
  • 时间语义:Flink 支持事件时间(Event Time)、处理时间(Processing Time)、摄入时间(Ingestion Time)三种时间语义,可以正确处理乱序事件。Spark Streaming 主要使用处理时间。
  • 状态管理:Flink 内置 State Backend,支持 Keyed State 和 Operator State,可以轻松实现有状态计算。Spark Streaming 需要借助外部存储(如 Redis、HBase)管理状态。
  • Exactly-Once 语义:Flink 通过 Checkpoint 机制原生支持端到端的精确一次语义。Spark Streaming 需要 Enable Checkpoint 才能实现类似保证。

Exactly-Once 语义

Flink 通过 Checkpoint 实现 Exactly-Once 语义,Checkpoint Barrier 是特殊标记,随数据流流动:

  1. JobManager 触发 Checkpoint:向所有 Source 算子注入 Barrier
  2. Barrier 随数据流动:Barrier 像一条分隔线,将数据分为两个部分
  3. 算子收到 Barrier
    • 将当前状态持久化(本地状态 + source offset)
    • 向下游发送 Barrier
  4. 所有算子完成:Checkpoint 成功

每个算子的状态都会保存:

  • Source:Kafka offset
  • Map/FlatMap:聚合状态、累加器
  • Sink:写入状态、事务 ID

但是仅 Checkpoint 不够,还需要 Sink 端配合才能实现端到端 Exactly-Once(Paimon Sink 实现了 TwoPhaseCommitSinkFunction

sequenceDiagramparticipant JM as JobManagerparticipant TM as TaskManagerparticipant Source as Kafka Sourceparticipant Sink as Paimon SinkJM->>TM: 触发 CheckpointTM->>Source: Barrier 注入Source->>Source: 暂停处理,保存 offsetSource->>TM: Barrier 传递TM->>Sink: Barrier 传递Sink->>Sink: 刷写数据,保存状态Sink->>TM: 确认完成TM->>JM: Checkpoint 完成

环境配置

服务编排文件

version: '3.8'services:postgres:build:context: .dockerfile: docker/postgres/Dockerfilecontainer_name: cdc-postgresports:- "5433:5432"environment:POSTGRES_USER: ${POSTGRES_USER:-postgres}POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres}POSTGRES_DB: ${POSTGRES_DB:-cdc_demo}volumes:- ./init-scripts:/docker-entrypoint-initdb.dhealthcheck:test: ["CMD-SHELL", "pg_isready -U postgres"]interval: 10stimeout: 5sretries: 5start_period: 10skafka:image: confluentinc/cp-kafka:7.5.0container_name: cdc-kafkaports:- "9095:9092"- "29095:29092"environment:KAFKA_NODE_ID: 1KAFKA_PROCESS_ROLES: broker,controllerKAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cdc-kafka:9092,PLAINTEXT_HOST://localhost:29094KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLERKAFKA_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:9093KAFKA_CONTROLLER_LISTENERS: CONTROLLER://0.0.0.0:9093CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"healthcheck:test: ["CMD", "bash", "-c", "kafka-broker-api-versions --bootstrap-server localhost:9092 | head -1"]interval: 10stimeout: 10sretries: 10start_period: 30sdebezium:image: debezium/connect:2.7container_name: cdc-debeziumdepends_on:kafka:condition: service_healthyports:- "8084:8083"environment:BOOTSTRAP_SERVERS: cdc-kafka:9092GROUP_ID: cdc-connect-clusterCONFIG_STORAGE_TOPIC: connect-configsOFFSET_STORAGE_TOPIC: connect-offsetsSTATUS_STORAGE_TOPIC: connect-statusCONFIG_STORAGE_REPLICATION_FACTOR: 1OFFSET_STORAGE_REPLICATION_FACTOR: 1STATUS_STORAGE_REPLICATION_FACTOR: 1healthcheck:test: ["CMD", "curl", "-f", "http://localhost:8083/"]interval: 15stimeout: 10sretries: 10start_period: 30sminio:image: minio/minio:latestcontainer_name: cdc-miniocommand: server /data --console-address ":9001"ports:- "9002:9000"- "9003:9001"environment:MINIO_ROOT_USER: minioadminMINIO_ROOT_PASSWORD: minioadminhealthcheck:test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]interval: 10stimeout: 5sretries: 5start_period: 10sminio-init:image: minio/mc:latestcontainer_name: cdc-minio-initdepends_on:minio:condition: service_healthyentrypoint: >/bin/sh -c "mc alias set myminio http://minio:9000 minioadmin minioadmin;mc mb myminio/paimon-warehouse --ignore-existing;exit 0;"jobmanager:build:context: .dockerfile: docker/flink/Dockerfileimage: cdc-flink-paimon:1.20container_name: cdc-flink-jobmanagercommand: jobmanagerports:- "8085:8081"- "6123:6123"environment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagerstate.backend.type: hashmapstate.checkpoints.dir: s3://paimon-warehouse/checkpointss3.endpoint: http://minio:9000s3.access.key: minioadmins3.secret.key: minioadmins3.path.style.access: truevolumes:- ./flink-sql-job:/opt/flink/usrlibdepends_on:kafka:condition: service_healthyminio:condition: service_healthytaskmanager:image: cdc-flink-paimon:1.20container_name: cdc-flink-taskmanagercommand: taskmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagertaskmanager.numberOfTaskSlots: 2state.backend.type: hashmapstate.checkpoints.dir: s3://paimon-warehouse/checkpointss3.endpoint: http://minio:9000s3.access.key: minioadmins3.secret.key: minioadmins3.path.style.access: truevolumes:- ./flink-sql-job:/opt/flink/usrlibdepends_on:- jobmanager

关键配置

Kafka KRaft 配置

KAFKA_NODE_ID: 1                                    # 节点唯一标识,单节点为 1
KAFKA_PROCESS_ROLES: broker,controller              # 角色:broker 处理消息,controller 管理元数据
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,...       # 监听地址:0.0.0.0 表示接受所有网卡
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cdc-kafka:9092  # 广播地址:容器内使用容器名
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:9093    # 选举投票者:ID@host:port
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"                # 集群 ID,必须是 Base64 编码的 UUID

Debezium 环境变量

BOOTSTRAP_SERVERS: cdc-kafka:9092                   # Kafka 连接地址
GROUP_ID: cdc-connect-cluster                       # Connect 集群 ID
CONFIG_STORAGE_TOPIC: connect-configs               # 存储Connector配置的Topic
OFFSET_STORAGE_TOPIC: connect-offsets               # 存储消费偏移量的Topic
STATUS_STORAGE_TOPIC: connect-status                # 存储Connector状态的Topic

Flink 环境变量

jobmanager.rpc.address: jobmanager                  # JobManager 地址
taskmanager.numberOfTaskSlots: 2                    # TaskManager 槽位数
state.backend.type: hashmap                         # 状态后端类型
state.checkpoints.dir: s3://paimon-warehouse/checkpoints  # Checkpoint 目录
s3.endpoint: http://minio:9000                      # S3 端点(MinIO)
s3.path.style.access: true                          # 使用路径风格访问(MinIO 必需)

PostgreSQL Dockerfile

FROM postgres:15
RUN echo "wal_level = logical" >> /usr/share/postgresql/postgresql.conf.sample && \echo "max_replication_slots = 4" >> /usr/share/postgresql/postgresql.conf.sample && \echo "max_wal_senders = 4" >> /usr/share/postgresql/postgresql.conf.sample

Paimon 镜像构建

Dockerfile 完整内容

FROM flink:1.20-java17# ========================================
# 创建 S3 文件系统插件目录
# Flink 插件机制:每个插件必须在独立子目录中
# ========================================
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \cd /opt/flink/plugins/s3-fs-hadoop && \curl -sL "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.4/flink-s3-fs-hadoop-1.20.4.jar" -o flink-s3-fs-hadoop.jar# ========================================
# 下载 Paimon 和依赖到 lib 目录
# ========================================
RUN mkdir -p /opt/flink/lib && \cd /opt/flink/lib && \# Paimon 核心curl -sL "https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.20/1.0.0/paimon-flink-1.20-1.0.0.jar" -o paimon-flink.jar && \# Kafka SQL Connectorcurl -sL "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.20/flink-sql-connector-kafka-3.3.0-1.20.jar" -o flink-sql-connector-kafka.jar && \# Hadoop 客户端(包含 HdfsConfiguration)curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.4/hadoop-client-api-3.3.4.jar" -o hadoop-client-api.jar && \curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.4/hadoop-client-runtime-3.3.4.jar" -o hadoop-client-runtime.jar && \# commons-logging(Hadoop 依赖)curl -sL "https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar" -o commons-logging.jar && \# Paimon S3 支持curl -sL "https://repo1.maven.org/maven2/org/apache/paimon/paimon-s3/1.0.0/paimon-s3-1.0.0.jar" -o paimon-s3.jar# 验证下载
RUN ls -la /opt/flink/lib/*.jar && ls -la /opt/flink/plugins/s3-fs-hadoop/

Dockerfile 下载了以下 JAR 文件:

  • paimon-flink.jar(48MB):Paimon 核心 API,提供 Catalog、Table 等核心类。
  • paimon-s3.jar(32MB):Paimon S3 文件系统实现,支持访问 MinIO 和 AWS S3。
  • hadoop-client-api.jar(19MB):Hadoop 客户端 API,包含 ConfigurationHdfsConfiguration 等核心类。
  • hadoop-client-runtime.jar(30MB):Hadoop 运行时依赖,包含 Guava、Protobuf 等传递依赖。
  • commons-logging.jar(60KB):日志门面,Hadoop 的必需依赖。
  • flink-sql-connector-kafka.jar(5.6MB):Flink Kafka SQL Connector,支持读取 Debezium JSON 格式。
  • flink-s3-fs-hadoop.jar:Flink S3 文件系统插件,必须放在 plugins/ 目录。

Flink 有两个加载 JAR 的目录,区别在于加载方式:

/opt/flink/lib/:存放通用依赖 JAR。Flink 使用系统类加载器加载这些 JAR,所有 Job 共享。适合放置 Paimon、Hadoop、Kafka Connector 等必需依赖。

/opt/flink/plugins/<name>/:存放可选插件。每个插件使用独立的类加载器加载,避免依赖冲突。适合放置 S3 文件系统等可选功能。S3 文件系统插件必须放在 plugins/ 目录,因为它是可选的文件系统实现。

问题和解决

本节记录在实现 Flink SQL + Paimon 过程中遇到的依赖问题,以及每个问题的详细分析和解决方案。理解这些问题的根本原因,有助于在未来快速定位类似错误。

缺少 Hadoop Configuration 类

执行 CREATE CATALOG paimon 时报错:

org.apache.flink.table.api.ValidationException: Unable to create catalog 'paimon'.
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurationat org.apache.paimon.catalog.CatalogContext.<init>(CatalogContext.java:53)at org.apache.paimon.flink.FlinkCatalogFactory.createCatalog(...)

Paimon 在初始化 Catalog 时需要读取 Hadoop 配置(如 S3 endpoint、access key 等),这依赖于 org.apache.hadoop.conf.Configuration 类。然而 Flink 官方镜像 flink:1.20-java17 是精简镜像,不包含任何 Hadoop 相关 JAR。

在 Dockerfile 中添加 Hadoop 客户端依赖:

curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.4/hadoop-client-api-3.3.4.jar" -o hadoop-client-api.jar
curl -sL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.4/hadoop-client-runtime-3.3.4.jar" -o hadoop-client-runtime.jar

HdfsConfiguration 类找不到

错误内容

java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfigurationat org.apache.paimon.catalog.CatalogContext.<init>(CatalogContext.java:53)

最初尝试下载 hadoop-common.jar,它应该包含 Hadoop 的通用类。但 HdfsConfiguration 类实际上不在 hadoop-common 中,而是属于 HDFS 模块。Paimon 需要这个类来检测和配置 HDFS/S3 文件系统。正确的做法是使用 hadoop-client-api 这个聚合 JAR,它包含了客户端所需的所有核心类。

使用 hadoop-client-api-3.3.4.jar 替代分散的 Hadoop JAR。这个 JAR 是一个聚合包,包含:

  • Hadoop Common(通用工具类)
  • Hadoop HDFS(HDFS 客户端)
  • Hadoop MapReduce Client(MapReduce 客户端 API)

S3 协议不支持

错误日志

org.apache.paimon.fs.UnsupportedSchemeException: Could not find a file io implementation for scheme 's3' in the classpath.
FlinkFileIOLoader also cannot access this path.
Hadoop FileSystem also cannot access this path 's3://paimon-warehouse'.

即使配置了 warehouse = 's3://paimon-warehouse',Paimon 默认也不知道如何处理 S3 协议。Paimon 支持多种文件系统(本地文件系统、HDFS、S3 等),但 S3 支持需要单独的依赖包。

添加 Paimon S3 模块,这个 JAR 包含了 Paimon 对 S3 协议的实现,包括 AWS SDK 和 S3 客户端。

curl -sL "https://repo1.maven.org/maven2/org/apache/paimon/paimon-s3/1.0.0/paimon-s3-1.0.0.jar" -o paimon-s3.jar

MinIO Bucket 不存在

这个错误说明 Paimon 已经能够连接到 MinIO,但找不到指定的 bucket。与 AWS S3 不同,MinIO 不会自动创建 bucket必须显式创建。

com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket)

有两种方式创建 bucket:

手动创建(适合测试)

docker exec cdc-minio sh -c "mc alias set myminio http://localhost:9000 minioadmin minioadmin && mc mb myminio/paimon-warehouse"

自动创建(推荐用于生产)

# docker-compose.yml
minio-init:image: minio/mc:latestdepends_on:minio:condition: service_healthyentrypoint: >/bin/sh -c "mc alias set myminio http://minio:9000 minioadmin minioadmin;mc mb myminio/paimon-warehouse --ignore-existing;exit 0;"

Flink 需要 S3 文件系统插件,但插件必须放在正确位置。

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Could not find a file system implementation for scheme 's3'. 
The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop.
Please ensure that each plugin resides within its own subfolder within the plugins directory.

Flink 的插件机制有一个重要规则:每个插件必须放在独立的子目录中。例如:

  • 正确:/opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop.jar
  • 错误:/opt/flink/lib/flink-s3-fs-hadoop.jar

这是因为 Flink 使用独立的类加载器加载每个插件,避免依赖冲突。如果插件 JAR 直接放在 lib/ 目录,它会被系统类加载器加载,可能导致类冲突。

在 Dockerfile 中正确配置插件目录:

RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \cd /opt/flink/plugins/s3-fs-hadoop && \curl -sL "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.4/flink-s3-fs-hadoop-1.20.4.jar" \-o flink-s3-fs-hadoop.jar

运行与验证

构建镜像并启动服务

# 进入项目目录
cd cdc-pipeline# 构建 Flink + Paimon 镜像
docker build -t cdc-flink-paimon:1.20 -f docker/flink/Dockerfile .# 启动所有服务
docker-compose up -d# 查看服务状态
docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"

预期输出

NAMES                   STATUS                    PORTS
cdc-postgres            Up (healthy)               0.0.0.0:5433->5432/tcp
cdc-kafka               Up (healthy)              0.0.0.0:9095->9092/tcp
cdc-debezium            Up (healthy)              0.0.0.0:8084->8083/tcp
cdc-minio               Up (healthy)              0.0.0.0:9002->9000/tcp
cdc-flink-jobmanager    Up                         0.0.0.0:8085->8081/tcp
cdc-flink-taskmanager   Up                         6123/tcp, 8081/tcp

验证 PostgreSQL 数据

docker exec cdc-postgres psql -U postgres -d cdc_demo -c "SELECT * FROM orders;"order_id | customer_id |    product_name     | quantity | amount  |  status  
----------+-------------+---------------------+----------+---------+----------        1 |        1001 | Wireless Headphones |        2 |   89.99 | pending        2 |        1002 | Mechanical Keyboard |        1 |  149.99 | shipped3 |        1003 | USB-C Hub           |        3 |   45.50 | delivered
(3 rows)

注册 Debezium Connector

  • topic.prefix:Topic 前缀。最终 Topic 格式为 <prefix>.<schema>.<table>,例如 cdc.public.orders
  • plugin.name:PostgreSQL 逻辑复制插件。pgoutput 是 PostgreSQL 原生插件(推荐),wal2json 是第三方插件。
  • slot.name:PostgreSQL 复制槽名称。Debezium 会自动创建这个复制槽。
  • snapshot.mode:快照模式。initial 表示首次全量同步后增量,never 表示仅增量(不读取历史数据)。
curl -X POST http://localhost:8084/connectors \-H "Content-Type: application/json" \-d '{"name": "postgres-connector","config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector","database.hostname": "postgres","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "cdc_demo","topic.prefix": "cdc","table.include.list": "public.orders","plugin.name": "pgoutput","slot.name": "debezium_slot","snapshot.mode": "initial"}}'

验证 Kafka CDC 消息

docker exec cdc-kafka kafka-console-consumer \--bootstrap-server cdc-kafka:9092 \--topic cdc.public.orders \--from-beginning --max-messages 1 --timeout-ms 5000

测试 Paimon Catalog

# 创建测试 SQL 文件
cat > flink-sql-job/test-paimon.sql << 'EOF'
CREATE CATALOG paimon WITH ('type' = 'paimon','warehouse' = 's3://paimon-warehouse/','s3.endpoint' = 'http://minio:9000','s3.access-key' = 'minioadmin','s3.secret-key' = 'minioadmin','s3.path.style.access' = 'true'
);
SHOW CATALOGS;
EOF# 执行 SQL
docker exec cdc-flink-jobmanager /opt/flink/bin/sql-client.sh -f /opt/flink/usrlib/test-paimon.sql

输出结果表明catalog注册

[INFO] Execute statement succeeded.+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|          paimon |
+-----------------+
2 rows in set

以下是 flink-sql-job/cdc-to-paimon.sql 完整内容

-- Step 1: Create Paimon Catalog with MinIO (S3-compatible storage)
CREATE CATALOG paimon WITH ('type' = 'paimon','warehouse' = 's3://paimon-warehouse','s3.endpoint' = 'http://minio:9000','s3.access-key' = 'minioadmin','s3.secret-key' = 'minioadmin','s3.path.style.access' = 'true'
);USE CATALOG paimon;-- Step 2: Create database and table
CREATE DATABASE IF NOT EXISTS cdc;
USE cdc;-- Step 3: Create Paimon table with Primary Key for CDC support
CREATE TABLE IF NOT EXISTS orders (order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at TIMESTAMP,updated_at TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket' = '4','compaction.min.file-num' = '5','compaction.max.file-num' = '50','compaction.target-file-size' = '128 mb'
);-- Step 4: Create Kafka source table using Debezium JSON format
CREATE TEMPORARY TABLE kafka_cdc (op STRING,ts_ms BIGINT,`after` ROW<order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at BIGINT,updated_at BIGINT>
) WITH ('connector' = 'kafka','topic' = 'cdc.public.orders','properties.bootstrap.servers' = 'cdc-kafka:9092','properties.group.id' = 'flink-cdc-consumer','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json','debezium-json.schema-include' = 'true'
);-- Step 5: Insert CDC data into Paimon table
INSERT INTO orders
SELECT `after`.order_id,`after`.customer_id,`after`.product_name,`after`.quantity,`after`.amount,`after`.status,TO_TIMESTAMP_LTZ(`after`.created_at / 1000, 3),TO_TIMESTAMP_LTZ(`after`.updated_at / 1000, 3)
FROM kafka_cdc
WHERE op IN ('c', 'r', 'u');

创建 Paimon Catalog。这个语句创建一个 Paimon Catalog,用于管理 Paimon 表。

  • warehouse:Paimon 表的存储位置。使用 S3 协议访问 MinIO。
  • s3.endpoint:MinIO 的 S3 API 端点。注意使用容器名 minio 而非 localhost
  • s3.path.style.access:设为 true 表示使用路径风格 URL(http://minio:9000/bucket/key),而非虚拟主机风格(http://bucket.minio:9000/key)。MinIO 必须使用路径风格。
CREATE CATALOG paimon WITH ('type' = 'paimon','warehouse' = 's3://paimon-warehouse','s3.endpoint' = 'http://minio:9000','s3.access-key' = 'minioadmin','s3.secret-key' = 'minioadmin','s3.path.style.access' = 'true'
);

创建 Paimon Primary Key 表,Paimon Primary Key 表的关键配置:

  • PRIMARY KEY (order_id) NOT ENFORCED:定义主键。NOT ENFORFORCED 表示 Flink 不强制执行主键约束(由 Paimon 处理)。Primary Key 表支持 INSERT、UPDATE、DELETE 操作,Paimon 会根据主键自动进行 Upsert。

  • bucket:分桶数。数据按主键哈希分配到不同桶中,影响并发写入能力。建议设为并发度的 2-4 倍。

  • compaction.min.file-num:触发 Compaction 的最小文件数。当文件数达到 5 个时,自动合并。

  • compaction.max.file-num:最大文件数阈值。超过 50 个文件时强制 Compaction。

  • compaction.target-file-size:Compaction 目标文件大小。合并后生成 128MB 的大文件。

CREATE TABLE IF NOT EXISTS orders (order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at TIMESTAMP,updated_at TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('bucket' = '4','compaction.min.file-num' = '5','compaction.max.file-num' = '50','compaction.target-file-size' = '128 mb'
);

创建 Kafka Source 表。Kafka Source 表使用 Debezium JSON 格式解析 Kafka 消息:

  • connector:使用 Kafka 连接器。
  • topic:Debezium 创建的 CDC Topic,格式为 <prefix>.<schema>.<table>
  • properties.group.id:Kafka Consumer Group ID。Flink 通过这个 ID 管理 offset。
  • scan.startup.modeearliest-offset 表示从最早的消息开始消费。其他选项:latest-offset(仅消费新消息)、specific-offsets(从指定 offset 开始)。
  • formatdebezium-json 表示消息格式为 Debezium JSON。
  • debezium-json.schema-includetrue 表示 Kafka 消息包含 schema 信息。

Schema 定义直接映射 Debezium JSON 的 payload 结构:

  • op:操作类型(crud
  • ts_ms:Debezium 处理时间戳
  • after:变更后的数据,使用 ROW 类型定义嵌套结构
CREATE TEMPORARY TABLE kafka_cdc (op STRING,ts_ms BIGINT,`after` ROW<order_id BIGINT,customer_id BIGINT,product_name STRING,quantity INT,amount DECIMAL(10, 2),status STRING,created_at BIGINT,updated_at BIGINT>
) WITH ('connector' = 'kafka','topic' = 'cdc.public.orders','properties.bootstrap.servers' = 'cdc-kafka:9092','properties.group.id' = 'flink-cdc-consumer','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json','debezium-json.schema-include' = 'true'
);

INSERT INTO 语句。这个语句将从 Kafka 读取的 CDC 数据写入 Paimon 表:

  • op IN ('c', 'r', 'u'):过滤操作类型。c 是插入,r 是快照读取,u 是更新。不处理 d(删除)操作,因为演示场景不需要。

  • TO_TIMESTAMP_LTZ(value / 1000, 3):Debezium 的时间戳是微秒,需要除以 1000 转换为毫秒,然后用 3 位精度解析。

  • after.field:从 after 结构中提取字段值。

INSERT INTO orders
SELECT `after`.order_id,`after`.customer_id,`after`.product_name,`after`.quantity,`after`.amount,`after`.status,TO_TIMESTAMP_LTZ(`after`.created_at / 1000, 3),TO_TIMESTAMP_LTZ(`after`.updated_at / 1000, 3)
FROM kafka_cdc
WHERE op IN ('c', 'r', 'u');

有两种方式提交 Flink SQL Job:

使用 SQL Client

# 进入 Flink JobManager 容器
docker exec -it cdc-flink-jobmanager bash# 使用 SQL Client 执行 SQL 文件
./bin/sql-client.sh -f /opt/flink/usrlib/cdc-to-paimon.sql

使用 SQL Gateway

# 启动 SQL Gateway(需要额外配置)
./bin/sql-gateway.sh start# 通过 REST API 提交 SQL
curl -X POST http://localhost:8083/v1/sessions \-H "Content-Type: application/json" \-d '{"session_name": "cdc-job"}'

文件路径验证

MinIO Bucket 内容

# 列出 bucket
docker exec cdc-minio mc alias set myminio http://localhost:9000 minioadmin minioadmin
docker exec cdc-minio mc ls myminio/# 输出
# [2026-05-24 09:32:46 UTC]     0B paimon-warehouse/

Paimon 表文件结构

# 列出表目录
docker exec cdc-minio mc ls myminio/paimon-warehouse/cdc.db/orders/# 预期结构
# schema/           - Schema 定义文件
# snapshot/         - 快照文件(如果已插入数据)
# manifest/         - 清单文件(如果已插入数据)
# data/             - 数据文件(如果已插入数据)

Schema 文件内容

# 查看 schema 文件
docker exec cdc-minio mc cat myminio/paimon-warehouse/cdc.db/orders/schema/schema-0# 输出(JSON 格式的表结构)
{"id": 0,"fields": [{"id": 0, "name": "order_id", "type": "BIGINT"},{"id": 1, "name": "customer_id", "type": "BIGINT"},...],"primaryKeys": ["order_id"],...
}
http://www.jsqmd.com/news/879134/

相关文章:

  • 告别命令行!在Ubuntu标题栏实时显示网速和CPU的保姆级教程(Indicator-Sysmonitor)
  • 上海图书馆档案搬迁推荐——图书馆档案室整体搬迁避坑指南|7个高危陷阱逐一拆解 - 知行集录
  • AI病理分析:结构化证据提取链路怎么搭,才能真正进入科研流程
  • 云南私人定制导游服务排行 附正规预定全流程 - 奔跑123
  • AI视频生成“假熟练”陷阱(83%用户未察觉):3个隐藏技能断层导致输出质量长期停滞
  • Reloaded-II依赖解析机制深度剖析与循环依赖解决方案
  • kkFileView在Linux服务器上安装踩坑全记录:从字体乱码到Office组件报错的保姆级排错指南
  • 融合模糊决策与ECSA优化的软件项目智能风险评估框架
  • claude code 底层技术
  • DeepSeek模型量化部署翻车现场复盘:INT4精度崩塌、KV Cache错位、Tokenizer解码乱码——火山引擎专家团48小时根因分析报告
  • 2026年4月目前专业的凿井绞车企业推荐,凿井绞车/2JZ型凿井绞车/多绳摩擦式提升机,凿井绞车源头厂家选哪家 - 品牌推荐师
  • 如何打造专属AI工作空间:Chatbox主题与界面个性化全攻略
  • Real-ESRGAN-GUI:免费AI图像增强工具终极指南,模糊图片秒变高清
  • BERTopic主题模型可视化全攻略:5种图表从安装到解读,让你的分析报告更出彩
  • FCS模拟异常扩散:从布朗运动到CTRW的仿真与模型鉴别
  • 终极代码逻辑可视化工具:用AI技术将复杂源码转化为人类可读逻辑
  • 2026年4月国内质量好的便携式非甲烷总烃分析仪制造企业推荐,气象环境在线监测仪,便携式非甲烷总烃分析仪工厂哪家好 - 品牌推荐师
  • 手把手教你用Python复现FBCNet:一个融合FBCSP与CNN的脑电解码SOTA模型
  • 视频转音频MP3最全指南:手机、电脑、在线工具一网打尽 - 小有的家
  • 通过TaotokenTokenPlan套餐实现大模型用量与成本的可预测管理
  • ETCD部署
  • 2026年佛山旧房精改全景评测:行业协会数据+业主口碑双核驱动的6强榜单 - 优家闲谈
  • 别再死记硬背贝叶斯公式了!用Python+主观贝叶斯,手把手教你做个简单的智能推理小工具
  • 2026年4月诚信的智能监控系统机构推荐,简单易上手,无需复杂培训 - 品牌推荐师
  • SDCPC 2026 游记
  • ENVI 5.x 保姆级教程:从零绘制你的第一个高光谱3D数据立方体(含去黑边技巧)
  • Poppins字体:终极多语言开源字体解决方案,9种字重+天城文支持
  • 2026上海展台设计搭建公司评测:上海帝斓展览有限公司 - 寻茫精选
  • 2026 专业无损去水印工具推荐|免费去水印软件对比|合法获取高清素材的方法 - 爱上科技热点
  • 2026深度测评:杭州GEO优化服务商TOP5避坑选型指南 - 品牌报告