当前位置: 首页 > news >正文

DolphinDB工业数据质量:完整性检查与修复

目录

    • 摘要
    • 一、数据质量概述
      • 1.1 数据质量维度
      • 1.2 质量指标
    • 二、完整性检查
      • 2.1 字段完整性
      • 2.2 记录完整性
      • 2.3 时间完整性
    • 三、一致性检查
      • 3.1 数据一致性
      • 3.2 引用一致性
      • 3.3 业务一致性
    • 四、数据质量评分
      • 4.1 质量评分函数
      • 4.2 质量报告
    • 五、自动修复
      • 5.1 缺失值修复
      • 5.2 异常值修复
      • 5.3 重复值修复
    • 六、质量监控
      • 6.1 质量监控表
      • 6.2 定期质量检查
    • 七、实战案例
      • 7.1 数据质量管理平台
    • 八、总结
    • 参考资料

摘要

本文深入讲解DolphinDB工业数据质量管理。从完整性检查到一致性验证,从数据质量评分到自动修复,从质量监控到持续改进,全面介绍数据质量管理的核心方法。通过丰富的代码示例,帮助读者掌握工业数据质量管理的核心技能。


一、数据质量概述

1.1 数据质量维度

数据质量维度

完整性

高质量数据

准确性

一致性

及时性

有效性

1.2 质量指标

指标说明
完整性数据是否完整
准确性数据是否正确
一致性数据是否一致
及时性数据是否及时
有效性数据是否有效

二、完整性检查

2.1 字段完整性

//创建测试数据 t=table(1..100asid,take([1,NULL,3],100)asdevice_id,take([25.0,NULL,27.0],100)astemperature,take([50.0,51.0,NULL],100)ashumidity)//检查字段完整性defcheckFieldCompleteness(data){result=table(data.columnNames()asfield_name,each(def(col){sum(isNull(data[col]))},data.columnNames())asnull_count,each(def(col){sum(isNull(data[col]))*100.0/data.rows()},data.columnNames())asnull_rate)returnresult}//使用 checkFieldCompleteness(t)

2.2 记录完整性

//检查记录完整性defcheckRecordCompleteness(data,keyColumns){//检查主键完整性 keyNull=select*fromdata where hasNull(keyColumns)//检查重复记录 duplicates=select count(*)ascntfromdata group by keyColumns having count(*)>1returndict(STRING,ANY,[["keyNullCount",keyNull.rows()],["duplicateCount",duplicates.rows()]])}//使用 checkRecordCompleteness(t,`id)

2.3 时间完整性

//检查时间序列完整性defcheckTimeCompleteness(data,timeCol,interval){//获取时间范围 minTime=min(data[timeCol])maxTime=max(data[timeCol])//计算预期记录数 expectedCount=(maxTime-minTime)/interval+1//实际记录数 actualCount=data.rows()//缺失记录 missingCount=expectedCount-actualCountreturndict(STRING,ANY,[["expectedCount",expectedCount],["actualCount",actualCount],["missingCount",missingCount],["completenessRate",actualCount*100.0/expectedCount]])}

三、一致性检查

3.1 数据一致性

//检查数据一致性defcheckDataConsistency(data,rules){results=array(STRING,0)for(ruleinrules){violations=select*fromdata wherenoteval(rule.condition)if(violations.rows()>0){results.append!(rule.name+": "+string(violations.rows())+" 条违规")}}returnresults}//定义规则 rules=[dict(STRING,ANY,[["name","温度范围"],["condition","temperature between -40 and 100"]]),dict(STRING,ANY,[["name","湿度范围"],["condition","humidity between 0 and 100"]])]//使用 checkDataConsistency(t,rules)

3.2 引用一致性

//检查引用一致性defcheckReferenceConsistency(data,refTable,dataCol,refCol){//查找无效引用 invalidRefs=select*fromdata where data[dataCol]notin(select refColfromrefTable)returndict(STRING,ANY,[["invalidCount",invalidRefs.rows()],["invalidRecords",invalidRefs]])}

3.3 业务一致性

//检查业务一致性defcheckBusinessConsistency(data){//示例:温度升高时,湿度应该下降 inconsistent=select*fromdata where temperature>30andhumidity>70returninconsistent}

四、数据质量评分

4.1 质量评分函数

//数据质量评分defcalculateQualityScore(data){scores=dict(STRING,DOUBLE)//完整性评分 nullRates=each(def(col){sum(isNull(data[col]))*100.0/data.rows()},data.columnNames())scores["completeness"]=100-avg(nullRates)//准确性评分(基于异常值比例) outlierRates=array(DOUBLE,0)for(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){avgVal=avg(data[col])stdVal=std(data[col])outlierRate=sum(abs(data[col]-avgVal)>3*stdVal)*100.0/data.rows()outlierRates.append!(outlierRate)}}scores["accuracy"]=100-avg(outlierRates)//总分 scores["total"]=(scores["completeness"]+scores["accuracy"])/2returnscores}//使用 calculateQualityScore(t)

4.2 质量报告

//生成质量报告defgenerateQualityReport(data){report=dict(STRING,ANY)//基本信息 report["totalRows"]=data.rows()report["totalColumns"]=data.columns()//完整性 report["completeness"]=checkFieldCompleteness(data)//质量评分 report["scores"]=calculateQualityScore(data)returnreport}//使用 report=generateQualityReport(t)print(report)

五、自动修复

5.1 缺失值修复

//自动修复缺失值defautoFixMissingValues(data,strategy="mean"){result=datafor(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){if(strategy=="mean"){result[col]=iif(isNull(data[col]),avg(data[col]),data[col])}elseif(strategy=="median"){result[col]=iif(isNull(data[col]),med(data[col]),data[col])}elseif(strategy=="zero"){result[col]=iif(isNull(data[col]),0,data[col])}}}returnresult}

5.2 异常值修复

//自动修复异常值defautoFixOutliers(data,method="clip"){result=datafor(colindata.columnNames()){if(type(data[col])in[INT,LONG,FLOAT,DOUBLE]){avgVal=avg(data[col])stdVal=std(data[col])lower=avgVal-3*stdVal upper=avgVal+3*stdValif(method=="clip"){result[col]=iif(data[col]<lower,lower,iif(data[col]>upper,upper,data[col]))}elseif(method=="remove"){result=select*fromresult where data[col]between lowerandupper}}}returnresult}

5.3 重复值修复

//自动修复重复值defautoFixDuplicates(data,keyColumns){returnselect distinct*fromdata}

六、质量监控

6.1 质量监控表

//创建质量监控表 share table(1:0,`check_time`table_name`check_type`score`details,[TIMESTAMP,STRING,STRING,DOUBLE,STRING])asquality_log//记录质量检查deflogQualityCheck(tableName,checkType,score,details){insert into quality_log values(now(),tableName,checkType,score,details)}

6.2 定期质量检查

//定期质量检查任务defscheduledQualityCheck(){t=loadTable("dfs://iot_db","sensor_data")//完整性检查 completeness=calculateQualityScore(t)["completeness"]logQualityCheck("sensor_data","completeness",completeness,"")//准确性检查 accuracy=calculateQualityScore(t)["accuracy"]logQualityCheck("sensor_data","accuracy",accuracy,"")}//定时任务 scheduleJob("quality_check","数据质量检查",scheduledQualityCheck,00:00,2024.01.01,2030.12.31,'D')

七、实战案例

7.1 数据质量管理平台

//==========数据质量管理平台==========//1.创建质量检查函数defqualityCheckPipeline(data,tableName){print("=== 数据质量检查: "+tableName+" ===")//完整性检查 completeness=checkFieldCompleteness(data)print("字段完整性:")print(completeness)//一致性检查 rules=[dict(STRING,ANY,[["name","温度范围"],["condition","temperature between -40 and 100"]]),dict(STRING,ANY,[["name","湿度范围"],["condition","humidity between 0 and 100"]])]consistency=checkDataConsistency(data,rules)print("一致性检查: "+string(consistency))//质量评分 scores=calculateQualityScore(data)print("质量评分:")print(scores)//记录日志 logQualityCheck(tableName,"total",scores["total"],"")returnscores}//2.创建测试数据 t=table(1..1000asid,take(1..10,1000)asdevice_id,2024.01.01T00:00:00+0..999*60000astimestamp,concat([rand(20.0..30.0,950),take(NULL,30),rand(100.0..200.0,20)])astemperature,concat([rand(40.0..60.0,970),take(NULL,30)])ashumidity)//3.执行质量检查 qualityCheckPipeline(t,"sensor_data")//4.自动修复 fixed=autoFixMissingValues(t,"mean")fixed=autoFixOutliers(fixed,"clip")//5.再次检查 qualityCheckPipeline(fixed,"sensor_data_fixed")print("数据质量管理完成")

八、总结

本文详细介绍了DolphinDB工业数据质量管理:

  1. 完整性检查:字段完整性、记录完整性、时间完整性
  2. 一致性检查:数据一致性、引用一致性、业务一致性
  3. 质量评分:评分函数、质量报告
  4. 自动修复:缺失值修复、异常值修复、重复值修复
  5. 质量监控:监控表、定期检查

思考题

  1. 如何设计数据质量指标体系?
  2. 如何平衡数据修复的自动化程度?
  3. 如何持续改进数据质量?

参考资料

  • DolphinDB数据质量
  • DolphinDB数据治理

http://www.jsqmd.com/news/1078254/

相关文章:

  • P89LPC9321单片机引脚、时钟与SFR配置实战指南
  • 2026深度实测:vibe coding优势全解析——企业级AI开发选型实战指南
  • 厨房食品卫生与安全检测14类数据集分享(适用于YOLO系列深度学习分类检测任务)
  • 个性化 LLM Agent 不是“加个用户画像“那么简单:这篇综述把四维能力分类法定清楚了
  • 用《战舰》游戏学强化学习:从零构建可运行的RL智能体
  • 从Swagger/HAR到JMeter脚本:构建自动化性能测试工具链的工程实践
  • 为什么选择TrollInstallerX:iOS 14-16.6.1 TrollStore安装完整指南
  • AI 故障排障 Agent:从人工诊断到多源数据自动推理的工程实践
  • 铁电MEMS突触技术:神经形态计算新突破
  • Hermes 上手指南:真实开发里的落地路径
  • 动图魔方技术拆解 10:GIF 多帧重编辑的 ImageSource 与 PixelMapList 实践
  • 鸿蒙 ArkTS 实战:Pet Feeding Clock 从状态建模到交互闭环完整解析
  • PianoPlayer:如何用动态规划算法解决钢琴指法优化的数学难题
  • GPT-4稀疏激活真相:2%参数如何驱动万亿模型高效推理
  • 一文彻底搞懂 Loop Engineering
  • 机器学习中的范数:从数学定义到模型调优的实战指南
  • 第 16 篇:Requests 库入门 —— 5 行代码到 50 行工程的蜕变
  • 暗黑破坏神2存档编辑器:从零开始掌握角色定制的终极指南
  • MuleSoft企业级AI编排:LLM安全接入核心系统的实战方法论
  • ROS日志系统深度解析:从调试工具到机器人可观测性基础设施
  • Deepin Boot Maker:快速制作启动盘的终极完整指南
  • 六类AI推理场景成本优化实战:从静态响应到硬件感知
  • 类变量和实例变量的内存分配方式对性能的影响具体有哪些?
  • VMware虚拟机从入门到精通:完整安装指南
  • Ministral 3微调指南:面向X光片的视觉-语言协同诊断训练
  • SVM数学直觉:从几何本质到工程调参的实战指南
  • 用pytest构建AI应用测试体系:从语义断言到CI/CD集成
  • 线性代数直觉:用Python形状思维打通机器学习矩阵运算
  • FIFA 23 Live Editor:重新定义你的足球经理生涯体验
  • 手机:人类文明的第三物种