当前位置: 首页 > news >正文

Flink Data Sink 理论 、架构、语义保证、两阶段提交与可插拔拓扑 - 指南

1. 总览:Sink 在作业中的位置

  • Source 负责读取(FLIP-27),Sink 负责写出(FLIP-191/372)。
  • Sink 是工厂式接口,生产在 TaskManager 上运行的 SinkWriter;高级场景可再拼装 CommittingSinkWriter + Committer 和自定义拓扑。

2. 核心 API 与职责边界

Sink(可序列化的工厂)

SinkWriter(数据面)

CommittingSinkWriter / Committer(控制面)

3. 快速上手:把 Sink 接到 DataStream

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream = env.fromSource(new MySource(...),WatermarkStrategy.noWatermarks(),"MySource");Sink<Integer> mySink = new MySink(...);stream.sinkTo(mySink);env.execute("Sink API Quickstart");

4. SinkWriter:写入、Flush 与水位线

  • write() 内只做“快速、可中断”的写入(写入内存缓冲/批次),避免长阻塞。
  • flush(endOfInput) 用于在 checkpoint 或收尾时刷出所有挂起数据
  • writeWatermark() 常见用法是“事件时间驱动的滚动”(例如滚动小文件/桶)。

5. Writer 状态:失败恢复与再平衡

实现 SupportsWriterState → Writer 变成 StatefulSinkWriter

6. Exactly-Once:两阶段提交(CommittingSinkWriter + Committer)

时序

  1. 正常写入 → checkpoint barrier 到达;
  2. CommittingSinkWriter.prepareCommit(flush=true):把临时产物落稳并产出 Committable
  3. Committer.commit():对齐成功后原子提交(rename/事务 commit);
  4. 恢复时:重复的 Committable 可幂等(已存在就跳过)。

关键要点

  • 事务/临时对象命名建议包含 checkpointId
  • Commit 操作必须 可重试+幂等
  • Writer 的 flush 只保证 At-Least-Once,Exactly-Once 由 Committer 收尾。

7. 自定义 Sink 拓扑(配图)

7.1 SupportsPreWriteTopology(Writer 之前)

在这里插入图片描述

用途:写前 分区重分发 / 乱序缓冲 / 批量整形。
典型场景:把同一 topic/partition 的数据路由到同一个 SinkWriter

7.2 SupportsPreCommitTopology(Writer 和 Committer 之间)

在这里插入图片描述

用途:将各并行 Writer 的 Committable 汇聚到少量/单一子任务做批量提交去重与重试,显著降低后端交互压力。

7.3 SupportsPostCommitTopology(Committer 之后)

在这里插入图片描述

用途:提交后进行 小文件合并构建索引审计/告警 等异步优化,常见于对象存储/HDFS。

8. 实战最佳实践与反模式

最佳实践

反模式

9. 测试清单

  • 单元:WriterState/Committable 序列化、幂等行为;
  • 组件:Flush 触发(条数/定时/checkpoint)、写后可见性;
  • 容错:随机 kill TM/JM 验证 Exactly-Once;
  • 扩缩容:state 重分配后的续写/续事务;
  • 长稳:句柄/连接池泄漏、内存曲线、对象回收。

10. 可复用的最小代码骨架

真实业务替换“外部系统写入/提交”即可。以下代码均带中文关键注释

10.1 最小可用 Sink(At-Least-Once)

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.common.eventtime.Watermark;
import java.io.IOException;
/** 一个最小的 At-Least-Once Sink:仅实现 Writer,并在 checkpoint/end 时 flush。 */
public class ConsoleLikeSink implements Sink<String> {@Overridepublic SinkWriter<String> createWriter(InitContext context) {return new ConsoleWriter();}static class ConsoleWriter implements SinkWriter<String> {@Overridepublic void write(String element, Context context) throws IOException {// 写入逻辑(尽量非阻塞/可中断)System.out.println("WRITE: " + element);}@Overridepublic void flush(boolean endOfInput) throws IOException {// 在 checkpoint 或输入结束时调用:刷新缓冲区/刷盘System.out.flush();}@Overridepublic void writeWatermark(Watermark watermark) {// 可选:基于水位线做滚动/分桶策略}@Overridepublic void close() throws Exception { /* 释放资源 */ }}}

10.2 带 Writer 状态的 Sink(SupportsWriterState

import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
/** 保存“当前批次缓冲”的状态,用于失败恢复后继续写入。 */
public class StatefulBatchingSink implements StatefulSink<String, StatefulBatchingSink.WriterState> {/** WriterState 示例:仅持久化缓冲条数。生产中可携带临时文件名、分桶等更多信息。 */public static class WriterState { public final int buffered; public WriterState(int b){this.buffered=b;} }@Overridepublic StatefulSinkWriter<String, WriterState> createWriter(InitContext ctx) throws IOException {return new BatchingWriter(0);}@Overridepublic StatefulSinkWriter<String, WriterState> restoreWriter(InitContext ctx, Collection<WriterState> recovered) throws IOException {int restored = recovered.stream().mapToInt(s -> s.buffered).sum();return new BatchingWriter(restored);}@Overridepublic SimpleVersionedSerializer<WriterState> getWriterStateSerializer() {return new SimpleVersionedSerializer<WriterState>() {@Override public int getVersion() { return 1; }@Override public byte[] serialize(WriterState s) { return new byte[]{(byte)s.buffered}; }@Override public WriterState deserialize(int v, byte[] b) { return new WriterState(b[0]); }};}/** 实际 Writer:示例按条数批量 flush,状态记录缓冲条数。 */static class BatchingWriter implements StatefulSinkWriter<String, WriterState> {private int buffered;BatchingWriter(int initial){ this.buffered = initial; }@Overridepublic void write(String element, Context ctx) throws IOException {// 真实实现:写入内存缓冲/临时文件buffered++;if (buffered >= 1000) flush(false);}@Overridepublic void flush(boolean endOfInput) throws IOException {if (buffered > 0) {// 把缓冲批次一次性写出至目标(操作需幂等或可重放)// doFlush(buffered);buffered = 0;}}@Overridepublic List<WriterState> snapshotState(long checkpointId) {// 把“尚未刷新的缓冲量”记录到状态,恢复后无缝续写return List.of(new WriterState(buffered));}@Override public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark wm) {}@Override public void close() {}}}

10.3 Exactly-Once Sink(SupportsCommitter:两阶段提交)

import org.apache.flink.api.connector.sink2.CommittingSink;
import org.apache.flink.api.connector.sink2.CommittingSink.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
/** 通过“临时对象 + 原子提交(如 rename/txn.commit)”实现 Exactly-Once。 */
public class ExactlyOnceTxnSink implements CommittingSink<String, ExactlyOnceTxnSink.Committable> {/** 预提交产物:例如临时文件路径或事务ID。 */public static class Committable { public final String temp; public Committable(String t){this.temp=t;} }@Overridepublic CommittingSinkWriter<String, Committable> createWriter(InitContext context) throws IOException {return new TxnWriter();}@Overridepublic Committer<Committable> createCommitter(CommitterInitContext context) throws IOException {return new TxnCommitter();}/** Writer:写到临时对象;在 checkpoint 上返回 Committable。 */static class TxnWriter implements CommittingSinkWriter<String, Committable> {private final String temp = "tmp-" + System.nanoTime();@Overridepublic void write(String element, Context ctx) throws IOException {// 实际实现:把数据写到 temp(事务缓冲/临时文件)}@Overridepublic Collection<Committable> prepareCommit(boolean flush) throws IOException {if (flush) {// 将临时对象“落稳”(例如 fsync/close)}return List.of(new Committable(temp));}@Override public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark wm) {}@Override public void close() {}}/** Committer:对齐成功后原子提交;需保证幂等(已提交则跳过)。 */static class TxnCommitter implements Committer<Committable> {@Overridepublic void commit(Collection<CommitRequest<Committable>> requests) throws IOException, InterruptedException {for (var r : requests) {String temp = r.getCommittable().temp;// 例如:rename(temp, finalPath) 或 txn.commit(transactionId)// 若目标已存在则安全返回(幂等)}}@Override public void close() throws Exception {}}}

结语

Data Sink API 把“写入(Writer)— 状态(State)— 提交(Committer)— 拓扑(Pre/PreCommit/PostCommit)”分层清晰:

  • 仅实现 Writer 即可获得 At-Least-Once
  • WriterState 即可失败恢复/扩缩容无缝续写;
  • 引入 CommittingSinkWriter + Committer 实现 Exactly-Once
  • 借助三类 拓扑扩展点,可以优雅地完成路由/集中提交/小文件合并等高级需求。
http://www.jsqmd.com/news/41205/

相关文章:

  • 滞留卡常题
  • 2025年推拉窗源头厂家权威推荐榜单:性价比门窗/系统窗/自建房门窗源头厂家精选
  • Cursor ai network issue workaround in Ubuntu 22.04
  • 2025 年漆渣脱水设备厂家最新推荐榜单:优质品牌厂家工艺系统装置全解析,助力企业高效环保处置漆渣脱水系统/漆渣脱水机/漆渣脱水装置厂家推荐
  • 2025 最新喷漆废水处理公司推荐!喷漆废水处理设备 / 药剂 / 工艺 / 循环回用系统优质品牌榜单,含技术改造与运维服务厂家优选
  • [KaibaMath]1024 丑陋的真子集符号⫋的由来
  • 安装Ubuntu
  • 完整教程:VScode 入门(设置篇)
  • 微服务架构中的 Token 工作机制详解
  • [KaibaMath]1023 柯西不等式的简洁证明
  • 2025 最新网架厂家权威排行榜:焊接球 / 螺栓球 / 大跨度等多类型网架实力企业最新推荐
  • WEB集群-HTTP概述与Nginx部署
  • 实战内容
  • 2025 最新无缝钢管厂家推荐榜:国际测评认证 + 技术创新 + 全场景适配权威指南
  • 【Qt开发】多元素类控件(二)-> QTableWidget - 实践
  • BBS伪随机数生成器
  • [KaibaMath]1022 一道平面几何题的两种解法
  • 实用指南:从0开始了解kafka《第二篇 kafka的安装、管理和配置》
  • 动态规划法
  • 函数表达式:JavaScript中那些你不知道的优雅写法 - 教程
  • 11.15模拟赛
  • 2025 最新无缝钢管优质厂家推荐:国际测评认证 + 技术创新 + 全场景适配 + 服务保障综合榜单
  • 西门子S7200_SMART仿真软件的使用(保姆级教程)
  • 2025年RS485红外线测温仪源头厂家权威推荐榜单:在线红外测温仪/20mA红外线测温仪/红外线测温仪变送器源头厂家精选
  • P14508 猜数游戏 guess
  • AMD Instinct MI50 通过llama.cpp 在 ROCm7.0.2上运行
  • 如何成为高级的安卓逆向工程师 glm4.6
  • PyTorch实战(9)——从零开始实现Transformer - 教程
  • 天津雅思培训机构排名2025,无老师国际/新通教育等优质机构,师资/口碑/提分率大PK
  • 2025 最新无缝钢管源头厂家推荐:国际测评认证 + 技术创新 + 全场景适配 + 服务保障综合榜单