当前位置: 首页 > news >正文

python里与spark相关的语法介绍

在 Python 中使用 Spark,主要通过PySpark库来实现。它是 Apache Spark 的 Python API,语法风格与 Pandas 有相似之处,但核心思想是分布式计算惰性求值

以下是 PySpark 核心语法的系统性介绍(基于目前主流的 Spark 3.x / 4.x 版本):

1. 入口:SparkSession

所有 PySpark 程序的起点,替代了旧版的SparkContext

frompyspark.sqlimportSparkSession spark=(SparkSession.builder.appName("MyApp").master("local[*]")# 本地测试用,生产环境由集群管理器指定.config("spark.some.config","value").getOrCreate())

2. DataFrame 创建

DataFrame 是 PySpark 最核心的数据结构,等价于分布式表。

方式代码示例
从文件读取spark.read.parquet("/path")/.csv()/.json()/.table("db.tbl")
从列表创建spark.createDataFrame([(1,"a"),(2,"b")], ["id","name"])
从 Pandas 转换spark.createDataFrame(pandas_df)
SQL 查询创建spark.sql("SELECT * FROM table WHERE dt='2026-06-30'")

3. 核心变换语法 (Transformations)

⚠️关键概念:惰性求值。以下操作不会立即执行,只是记录逻辑计划,直到遇到 Action 操作才真正触发计算。

3.1 列操作 (pyspark.sql.functions)

这是 PySpark 最常用的模块,通常简写为F

frompyspark.sqlimportfunctionsasF df.select(F.col("name"),# 引用列F.lit(2026).alias("year"),# 常量列F.concat_ws("-",F.col("y"),F.col("m")),# 字符串拼接F.when(F.col("age")>18,"adult").otherwise("minor"),# 条件判断F.date_add(F.current_date(),-7),# 日期函数F.explode(F.col("tags"))# 数组展开)
3.2 常用 DataFrame 方法
# 过滤df.filter(F.col("amount")>100)df.where("region = 'cn' AND dt = '2026-06-30'")# 也支持SQL表达式字符串# 聚合df.groupBy("region").agg(F.sum("amount").alias("total"),F.countDistinct("user_id").alias("uv"))# 关联df1.join(df2,on="user_id",how="left")df1.join(df2,on=(df1.id==df2.uid)&(df1.dt==df2.dt),how="inner")# 窗口函数frompyspark.sql.windowimportWindow w=Window.partitionBy("dept").orderBy(F.desc("salary"))df.withColumn("rank",F.row_number().over(w))# 其他df.dropDuplicates(["user_id"])# 去重df.na.fill(0,subset=["amount"])# 空值填充df.repartition(10,"dt")# 重分区(影响Shuffle)df.cache()/df.persist()# 缓存到内存/磁盘

4. 动作语法 (Actions)

触发实际计算并返回结果到 Driver 端:

df.show(20,truncate=False)# 打印前N行df.count()# 计数df.collect()# ⚠️ 拉取全部数据到Driver,大表慎用!df.toPandas()# ⚠️ 转为Pandas DataFrame,同样慎用df.write.parquet("/output")# 写出文件df.createOrReplaceTempView("tmp")# 注册临时视图供SQL使用

5. Spark SQL 语法

如果不习惯 DataFrame API,可以直接写 SQL:

df.createOrReplaceTempView("sales")result=spark.sql(""" SELECT region, SUM(amount) as total FROM sales WHERE dt = '2026-06-30' GROUP BY region HAVING total > 10000 ORDER BY total DESC """)

6. UDF(用户自定义函数)

当内置函数无法满足需求时使用,但性能较差(Python 与 JVM 间有序列化开销),应优先使用内置函数或 Pandas UDF。

# 普通 UDF(逐行处理,慢)@F.udf("string")defmask_phone(phone):returnphone[:3]+"****"+phone[7:]ifphoneelseNone# Pandas UDF / Vectorized UDF(向量化,快10-100倍)✅ 推荐@F.pandas_udf("double")defnormalize(s:pd.Series)->pd.Series:return(s-s.mean())/s.std()

7. PySpark vs Pandas 语法速查对照

操作PandasPySpark
选列df[["a","b"]]df.select("a","b")
过滤df[df.age>18]df.filter(F.col("age")>18)
新增列df["new"] = ...df.withColumn("new", ...)
重命名df.rename(columns={})df.withColumnRenamed("old","new")
分组聚合df.groupby().agg()df.groupBy().agg()
排序df.sort_values()df.orderBy()/df.sort()
采样df.sample(frac=0.1)df.sample(fraction=0.1)

💡 最佳实践与避坑指南

  1. 避免collect()/toPandas():除非确认数据量很小(< 几GB),否则会导致 Driver OOM。调试时用show()limit(100).toPandas()
  2. 优先用内置函数pyspark.sql.functions里的函数在 JVM 端执行,比 Python UDF 快几个数量级。
  3. 注意 ShufflejoingroupByrepartitiondistinct都会触发 Shuffle,是性能瓶颈。尽量用广播 Join(F.broadcast(small_df))减少 Shuffle。
  4. 合理设置分区数:分区太少导致并行度不足,太多导致任务调度开销大。一般每个分区 128MB~256MB 为宜。
  5. 类型安全:PySpark 是强类型的,字符串"123"和整数123不能直接比较/关联,需用F.col().cast()显式转换。
  6. Spark Connect (Spark 4.x 新特性):如果你使用的是较新版本,推荐使用 Spark Connect 客户端模式,将 Driver 与 Cluster 解耦,支持更好的 IDE 补全和远程开发体验。
http://www.jsqmd.com/news/1100155/

相关文章:

  • 如何保护您的 Reddit 账户:2026 年全方位安全运行Reddit账户
  • 什么是酒吧大屏互动系统?软硬件、功能模块、营收能力一文详解
  • 通俗易懂!三种解法彻底吃透【轮转数组】(LeetCode189)
  • 2026物理AI元年已至,自动驾驶企业该重概念还是重落地?
  • Linux基础常用命令实操指南
  • 快上车!掌握多尺度Mamba新方法,快人一步发文章
  • 监控与可观察性开源平台 Grafana 13.0.3 发布,多项特性增强与 Bug 修复!
  • PC+移动端双端测试:功能、兼容、一致性+排期
  • 智慧校园技术改造实战:智能锁身份核验+通断电联动,解决校园安全与运维痛点
  • 2026国产AI写歌工具横评 商用合规与效果实测
  • 加密数据分析实战:从识别到解密的系统性方法
  • 3个ComfyUI中文工作流常见问题及解决方案:从困惑到精通
  • 从亚麻布到汽车音响:为什么喇叭音盆材料会影响声音?
  • 圆满收官|VeryCloud亮相2026亚马逊云科技中国峰会,AI实践获行业积极反馈
  • TokUI:面向AI场景的流式UI框架
  • 卡尔曼滤波在桥区船舶航行轨迹预判中的工程落地实践
  • 从文本 Agent 到具身 Agent:一场关于数字人认知的底层重构
  • 本地 AI 自动化工具 OpenClaw 部署全流程,附常见故障修复(含安装包)
  • 大众点评数据2026
  • AI Agent 实战部署指南:从核心能力到接口测试的完整流程
  • 翻译毕业证需提供哪些材料?翻译毕业证如何办理?
  • 接纳孩子的平凡,是父母最高级的通透
  • CosyVoice 双向流式 streamingCall() — 前后端总体方案
  • 【JAVA八股文第一章-JVM内存模型】
  • HDFS的文件的读写流程及常用命令
  • 01 · 当 AI 学会“按规矩办事“——规范驱动 Agent 工作流总览
  • 终极指南:如何快速上手MoeKoe Music开源酷狗音乐客户端
  • 从零到一:如何用Citizens2打造沉浸式Minecraft服务器体验
  • 基于改进YOLOv8与无人机的电动自行车违规行为智能检测系统
  • GitLab架构演进:应对AI时代代码分析与高并发挑战