当前位置: 首页 > news >正文

DataGen Connector本地造数神器(不用 Kafka 也能把 Pipeline 跑起来)

1、它到底做了什么

  • Source 并行运行:有多少个 source 并发子任务,就把Long的序列切成多少段(sub-sequence)
  • 你提供一个GeneratorFunction<Long, OUT>:把输入的 index(Long)映射成任意事件类型
  • 每个 subtask 内部有序,但全局顺序取决于并行度(parallelism)

一句话:Flink 负责发 index,你负责把 index 变成事件。

2、最小可跑示例:生成 0~999 的字符串

importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassDataGenDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();GeneratorFunction<Long,String>generator=index->"Number: "+index;longnumberOfRecords=1000;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,numberOfRecords,Types.STRING);DataStreamSource<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"Generator Source");stream.print();env.execute("datagen-demo");}}

要点:

  • 并行度为 1 时输出是严格"Number: 0""Number: 999"顺序
  • 并行度 > 1 时:每个 subtask 内部仍然按序,但不同 subtask 的结果交织输出

3、限速:控制总吞吐(全局每秒不超过 N 条)

importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.connector.datagen.source.RateLimiterStrategy;GeneratorFunction<Long,String>generator=index->"Number: "+index;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100),// 全部 source subtasks 加起来 <= 100 条/sTypes.STRING);

适用场景:

  • 你想模拟“上游流量”但又不想把本机打爆
  • 做算子性能对比、Backpressure 观察、checkpoint 行为观察

4、有界/无界:它“永远是 bounded”,但可以“看起来无界”

  • 语义上永远是 bounded(理论上会结束)
  • numberOfRecords = Long.MAX_VALUE基本等同“不会结束”(实践上像 unbounded)

建议:

  • 要跑有限数据:考虑 BATCH mode,更贴近离线回放
  • 要模拟持续输入:用Long.MAX_VALUE+ rate limit

5、容错语义:at-least-once / end-to-end exactly-once 能不能保证?

可以,但有个硬条件:

  • GeneratorFunction必须对输入 index 完全确定性
    也就是:同一个 index 永远生成同样的输出。

反例(会破坏确定性):

  • random()System.currentTimeMillis()、读外部可变配置、读网络请求结果

正确做法:

  • 用 index 推导数据(例如 hash(index) 生成用户、金额、状态)
  • 或者用固定 seed 的伪随机:new Random(index)(每个 index 固定)

6、Watermark:也可以在 Source 侧发“确定性水位线”

默认例子用noWatermarks(),但你完全可以:

  • 在生成事件里带 eventTime
  • 配合自定义WatermarkStrategy生成 deterministic watermarks
    适合做 event-time 窗口、乱序、迟到数据的测试演示。
http://www.jsqmd.com/news/252728/

相关文章:

  • Dynamic Kafka Source不重启也能“动态切换集群/主题”
  • “棋圣”聂卫平去世 享年74岁
  • Tailwind CSS 4.1:终于把“文字阴影”端上来了——更狠的是,它把配置文件也“踢”出去了
  • 面试官:谈谈 Redis 的过期策略?
  • 载入史册的哈军工计算机“集结号”
  • 2026黄金戒指怎么选?推荐这7款:款式多样,佩戴舒适!
  • 【风控】Boost和Bagging
  • 你以为日期选择器很简单?我刚进团队就被它狠狠干了一周
  • 美防长访问“星舰基地”透露哪些信息
  • 吐血推荐10个AI论文写作软件,专科生搞定毕业论文+格式规范!
  • 几个硬件与人机环境系统智能的问题
  • 浪潮信息和Datawhale成功举办「AI+X」高校人才培养研讨会!
  • 机器人学习!(二)ROS-模型优化与加速(TensorRT)(4)2026/01/15
  • Spring Boot 钩子全集实战(六):SpringApplicationRunListener.contextPrepared()详解
  • 技术日报|Claude Code超级能力登顶,今日狂揽2000+星标
  • HoRain云--掌握jQuery事件处理全攻略
  • RyTuneX(Win10/11系统优化工具)
  • 普洛斯集团任命赵明琪为普洛斯中国首席执行官
  • 从脚本到服务:5 分钟通过 Botasaurus 将你的爬虫逻辑转化为 Web API
  • HoRain云--jQuery淡入淡出特效全解析
  • vue基于Python软件整合网站 flask django Pycharm
  • OTG最小改动!
  • HoRain云--JavaScript Switch语句详解与最佳实践
  • HoRain云--JavaScript while循环:从入门到精通
  • vue基于Python 最美夕阳红老人服务站网站 flask django Pycharm
  • 数字化转型大数据安全方案(PPT)
  • HoRain云--jQuery安装全指南:从CDN到本地
  • 导师推荐9个AI论文软件,助你轻松搞定本科毕业论文!
  • HoRain云--jQuery选择器全解析:高效定位DOM元素
  • HoRain云--jQuery 语法