当前位置: 首页 > news >正文

Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

1. JsonDeserializationSchema:KafkaSource 中反序列化 POJO

JsonDeserializationSchema实现了 Flink 的DeserializationSchema,因此只要某个 connector 支持DeserializationSchema,你就能直接使用它。

典型用法:KafkaSource 只消费 value,反序列化成 POJO:

JsonDeserializationSchema<SomePojo>jsonFormat=newJsonDeserializationSchema<>(SomePojo.class);KafkaSource<SomePojo>source=KafkaSource.<SomePojo>builder().setValueOnlyDeserializer(jsonFormat)// ....build();

适用场景:

  • Kafka 的 value 是 JSON
  • 你希望在 DataStream 里直接拿到业务对象SomePojo

工程建议:

  • POJO 字段尽量使用包装类型(Integer/Long)应对字段缺失或 null
  • 为了兼容字段变动,可以配合 ObjectMapper 设置忽略未知字段(见第 3 节)

2. JsonSerializationSchema:KafkaSink 中序列化 POJO

写回 Kafka 时,JsonSerializationSchema实现了SerializationSchema,可用于任何支持SerializationSchema的 connector。

典型用法:KafkaSink 写 value,序列化 POJO 为 JSON:

JsonSerializationSchema<SomePojo>jsonFormat=newJsonSerializationSchema<>();KafkaSink<SomePojo>sink=KafkaSink.<SomePojo>builder().setRecordSerializer(newKafkaRecordSerializationSchemaBuilder<SomePojo>().setValueSerializationSchema(jsonFormat)// ....build()).build();

适用场景:

  • 你希望下游系统继续消费 JSON
  • 你不想自己手写 Jackson 序列化逻辑

3. 自定义 ObjectMapper:控制 Jackson 行为(非常常用)

Flink 允许你通过构造函数传入SerializableSupplier<ObjectMapper>来定制 mapper,相当于提供一个“ObjectMapper 工厂”。

你可以用它做很多工程级增强,比如:

  • 忽略未知字段(兼容上游 schema 变更)
  • 注册模块(Java 时间类型、参数名模块等)
  • 开启/关闭某些序列化特性(字段排序、空值处理等)

示例:自定义序列化 mapper,让 map key 有序,并注册模块:

JsonSerializationSchema<SomeClass>jsonFormat=newJsonSerializationSchema<>(()->newObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS).registerModule(newParameterNamesModule()));

你也可以把“兼容字段变更”的设置加进去(强烈建议生产开启类似配置):

  • FAIL_ON_UNKNOWN_PROPERTIES关闭
  • JavaTimeModule 等

(这里不展开写完整 mapper 配置,你只要知道:用 supplier 你就能完全掌控 Jackson。)

4. PyFlink:Row 类型用 JsonRowSerializationSchema / JsonRowDeserializationSchema

在 PyFlink 中,Flink 内置了 Row 的 JSON Schema:

  • JsonRowDeserializationSchema
  • JsonRowSerializationSchema

这对 Python 流处理特别友好,因为 Python 侧更常操作 Row 而不是 POJO 类。

KafkaSource:JSON -> Row

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowDeserializationSchema.builder()\.type_info(row_type_info)\.build()source=KafkaSource.builder()\.set_value_only_deserializer(json_format)\.build()

KafkaSink:Row -> JSON

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowSerializationSchema.builder()\.with_type_info(row_type_info)\.build()sink=KafkaSink.builder()\.set_record_serializer(KafkaRecordSerializationSchema.builder().set_topic('test').set_value_serialization_schema(json_format).build())\.build()

适用场景:

  • Python 处理流数据,行结构清晰
  • Kafka 中 value 为 JSON

5. 选型建议:POJO vs ObjectNode vs Row

  • Java POJO:类型安全、IDE 友好、适合稳定 schema 的业务流
  • ObjectNode:更灵活,适合 schema 频繁变化、半结构化数据
  • PyFlink Row:Python 生态更顺手,适合表/行式处理
http://www.jsqmd.com/news/248287/

相关文章:

  • 【项目管理】项目管理流程文件(PPT)
  • 火焰识别,火焰检测,火灾检测,基于yolov5的火焰检测,可以检测视频和图片,视频实时检测,将训练好的模型部署到英伟达边缘计算 基于 YOLOv5 的高精度、高帧率火焰检测系统
  • 学长亲荐2026 MBA必用TOP10 AI论文工具测评
  • 期刊论文投稿快人一步!虎贲等考 AI 解锁学术发表 “加速器”
  • 还在为降重降 AIGC 抓狂?虎贲等考 AI:学术改写天花板,两步搞定合规论文
  • PetaLinux工程目录设备树文件结构与作用
  • 机器人诊断系统十年演进
  • 智能巡检车、无人机道路检测、AI 路况分析平台 智慧交通 驾驶视角道路病害缺陷检测数据集 建立基于深度学习框架YOLOV8道路病害缺陷检测系统 裂纹 网快 坑洼
  • ECC错误
  • 机器人感知技术十年演进
  • 使用C#控制台批量删除 Unity目录里的 .meta文件
  • 机器人日志十年演进
  • 全方位CRM源码系统功能详解,完全开源,支持个性化定制
  • 机器人诊断十年演进
  • 亲测好用10个AI论文网站,专科生毕业论文轻松搞定!
  • 支持多终端的CRM系统源码 带完整的搭建部署教程以及源代码包
  • 移动机器人十年演进
  • 自动驾驶十年演进
  • 学长亲荐2026研究生AI论文网站TOP9:开题报告文献综述神器
  • 具身智能十年演进
  • 自从进了这个京东捡漏群,拿了很多低价商品!
  • 通用十年演进母模型
  • 深入浅出HDFS:分布式文件系统核心原理与实践解析
  • Are you authorized to profile this page? No probe response, Blackfire not properly installed or inva
  • sudo setenforce 0的庖丁解牛
  • 电力线温度在线监测装置设计与实现(有完整资料)
  • 稳定性质量系列-系统稳定性建设实践
  • 国际消费中心城市DID(2007-2023)
  • 基于R语言的贝叶斯网络模型的实践技术应用
  • 中国省市县医院可达性数据集