当前位置: 首页 > news >正文

精简数据管道:如何使用 PySpark 和 WhyLogs 进行高效的数据分析和验证

原文:towardsdatascience.com/streamline-data-pipelines-how-to-use-whylogs-with-pyspark-for-data-profiling-and-validation-544efa36c5ad?source=collection_archive---------3-----------------------#2024-01-07

https://medium.com/@sarbahi.sarthak?source=post_page---byline--544efa36c5ad--------------------------------https://towardsdatascience.com/?source=post_page---byline--544efa36c5ad-------------------------------- Sarthak Sarbahi

·发表于Towards Data Science ·阅读时间:9 分钟·2024 年 1 月 7 日

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/30d8f37b895178790b25f83e1ab4662b.png

图片来自Evan Dennis提供的Unsplash

数据管道,由数据工程师或机器学习工程师创建,不仅仅是为了准备报告数据或训练模型。确保数据的质量同样至关重要。如果数据随着时间变化,你可能会得到意料之外的结果,这样是不好的。

为了避免这种情况,我们常常使用数据分析和数据验证技术。数据分析为我们提供关于数据集中不同列的统计信息。数据验证检查是否存在错误,将实际数据与预期数据进行比较。

一个很棒的工具是whylogs。它可以让你记录各种数据。记录后,你可以创建whylogs 配置文件。这些配置文件帮助你跟踪数据的变化,设置规则以确保数据的正确性,并以简便的方式展示汇总统计数据。

在这篇博客中,你将学习如何将 whylogs 与 PySpark 配合使用。我们将通过一个实践指南来讲解如何进行数据分析和验证。让我们开始吧!

目录

  1. whylogs 的组件

  2. 环境设置

  3. 理解数据集

  4. 开始使用 PySpark

  5. 使用 whylogs 进行数据分析

  6. 使用 whylogs 进行数据验证

whylogs 的组件

让我们首先理解 whylogs 的重要特性。

这些就是我们需要了解的关于 whylogs 的所有信息。如果你想了解更多,我鼓励你查看文档。接下来,让我们开始为教程设置环境。

环境设置

我们将在本教程中使用 Jupyter notebook。为了让我们的代码在任何地方都能运行,我们将在 Docker 中使用 JupyterLab。这个设置会安装所有所需的库,并准备好示例数据。如果你是 Docker 新手并想学习如何设置 Docker,请查看这个链接。

[## GitHub - sarthak-sarbahi/whylogs-pyspark

通过在 GitHub 上创建账户,贡献于 sarthak-sarbahi/whylogs-pyspark 的开发。

github.com](https://github.com/sarthak-sarbahi/whylogs-pyspark/tree/main?source=post_page-----544efa36c5ad--------------------------------)

从这里下载示例数据(CSV)。这些数据将用于数据简介和验证。创建一个data文件夹在项目的根目录下,并将 CSV 文件保存到该文件夹中。接下来,在相同的根目录下创建一个Dockerfile

本教程的 Dockerfile(图片来自作者)

这个 Dockerfile 是一组创建特定环境的指令,用于本教程。我们来逐步解析它:

到目前为止,你的项目目录应该是这样的。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/d46865880372332c87ffd2efd2e6fc44.png

在 VS Code 中的项目目录(图源:作者)

太棒了!现在,让我们构建一个 Docker 镜像。为此,请在终端中输入以下命令,确保你位于项目的根文件夹中。

docker build-t pyspark-whylogs.

这个命令创建了一个名为pyspark-whylogs的 Docker 镜像。你可以在Docker Desktop应用的“镜像”标签中看到它。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/b227c143a7d68aa6f704308ea8bb051f.png

构建的 Docker 镜像(图源:作者)

下一步:让我们运行这个镜像来启动 JupyterLab。请在终端中输入另一个命令。

docker run-p8888:8888pyspark-whylogs

这个命令从pyspark-whylogs镜像启动一个容器。它确保你可以通过计算机的 8888 端口访问 JupyterLab。

运行这个命令后,你会在日志中看到一个类似于这样的 URL:http://127.0.0.1:8888/lab?token=your_token。点击该链接以打开 JupyterLab Web 界面。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/a6b6590371d0150b205222e6f1e10cf7.png

Docker 容器日志(图源:作者)

太棒了!一切已经为使用 whylogs 设置好了。现在,让我们了解一下我们将要处理的数据集。

理解数据集

我们将使用一个关于医院患者的数据集。该文件名为patient_data.csv,包含 100k 行数据,列包括:

关于这个数据集的来源,别担心。它是由 ChatGPT 创建的。接下来,让我们开始编写一些代码。

开始使用 PySpark

首先,在 JupyterLab 中打开一个新的 notebook。记得在开始工作之前保存它。

[## whylogs-pyspark/whylogs_pyspark.ipynb at main · sarthak-sarbahi/whylogs-pyspark

通过在 GitHub 上创建账户,为 sarthak-sarbahi/whylogs-pyspark 项目的开发做出贡献。

github.com

我们将首先导入所需的库。

# Import librariesfromtypingimportAnyimportpysparkfrompyspark.sqlimportSparkSessionimportpyspark.sql.functionsasFfromwhylogs.api.pyspark.experimentalimportcollect_column_profile_viewsfromwhylogs.api.pyspark.experimentalimportcollect_dataset_profile_viewfromwhylogs.core.metrics.condition_count_metricimportConditionfromwhylogs.core.relationsimportPredicatefromwhylogs.core.schemaimportDeclarativeSchemafromwhylogs.core.resolversimportSTANDARD_RESOLVERfromwhylogs.core.specialized_resolversimportConditionCountMetricSpecfromwhylogs.core.constraints.factoriesimportcondition_meetsfromwhylogs.core.constraintsimportConstraintsBuilderfromwhylogs.core.constraints.factoriesimportno_missing_valuesfromwhylogs.core.constraints.factoriesimportgreater_than_numberfromwhylogs.vizimportNotebookProfileVisualizerimportpandasaspdimportdatetime

然后,我们将设置一个 SparkSession。这让我们能够运行 PySpark 代码。

# Initialize a SparkSessionspark=SparkSession.builder.appName('whylogs').getOrCreate()spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")

之后,我们将通过读取 CSV 文件来创建一个 Spark 数据框。我们还会检查它的架构。

# Create a dataframe from CSV filedf=spark.read.option("header",True).option("inferSchema",True).csv("/home/patient_data.csv")df.printSchema()

接下来,让我们先看一下数据。我们将查看数据框中的第一行。

# First row from dataframedf.show(n=1,vertical=True)

既然我们已经查看了数据,现在是时候开始使用 whylogs 进行数据分析了。

使用 whylogs 进行数据分析

为了对数据进行分析,我们将使用两个函数。首先是collect_column_profile_views。这个函数为数据框中的每一列收集详细的分析配置。这些配置为我们提供统计信息,比如计数、分布等,具体取决于我们如何设置 whylogs。

# Profile the data with whylogsdf_profile=collect_column_profile_views(df)print(df_profile)

数据集中的每一列都会在字典中获取一个ColumnProfileView对象。我们可以检查每列的各种指标,比如它们的均值。

whylogs 会查看每个数据点,并通过统计方式决定该数据点是否与最终计算相关。

例如,让我们看看height的平均值。

df_profile["height"].get_metric("distribution").mean.value

接下来,我们还将直接从数据框中计算均值以进行对比。

# Compare with mean from dataframedf.select(F.mean(F.col("height"))).show()

然而,仅仅逐列进行数据分析并不总是足够的。因此,我们使用另一个函数,collect_dataset_profile_view。这个函数对整个数据集进行分析,而不仅仅是单列。我们可以将其与 Pandas 结合,分析所有分析指标。

# Putting everything togetherdf_profile_view=collect_dataset_profile_view(input_df=df)df_profile_view.to_pandas().head()

我们还可以将这个分析结果保存为 CSV 文件,以备后用。

# Persist profile as a filedf_profile_view.to_pandas().reset_index().to_csv("/home/jovyan/patint_profile.csv",header=True,index=False)

/home/jovyan文件夹位于我们的 Docker 容器中,来自Jupyter 的 Docker 镜像堆栈(包含 Jupyter 应用程序的现成 Docker 镜像)。在这些 Docker 设置中,‘jovyan’ 是运行 Jupyter 的默认用户。/home/jovyan文件夹是 Jupyter 笔记本通常启动的位置,也是您应将文件放置在其中以便在 Jupyter 中访问的地方。

就这样,我们使用 whylogs 对数据进行分析。接下来,我们将探索数据验证。

使用 whylogs 进行数据验证

对于我们的数据验证,我们将执行以下检查:

现在,让我们开始吧。whylogs 中的数据验证从数据分析开始。我们可以使用collect_dataset_profile_view函数来创建分析配置,就像我们之前看到的那样。

然而,这个函数通常会创建一个带有标准指标的分析配置,比如均值和计数。但如果我们需要检查列中的单个值,而不是对比其他约束条件,这时就可以使用条件计数指标。它就像是向我们的分析配置中添加了一个自定义的指标。

让我们为visit_date列创建一个检查,验证每一行。

defcheck_date_format(date_value:Any)->bool:date_format='%Y-%m-%d'try:datetime.datetime.strptime(date_value,date_format)returnTrueexceptValueError:returnFalsevisit_date_condition={"is_date_format":Condition(Predicate().is_(check_date_format))}

一旦我们有了条件,就将其添加到分析配置中。我们使用标准架构并添加自定义检查。

# Create condition count metricschema=DeclarativeSchema(STANDARD_RESOLVER)schema.add_resolver_spec(column_name="visit_date",metrics=[ConditionCountMetricSpec(visit_date_condition)])

然后,我们使用标准指标和我们为visit_date列创建的自定义新指标重新创建了配置文件。

# Use the schema to pass to logger with collect_dataset_profile_view# This creates profile with standard metrics as well as condition count metricsdf_profile_view_v2=collect_dataset_profile_view(input_df=df,schema=schema)

在我们的配置文件准备好之后,我们可以为每一列设置验证检查。

builder=ConstraintsBuilder(dataset_profile_view=df_profile_view_v2)builder.add_constraint(no_missing_values(column_name="patient_id"))builder.add_constraint(condition_meets(column_name="visit_date",condition_name="is_date_format"))builder.add_constraint(greater_than_number(column_name="weight",number=0))constraints=builder.build()constraints.generate_constraints_report()

我们还可以使用 whylogs 生成这些检查的报告。

# Visualize constraints report using Notebook Profile Visualizervisualization=NotebookProfileVisualizer()visualization.constraints_report(constraints,cell_height=300)

它将生成一个 HTML 报告,显示哪些检查通过,哪些失败。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/92e9b55579654c4f34af3e2310e9ad42.png

whylogs 约束报告(图片来源:作者)

这是我们发现的内容:

让我们再次检查数据框中的这些发现。首先,我们用 PySpark 代码检查visit_date格式。

# Validate visit_date columndf \.withColumn("check_visit_date",F.to_date(F.col("visit_date"),"yyyy-MM-dd"))\.withColumn("null_check",F.when(F.col("check_visit_date").isNull(),"null").otherwise("not_null"))\.groupBy("null_check")\.count()\.show(truncate=False)+----------+-----+|null_check|count|+----------+-----+|not_null|98977||null|1023|+----------+-----+

它显示,100,000 行中有 1023 行不符合我们的日期格式。接下来是weight列。

# Validate weight columndf \.select("weight")\.groupBy("weight")\.count()\.orderBy(F.col("weight"))\.limit(1)\.show(truncate=False)+------+-----+|weight|count|+------+-----+|0|2039|+------+-----+

再次,我们的发现与 whylogs 一致。几乎有 2,000 行的权重为零。这也结束了我们的教程。你可以在这里找到本教程的笔记本。

结论

在本教程中,我们介绍了如何在 PySpark 中使用 whylogs。我们首先使用 Docker 准备了环境,然后对我们的数据集进行了数据分析和验证。记住,这只是开始。Whylogs 提供了更多功能,从机器学习中的数据变化(数据漂移)追踪,到实时流中的数据质量检查。

我真诚地希望这篇指南对你有所帮助。如果你有任何问题,请随时在下面的评论中提出。

参考文献

http://www.jsqmd.com/news/757784/

相关文章:

  • UAV Log Viewer:一站式无人机日志分析与可视化专业工具
  • 4大核心技术突破:DXVK Vulkan转换层的高效优化实战指南
  • 收藏!小白程序员转行AI必看:核心岗位、薪资与进阶指南
  • 从无人机航拍到古迹数字化:聊聊SFM技术在实际项目中的踩坑与优化
  • Claude API拦截器:优化大模型交互的轻量级中间件实践
  • 苏州鼎轩废旧电子产品:昆山诚信的工厂电子垃圾回收公司推荐几家 - LYL仔仔
  • 闲置京东e卡回收,轻松变现不浪费 - 京顺回收
  • 简化物业数据管理:使用 Indexify 进行高级数据提取与检索
  • SVPWM仿真进阶:从‘马鞍波’到‘羊角波’,深入理解扇区判断与时间分配的逻辑差异
  • 大模型革命:小白程序员必备指南,收藏学习未来技能!
  • Minecraft区块修复工具终极指南:5大场景教你如何拯救损坏存档
  • 使用taotoken后大模型api调用的延迟与稳定性实际体验观察
  • 睿家诚家具维修:常熟靠谱的软硬包装饰定制施工公司找哪家 - LYL仔仔
  • AI驱动SEO的关键词优化实践与策略探索
  • 统帅五一销售战报:懒人三筒霸榜双料,多品类高增领跑年轻家电市场 - 速递信息
  • 24美元比特币USB矿机实测与挖矿原理分析
  • Linux服务器运维:如何通过grub参数pci=noaer禁用OS AER,让BMC正确记录PCIE错误日志
  • OpenWrt路由器插件:3分钟解锁网易云音乐所有灰色歌曲
  • 从设备配方到生产报表:手把手教你用Codesys时间类型构建完整时间轴
  • 体验Taotoken聚合端点在高峰期的请求延迟与稳定性
  • 如何实现高效Windows内存监控与清理:Mem Reduct深度技术解析
  • 5分钟快速上手!泰坦之旅无限仓库终极管理工具TQVaultAE完全指南
  • 萧山区教育培训机构综合实力排名(2026):品牌深度测评 + 选课避雷 - 浙江行业评测
  • AntiDupl:专业级重复图片检测工具,轻松释放磁盘空间
  • DDrawCompat:让经典游戏在Windows 11上完美运行的兼容性修复方案
  • 本地AI聊天伴侣LocalChat:离线部署、隐私保护与实战指南
  • 高效构建思维导图HTML模板:markmap html.ts模块的5个进阶实战技巧
  • 生信分析实战:用MetaPhlAn4处理完测序数据后,这些结果文件怎么用?(附常用脚本)
  • 终极Visual C++运行库解决方案:5步告别DLL错误困扰
  • 苏州鼎轩废旧电子产品:张家港区机房服务器设备回收公司怎么联系 - LYL仔仔