当前位置: 首页 > news >正文

Flink ML K-Means 离线聚类 + 在线增量聚类(mini-batch + decayFactor)

一、K-Means(离线版):有限数据上的迭代聚类

1)输入列(Input Columns)

参数名类型默认值说明
featuresColVector"features"特征向量

2)输出列(Output Columns)

参数名类型默认值说明
predictionColInteger"prediction"预测所属簇 ID(簇中心编号)

3)参数(Parameters)详解

KMeansModel(预测侧)参数
Key默认值类型说明
distanceMeasureEuclideanDistanceMeasure.NAMEString距离度量(当前支持欧式距离)
featuresCol"features"String特征列名
predictionCol"prediction"String输出列名
k2Integer簇数量(最大聚类数)
KMeans(训练侧)额外参数
Key默认值类型说明
initMode"random"String初始化方式(当前支持 random)
seednullLong随机种子(保证可复现)
maxIter20Integer最大迭代次数

4)Java 示例代码解读(离线 KMeans)

示例流程很标准:

  1. 构造输入数据(DenseVector 流)
  2. DataStream → Table,并命名为features
  3. kmeans.fit(table)训练得到KMeansModel
  4. model.transform(table)输出每条数据的簇 ID
  5. collect 打印 features + clusterId

关键代码片段:

DataStream<DenseVector>inputStream=env.fromElements(Vectors.dense(0.0,0.0),Vectors.dense(0.0,0.3),Vectors.dense(0.3,0.0),Vectors.dense(9.0,0.0),Vectors.dense(9.0,0.6),Vectors.dense(9.6,0.0));TableinputTable=tEnv.fromDataStream(inputStream).as("features");KMeanskmeans=newKMeans().setK(2).setSeed(1L);KMeansModelkmeansModel=kmeans.fit(inputTable);TableoutputTable=kmeansModel.transform(inputTable)[0];

输出打印(预测列是 Integer):

DenseVectorfeatures=(DenseVector)row.getField(kmeans.getFeaturesCol());intclusterId=(Integer)row.getField(kmeans.getPredictionCol());

二、Online K-Means:无界流上的持续聚类(mini-batch + 遗忘)

1)为什么需要 Online K-Means?

离线 KMeans 训练出来的中心是“固定”的。
但很多业务数据分布会随时间变化,例如:

  • 用户行为习惯变了
  • 商品/内容热点变化
  • 流量来源变化

这时你希望模型能“持续学习”,让聚类中心跟着数据漂移而更新,就需要 Online K-Means。

2)Online K-Means 的核心思想(mini-batch + decayFactor)

Online K-Means 基于“mini-batch KMeans”的更新规则,并加入遗忘机制(decay):

  • 每次从训练流中积累一个 mini-batch
  • 基于这个 batch 计算临时中心(estimated centroids)
  • 用加权平均更新旧中心(original centroids):

decayFactor 解释(非常关键)

  • decayFactor = 1:历史与新数据同等重要(几乎不遗忘)
  • decayFactor = 0:完全由最新数据决定中心(强遗忘)
  • 值越小 → 遗忘越强 → 模型越“跟新”
  • 值越大 → 趋于稳定 → 变化越慢

3)输入输出列(Online)

输入列同离线:

参数名类型默认值说明
featuresColVector"features"特征向量

输出列同离线:

参数名类型默认值说明
predictionColInteger"prediction"所属簇 ID

4)参数(OnlineKMeans)详解

OnlineKMeansModel(预测侧)
Key默认值类型说明
distanceMeasureEuclideanDistanceMeasure.NAMEString距离度量(欧式距离)
featuresCol"features"String特征列名
predictionCol"prediction"String输出列名
k2Integer簇数量
OnlineKMeans(训练侧)额外参数
Key默认值类型说明
batchStrategyCOUNT_STRATEGYStringmini-batch 构造策略
globalBatchSize32Integer全局 batch 大小
decayFactor0.0Double遗忘系数(历史中心贡献缩放)
seednullLong随机种子

5)Java 示例代码解读(OnlineKMeans)

示例里做了非常“演示型”的设计:训练数据分两段周期性出现,观察聚类结果如何随时间变化。

(1)训练流是无限流,周期性吐两批不同分布的数据
  • trainData1:大致在 (0~10) 附近
  • trainData2:分布跳到了 (10,100) 与 (-10,-100) 两块

这等于让数据分布发生“漂移”,你就能看到在线聚类中心被新数据影响。

(2)predict 也是周期性吐同一组预测点
List<Row>predictData=Arrays.asList(Row.of(Vectors.dense(10.0,10.0)),Row.of(Vectors.dense(-10.0,10.0)));

输出里会不停打印:

  • 两个点是否被分到同一个簇
    因为随着训练数据改变、中心改变,聚类结果可能随时间变化。
(3)初始化模型数据 initialModelData

在线聚类必须有初始中心,否则没法开始迭代。示例使用:

.setInitialModelData(KMeansModelData.generateRandomModelData(tEnv,2,2,0.0,0))

含义是:

  • k=2 个中心
  • 每个中心 2 维
  • 随机生成初始中心
(4)globalBatchSize=6:每 6 条数据更新一次中心
.setGlobalBatchSize(6)

这与训练数据每批 6 条刚好对应,便于演示“每批更新一次”的效果。

三、离线 KMeans vs 在线 OnlineKMeans:怎么选?

选离线 KMeans 的典型场景

  • 你有明确的历史数据窗口(按天、按周)
  • 模型周期性训练发布,追求稳定可控
  • 线上只是推理(transform),不希望训练影响延迟

选 OnlineKMeans 的典型场景

  • 数据持续流入且分布变化快
  • 你希望模型能持续适应新模式(概念漂移)
  • 你可以接受聚类结果随时间变化

四、实战建议(非常重要)

1)KMeans 之前强烈建议做标准化

KMeans 基于距离(欧式距离),特征尺度不同会导致“某个维度支配聚类”。典型做法:

  • VectorAssembler(拼特征)
  • StandardScaler(标准化)
  • KMeans / OnlineKMeans

2)k 的选择不要拍脑袋

常见方法:

  • 肘部法(Elbow)
  • 轮廓系数(Silhouette)
  • 结合业务可解释性(比如用户分群常选 5/8/10)

3)OnlineKMeans 的 decayFactor 是控制“跟新程度”的旋钮

简单经验:

  • 数据分布很稳定:decayFactor 接近 1
  • 数据漂移明显:decayFactor 取 0.1~0.5 让模型更灵活
  • 想快速跟随热点:decayFactor 更小

4)batch size 与更新频率要结合吞吐与稳定性

  • batch 小:更新快但抖动大
  • batch 大:更稳定但响应慢

五、小结

Flink ML 的 KMeans 家族可以覆盖绝大多数“聚类/分群”需求:

  • KMeans(离线):有限数据、迭代训练、中心稳定
  • OnlineKMeans(在线):无界流、mini-batch 更新、支持遗忘机制

掌握了k / maxIter / globalBatchSize / decayFactor这些关键参数,你就能把聚类从“demo”落到“线上可用”。

http://www.jsqmd.com/news/156166/

相关文章:

  • C语言函数详解
  • 基于YOLOv11的跌倒识别检测系统(YOLOv11深度学习+YOLO数据集+UI界面+登录注册界面+Python项目源码+模型)
  • 基于YOLOv12的风力叶片缺陷识别检测系统(YOLOv12深度学习+YOLO数据集+UI界面+登录注册界面+Python项目源码+模型)
  • 基于PyTorch-CUDA-v2.6镜像搭建YOLOv11目标检测训练环境
  • HuggingFace镜像网站推荐,加速transformers库下载
  • 计算机毕业设计springboot北罗镇中学校务通管理系统 基于SpringBoot的乡镇中学校园综合信息管理平台 面向乡村教育的轻量化校务协同系统
  • Conda install pytorch 总是失败?看看这些避坑指南
  • 指针作为函数参数
  • 基于PyTorch-CUDA镜像的多卡并行训练实践分享
  • 第 5 课:Python 高级数据容器与文件操作 —— 数据去重、有序存储与持久化核心
  • 西门子S7 - 1200 PLC双轴定位算法在电池焊接控制中的应用
  • 词法分析器是编译程序的基础模块,其构造逻辑基于正规式与有限自动机理论
  • TinyMCE6处理政府公文word图片转存需求
  • Jupyter Notebook保存为PDF/HTML,方便分享AI研究成果
  • PyTorch Dataset类自定义数据集读取方法
  • H. Blackslex and Plants
  • ‌解锁速度:CI/CD中的云测试集成
  • Anaconda虚拟环境中安装PyTorch-GPU的正确姿势
  • 针对认知无人机通信中的频谱感知问题,提出了一种时空加权协作频谱感知检测器
  • 压电促动式气浮间隙调节机构设计与性能分析
  • ‌云测试与AI的融合创新
  • Jupyter Lab集成PyTorch环境,边训练边写技术文档
  • 彼得林奇的“价值陷阱“避免方法
  • 生成式AI重塑云端测试数据生态:技术突破与行业实践
  • PyTorch-CUDA基础镜像安全加固措施说明
  • 探索二极管箝位型三电平逆变器(NPC)的奥秘
  • python Manim 制作科普动画!
  • Git reset撤销错误提交,保护PyTorch项目历史
  • 移动测试的变革与工具选型挑战
  • DLP 高精度智造典范:Raise3D 3D 打印机,定义精密制造新标准