当前位置: 首页 > news >正文

状态和水印

一、spark系统崩溃以后,重启时需要记得:
任务配置必须一致:重启时spark-submit的参数(如 Executor 数量、内存)、代码逻辑、检查点路径必须和崩溃前完全一致,否则会报 “检查点不兼容”;

正确代码:加checkpointLocation

query = parsed_df.writeStream
.foreachBatch(write_to_mysql)
.outputMode("append")
.option("checkpointLocation", "file:///tmp/spark_checkpoint") # 检查点记进度
.start()
第一步:处理 1-10 条数据,写入 MySQL 后,检查点会记录 “Kafka 偏移量到 10,这批数据已处理”;
第二步:崩溃重启后,Spark 读取检查点→知道 “已经处理到 10,该处理 11-20 条”→不会重复写入 1-10 条。

数据库写入支持幂等性
def write_to_mysql(batch_df, batch_id):
# 用INSERT ... ON DUPLICATE KEY UPDATE:重复则覆盖,不新增
batch_df.write.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "user_orders")
.option("user", "root")
.option("password", "123456")
.option("sql", """
INSERT INTO user_orders (order_id, user_id, amount, order_time)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE -- 幂等核心:重复则更新,不新增
user_id=VALUES(user_id),
amount=VALUES(amount),
order_time=VALUES(order_time)
""")
.mode("append")
.save()

输出模式:
append,upate,complete
append,一个窗口期内,每个输出新增不可变的数据,比如日志收集,不可用于聚合计算类数据输出,因为如果聚合类
update,一个窗口期内,每次输出变化的数据,比如聚合类订单金额总数,一个窗口10秒钟,10秒内金额总数不断增长
complete,一个窗口期内,全量输出数据,这种适合排名前十位数据显示,

水印:
核心作用:清理过期窗口状态,防止 OOM,是窗口聚合的 “标配”;
配置口诀:先加水印,后聚合;水印列 = 窗口列;时长 = 窗口 + 乱序缓冲;
平衡原则:水印时长不是越大越好,也不是越小越好 —— 刚好覆盖 “最大乱序时间” 即可(生产环境通常 5-15 分钟)。
水印的核心要求是:必须加在所有 “有状态操作” 之前,示例如下:
order_watermark_df = parsed_order_df.withWatermark("create_time", "15 minutes") # 水印核心行

4. Spark SQL:清洗+去重+聚合(仅核心逻辑)

order_watermark_df.createOrReplaceTempView("tmp_order")

清洗(过滤取消单)+ 去重(基于order_id)+ 关联商品品类(Hive极简维度表)

clean_sql = """
SELECT DISTINCT o.order_id, o.province, o.pay_amount, o.create_time, g.category_name
FROM tmp_order o
LEFT JOIN (SELECT goods_id, category_name FROM dwd.dim_goods_core WHERE is_valid=1) g
ON o.goods_id = g.goods_id
WHERE o.order_status != 'CANCEL' AND o.pay_amount > 0
"""
clean_order_df = spark.sql(clean_sql)
注意:parsed_order_df先加水印,再运行spark sql

迟到数据
迟到数据是指超过水印时间的数据,采用离线兜底策略,Spark 默认会直接丢弃迟到数据,但可以通过侧输出流把这些数据单独捕获,写入专门的 “迟到数据表”,给数据流标记 “迟到数据标签”,Spark 会把超过水印的订单分流到侧输出流,主流程仍按正常逻辑处理,侧输出流单独存储迟到数据,迟到的数据在第二天凌晨再计算一次,计算完以后将数据更新到

http://www.jsqmd.com/news/135238/

相关文章:

  • 【计算机毕业设计案例】基于springboot+vue的企业项目合同信息系统基于springboot的合同信息管理系统(程序+文档+讲解+定制)
  • Springboot小区物业管理系统ia0at(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 【开题答辩全过程】以 基于springboot的社区团购小程序设计与实现为例,包含答辩的问题和答案
  • 别再苦熬数月写论文了!8个免费AI神器20分钟搞定,文理医工全覆盖
  • 正弦曲线的形成过程 | JsxGraph 代码
  • AI大模型入门到进阶:9步掌握AI应用开发核心技术,零基础也能学会!
  • 【商志考研英语】【2001】【part4】
  • 基于python的高校就业管理系统的设计和实现--论文pycharm django vue flask
  • 【毕业设计】基于springboot的合同信息管理系统(源码+文档+远程调试,全bao定制等)
  • c++字符串
  • 一个现代化的资产安全管理平台,致力于实现资产探测自动化与风险可视化
  • AI Agent记忆工程完全指南:从上下文到智能协作
  • 基于PowerWorld的风电场仿真与计算
  • B站视频下载终极指南:BilibiliDown完整使用教程
  • AI大模型应用开发入门:算法不再是唯一门槛,两种方向任你选
  • MCP+Agent+RAG:打造能说会做的下一代智能系统架构
  • 基于双层优化的电动汽车优化调度MATLAB代码探秘
  • Java计算机毕设之基于springboot的合同信息管理系统基于Springboot框架的企业合同管理系统设计与实现(完整前后端代码+说明文档+LW,调试定制等)
  • 【程序员必藏】AI大模型与Agent智能体开发实战:3天掌握高薪技术,重塑核心竞争力
  • 2025 最新!10个AI论文平台测评:继续教育写论文痛点全解析
  • 大模型开发全流程:8个关键步骤带你从入门到实践
  • vid coding - spec kit 工具链
  • Java毕设选题推荐:基于springboot+vue的企业合同管理系统基于springboot的合同信息管理系统【附源码、mysql、文档、调试+代码讲解+全bao等】
  • 关于个人服务器配置论坛功能的实现,以及一些出现的问题的解决办法
  • WordPress插件零日漏洞研究:静态代码分析入门指南
  • AI Ping新旗舰免费模型实战解析:GLM-4.7与MiniMax M2.1
  • 2025专科生必备9个降AI率工具测评榜单
  • AI Ping新旗舰免费模型实战解析:GLM-4.7与MiniMax M2.1
  • 【课程设计/毕业设计】基于springboot的合同信息管理系统基于springboot企业合同管理系统【附源码、数据库、万字文档】
  • 2025年AI记忆架构转折点:Agent记忆与RAG的终极对决,收藏这篇技术选型指南