当前位置: 首页 > news >正文

PyFlink Table API 用纯 Python 写一个 WordCount(读 CSV + 聚合 + 写出)

1. 你将构建什么

目标 pipeline:

1)Source:filesystem connector + csv format(从输入文件读一列字符串)
2)Transform:UDTF split + group by word + count
3)Sink:filesystem connector(写到输出路径)或 print connector(打印到 stdout)

核心关键词你会全程用到:

  • TableEnvironment:Table API 的入口
  • TableDescriptor / DDL:两种注册表的方式
  • UDTF(表函数):把一行拆成多行
  • execute_insert:触发真正执行(Table API 默认是 lazy 的)

2. 环境准备与安装

要求:

  • Java 11
  • Python 3.9 / 3.10 / 3.11 / 3.12
  • 安装 PyFlink

安装命令:

python-mpipinstallapache-flink

安装完就能直接写 Python Table API 作业并在本地 mini cluster 运行。

3. TableEnvironment:作业的“总控台”

Table API 程序从创建 TableEnvironment 开始:

t_env=TableEnvironment.create(EnvironmentSettings.in_streaming_mode())t_env.get_config().set("parallelism.default","1")

这里有两个点很关键:

  • in_streaming_mode():按流模式运行(即使你读的是文件,也可以跑;只是语义和执行细节会不同)
  • parallelism.default = 1:让结果写到一个文件里更直观,否则并行度高时可能输出多分片

如果你确定输入是有界文件并且只想跑批处理,也可以用 batch mode(不同版本 API 名称略有差异,记住:批场景选择 batch 更符合直觉)。

4. 注册 Source/Sink 的两种方式:TableDescriptor vs DDL

4.1 方式 A:TableDescriptor(更“Python 风格”)

注册 source(filesystem + csv):

t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word',DataTypes.STRING()).build()).option('path',input_path).format('csv').build())tab=t_env.from_path('source')

注册 sink(filesystem + canal-json 举例):

t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word',DataTypes.STRING()).column('count',DataTypes.BIGINT()).build()).option('path',output_path).format(FormatDescriptor.for_format('canal-json').build()).build())

说明:

  • Schema定义列结构
  • option('path', ...)指定输入/输出路径
  • format('csv')FormatDescriptor.for_format(...)指定格式
  • create_temporary_table创建临时表(作业生命周期内有效)

如果你不想写文件、想直接看结果,sink 用 print 更方便:

t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word',DataTypes.STRING()).column('count',DataTypes.BIGINT()).build()).build())

4.2 方式 B:execute_sql + DDL(更“SQL 工程化”)

很多团队更喜欢把表定义写成 DDL,方便复制到 SQL Client / SQL Gateway:

my_source_ddl=f""" create table source ( word STRING ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{input_path}' ) """my_sink_ddl=f""" create table sink ( word STRING, `count` BIGINT ) with ( 'connector' = 'filesystem', 'format' = 'canal-json', 'path' = '{output_path}' ) """t_env.execute_sql(my_source_ddl)t_env.execute_sql(my_sink_ddl)

两种方式本质一样:最终都是把表注册进 TableEnvironment,供后续查询引用。

5. UDTF:把一行拆成多行(split)

WordCount 的关键在“切词”。这里用 UDTF(user-defined table function)实现一行输出多行:

@udtf(result_types=[DataTypes.STRING()])defsplit(line:Row):forsinline[0].split():yieldRow(s)

要点:

  • @udtf(result_types=[...])指定输出类型
  • yield Row(s)表示每个 token 输出一行
  • 输入参数lineRow,里面的line[0]是你那一列字符串

如果你后面需要更严格的分词(去标点、统一大小写、过滤空串等),都可以在这个 UDTF 里做。

6. Table API 计算:flat_map + group_by + count

计算逻辑非常直观:

tab.flat_map(split).alias('word')\.group_by(col('word'))\.select(col('word'),lit(1).count)\.execute_insert('sink')\.wait()

逐句解释:

  • flat_map(split):一行变多行(UDTF)
  • .alias('word'):给输出列命名为 word
  • group_by(col('word')):按单词分组
  • select(col('word'), lit(1).count):对每组计数
  • execute_insert('sink'):写入 sink 表
  • .wait():本地 mini cluster 跑 demo 时等待执行结束

注意:Table API 是惰性执行的。你不调用execute_insert,前面的链式调用只是“构建计划”,不会真正跑起来。

如果你提交到远程集群,通常不建议.wait()阻塞(尤其在脚本式提交时),具体取舍看你的提交方式和运行环境。

7. 完整可运行代码(与你给的示例一致)

你给的完整代码已经非常标准,包含:

  • 支持--input/--output
  • 如果没传 input 就用内置 Hamlet 文本
  • 如果没传 output 就用 print connector

运行方式:

python word_count.py

或指定文件:

python word_count.py--input/path/in.csv--output/path/out.json

输出示例里出现的:

+I[To, 1]

这类前缀(如+I)代表变更日志语义(RowKind),+I表示插入(INSERT)。当你用支持 changelog 的 format 或执行在流语义下时,这种表现会更常见。

8. 实战小贴士与常见坑

8.1 文件输入到底算批还是流

  • 文件本质是有界数据,更适合 batch mode
  • 但 streaming mode 也能跑 demo,只是你看到的行为(尤其输出语义、changelog)更偏流处理

建议:你做离线 ETL 就用 batch mode;做持续消费就用 streaming mode + 消息队列 source。

8.2 并行度导致“输出多个文件”

filesystem sink 在并行度 > 1 时,通常会产生多个分片文件(part-xxx)。demo 为了直观设置了parallelism.default=1,这不是必须,但很适合入门阶段验证结果。

8.3 format 的选择

  • 入门建议:source 用 csv、sink 用 print 或 csv/ json 先跑通
  • canal-json 更偏 changelog 语义展示,适合 CDC 或更新流结果的落地

8.4 split 的分词质量

line.split()是最简单的按空白切分。真实场景建议增加:

  • 全部转小写/大写
  • 去标点
  • 过滤空串
  • 处理中文分词(如果需要)

这些都可以在 UDTF 里一步到位。

9. 下一步你可以怎么扩展

当 WordCount 跑通后,你可以继续往真实生产写法靠:

  • Source 从 filesystem 换成 Kafka(实时流)
  • Sink 换成 Upsert Kafka / JDBC / Elasticsearch(看业务落地)
  • 增加 watermark、窗口聚合(如 1 分钟滚动词频)
  • 用 DDL 管理所有表定义,Table API 只写核心逻辑
http://www.jsqmd.com/news/204079/

相关文章:

  • 揭秘Dify容错机制:3步实现毫秒级故障恢复与稳定响应
  • 抖音/快手推广思路:剪辑‘震惊!15亿参数干翻百亿模型’片段
  • 实用指南:【Yandex 俄罗斯搜索引擎】第1课:初识Yandex与俄罗斯搜索市场
  • 项目经理长脑子捷径:拥有资本视角
  • 入驻GitCode开源榜单:提升项目曝光与信任背书
  • 2026年上海消防泵行业顶尖服务商综合评估报告 - 2025年品牌推荐榜
  • PyFlink Table API 读懂 Changelog、Table API 与 SQL 混用、结果输出与 EXPLAIN 计划
  • 编程竞赛辅助工具新选择:VibeThinker能否替代传统IDE插件?
  • 【2026最新】C语言编译器汇总,C语言编程软件推荐(15款,适合新手小白) - sdfsafafa
  • 想在陕西汉中农村盖房子,靠谱的自建房设计公司口碑推荐 - 苏木2025
  • 如何撰写爆款标题?参考这20个VibeThinker相关内容范例
  • 2026年广东铝伸缩杆厂家推荐:技术实力与行业适配度双维度实测TOP5排名 - 品牌推荐
  • 为什么说小参数模型是未来?VibeThinker带来全新思考方向
  • HuggingFace镜像网站之外的新选择:本地部署VibeThinker做算法竞赛训练
  • Dify描述生成效果差?,90%用户忽略的4个调优细节曝光
  • XMind2026最新破解版下载及安装使用教程
  • Docker容器总是“看似正常”?揭秘健康检查精准配置的3大核心参数
  • 前端——审批模板技术难点-复杂嵌套表单的状态管理
  • 谁是TOP1?陕西咸阳自建房设计公司评测排行榜 + 真实建房案例参考 - 苏木2025
  • 吸引精准用户:针对LeetCode、Codeforces人群的内容策略
  • 前端——审批模板技术难点-动态流程图的可视化设计
  • 2026年热门的挤压铝型材,散热器铝型材,工业铝型材厂家采购推荐名录 - 品牌鉴赏师
  • 如何用Docker Compose实现无缝发布?这才是生产环境的标准操作
  • SuperMap Hi-Fi 3D SDK for Unreal 如何修改模型选中高亮颜色
  • 陕西延安自建房设计公司哪家强?2026年最新权威靠谱测评榜单抢先看 - 苏木2025
  • 蜂鸣器等效电路模型:系统学习其电气特性
  • yolo11/yolov8/opencv 使用yolo11和yolov8分别训练混凝土裂缝检测数据集 建立基于深度学习YOLOV8/11框架混凝土缺陷检测系统
  • 为什么你的Dify系统总在关键时刻崩溃?一文看懂响应容错设计盲区
  • 智慧电力设备电网输电线输电线散股检测数据集 YOLOV8模型如何训练无人机电力设备输电线电网输电线散股检测数据集检测数据集 建立深度学习框架YOLOV8散股检测系统
  • Windows Cleaner终极指南:从系统诊断到性能飞跃的完整优化方案