当前位置: 首页 > news >正文

PyFlink Connectors 如何在 Python 作业里正确使用 Kafka/JSON 等连接器(JAR 依赖、DDL 建表、pipeline.jars、内置 Source/Sink、

1. PyFlink 为什么要手动指定 Connector/Format JAR?

因为:

  • Flink 核心运行时在 JVM 上
  • connector(如 kafka)和 format(如 json)都是 JVM 侧实现
  • Python 代码只是驱动 Table/SQL 的规划与提交

所以你需要通过pipeline.jars指定依赖(多个 jar 用;分隔):

table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

实战建议:

  • connector jar 和 format jar 都要带上(例如 Kafka + JSON)
  • 路径用file:///这种绝对 URI,避免分布式环境找不到文件
  • 生产上更推荐把 jar 放到统一位置(Flink lib 或制品仓)并在提交时声明依赖,pipeline.jars适合快速验证与 demo

2. 在 PyFlink Table API 中,推荐用 DDL 定义 Source/Sink

PyFlink 的 Table API 使用 connector 最推荐的方式是:DDL + execute_sql()
理由很简单:DDL 更直观、更可复制、也最接近线上 SQL Gateway/SQL Client 的使用方式。

2.1 Kafka Source/Sink + JSON Format(最小可用示例)

source_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table").wait()

关键点拆解:

  • execute_sql()注册表(source/sink)
  • sql_query()产出一个 Table
  • execute_insert()触发写入(并提交作业)
  • .wait()在本地/mini cluster 场景常用,用于等待作业执行(远程集群通常不建议一直 wait)

3. 完整可运行的 Python 结构(把 jar、DDL、DML 串起来)

你给的完整示例结构非常标准,我建议你在博客里也用这种方式组织代码:

frompyflink.tableimportTableEnvironment,EnvironmentSettingsdeflog_processing():env_settings=EnvironmentSettings.in_streaming_mode()t_env=TableEnvironment.create(env_settings)# 1) 指定 connector & format jarst_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")# 2) DDL: source/sinksource_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)# 3) DML: query + insertt_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table")\.wait()if__name__=='__main__':log_processing()

4. PyFlink 里“内置”的 Sources/Sinks:不用额外 jar 也能跑

除了 Kafka 这类外部 connector,Flink 也提供了一些“开箱即用”的数据源/数据汇,特别适合本地调试与单测。

4.1 from/to Pandas(非常适合快速验证)

frompyflink.table.expressionsimportcolimportpandasaspdimportnumpyasnp pdf=pd.DataFrame(np.random.rand(1000,2))table=t_env.from_pandas(pdf,["a","b"]).filter(col('a')>0.5)pdf2=table.to_pandas()

注意:to_pandas()会把结果收集到客户端内存,生产慎用,建议先limit()

4.2 from_elements():用 Python 集合直接造表

frompyflink.tableimportDataTypes# 自动推断table_env.from_elements([(1,'Hi'),(2,'Hello')])# 指定字段名table_env.from_elements([(1,'Hi'),(2,'Hello')],['a','b'])# 指定 schema(更稳)table_env.from_elements([(1,'Hi'),(2,'Hello')],DataTypes.ROW([DataTypes.FIELD("a",DataTypes.INT()),DataTypes.FIELD("b",DataTypes.STRING())]))

这类内置 source 对写教程、做 POC、复现 bug 特别省事。

5. 自定义 Sources & Sinks:Python 不能直接写,需 Java/Scala 实现

文档明确说明了现阶段的边界:

  • 自定义 source/sink 需要 Java/Scala 实现
  • Python 侧可以通过实现 TableFactory(也是 Java/Scala)让它能被 DDL 发现并使用

也就是说:你可以用 PyFlink 写作业逻辑,但 connector 生态仍然是 JVM 的。

如果你后面要写“自定义 connector”系列博客,可以按这个路线写:

  • 先用 Java 写 DynamicTableSourceFactory / DynamicTableSinkFactory(SPI 注册)
  • 再在 PyFlink 里通过 DDL'connector'='xxx'直接使用

6. 常见踩坑清单(PyFlink Connector 场景高频问题)

  • 只加了 connector jar,没加 format jar:DDL 里用了'format'='json',但没带 json format 的 jar,会在运行期报找不到 format factory
  • pipeline.jars 路径不可达:本地 file 路径对集群 TaskManager 不可见,必须用集群可访问路径或随 job 提交
  • 用 DDL 建表但没触发执行:Table/SQL 是惰性执行,必须execute_insert()execute_sql(INSERT ...)才会提交作业
  • wait() 用错场景:本地调试很方便;远程集群提交通常希望异步返回,避免客户端阻塞
http://www.jsqmd.com/news/219898/

相关文章:

  • AI+FFMPEG:用自然语言生成视频处理脚本
  • 教学实践:如何在计算机课程中使用Llama Factory开展大模型实验
  • 用Llama Factory实现多模态微调:图文结合的新可能
  • 模型压缩:使用Llama Factory将大模型瘦身90%的实用技巧
  • AI如何加速AARCH64架构下的开发流程
  • 零基础玩转GD32:EMBEDDED BUILDER入门指南
  • Llama Factory全自动:设置好参数就让模型夜间自动训练完成
  • 多情感语音合成PK:Sambert-Hifigan支持喜怒哀乐语调调节实测
  • 儿童教育产品集成案例:识字APP接入TTS实现发音指导
  • 零基础入门:10分钟用VueDraggable创建可拖拽列表
  • 二次开发:基于Llama Factory源码定制专属模型训练平台
  • NanoPi R5S OpenWrt固件终极优化:实测千兆网络性能爆发指南
  • AList终极指南:3步打造你的智能文件管理中心
  • Android开发新手必看:ADB Daemon错误完全指南
  • OCR技术对比:CRNN在不同场景下的表现
  • 如何用AI快速生成MC.JS1.8.8的插件代码?
  • 用APOLLO快速构建微服务配置原型系统
  • Sambert-Hifigan语音合成实战:3步部署中文多情感TTS服务
  • AI有声书制作全流程:Sambert-Hifigan实现长文本自动分段合成
  • 从入门到精通:Llama Factory全量微调云端实战手册
  • 用AI加速Node-RED开发:5个智能节点推荐
  • 终极指南:如何利用Mosquitto遗嘱消息构建智能设备离线监控系统
  • RuoYi-Vue3动态表单生成器完整使用指南
  • PyFlink Metrics 在 UDF 里埋点(Counter/Gauge/Distribution/Meter)、分组 Scope、生产可观测性最佳实践
  • 如何快速掌握AppSmith:新手的完整无代码开发指南
  • OpenCode环境变量定制化配置:打造专属AI编程工作流
  • Deepoc-M:低幻觉AI大模型,为数学教育与科研注入新动能
  • Llama Factory终极指南:从云环境选型到高级调参技巧
  • SNMP开发效率提升:传统vs现代工具对比
  • ElevenClock:重新定义Windows 11任务栏时钟体验