当前位置: 首页 > news >正文

数据管理:从采集到特征存储

数据管理:从采集到特征存储

1. 技术分析

1.1 数据管理流程

数据管理是机器学习工程的基础:

数据管理流程 数据采集 → 数据存储 → 数据清洗 → 特征工程 → 特征存储

1.2 数据存储方案对比

方案类型特点适用场景
PostgreSQL关系型结构化数据交易数据
MongoDBNoSQL文档型非结构化数据
Apache Parquet列存储高效查询大数据
Feast特征存储特征管理ML特征

1.3 数据质量维度

数据质量维度 完整性: 数据是否完整 准确性: 数据是否准确 一致性: 数据格式一致 时效性: 数据是否及时

2. 核心功能实现

2.1 数据采集

import pandas as pd import numpy as np class DataCollector: def __init__(self): self.sources = {} def add_source(self, name, source): self.sources[name] = source def collect(self): dataframes = [] for name, source in self.sources.items(): df = source.fetch() df['source'] = name dataframes.append(df) return pd.concat(dataframes, ignore_index=True) class DatabaseSource: def __init__(self, connection_string, query): self.connection_string = connection_string self.query = query def fetch(self): import psycopg2 conn = psycopg2.connect(self.connection_string) df = pd.read_sql(self.query, conn) conn.close() return df class APIDataSource: def __init__(self, url, params=None): self.url = url self.params = params or {} def fetch(self): import requests response = requests.get(self.url, params=self.params) data = response.json() return pd.DataFrame(data) class FileDataSource: def __init__(self, file_path): self.file_path = file_path def fetch(self): if self.file_path.endswith('.csv'): return pd.read_csv(self.file_path) elif self.file_path.endswith('.parquet'): return pd.read_parquet(self.file_path) elif self.file_path.endswith('.json'): return pd.read_json(self.file_path)

2.2 数据清洗

class DataCleaner: def __init__(self): self.rules = [] def add_rule(self, rule): self.rules.append(rule) def clean(self, df): for rule in self.rules: df = rule(df) return df class MissingValueHandler: def __init__(self, strategy='drop'): self.strategy = strategy def __call__(self, df): if self.strategy == 'drop': return df.dropna() elif self.strategy == 'fill_mean': numeric_cols = df.select_dtypes(include=[np.number]).columns df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean()) return df elif self.strategy == 'fill_median': numeric_cols = df.select_dtypes(include=[np.number]).columns df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median()) return df class OutlierHandler: def __init__(self, method='iqr'): self.method = method def __call__(self, df): numeric_cols = df.select_dtypes(include=[np.number]).columns for col in numeric_cols: if self.method == 'iqr': q1 = df[col].quantile(0.25) q3 = df[col].quantile(0.75) iqr = q3 - q1 lower_bound = q1 - 1.5 * iqr upper_bound = q3 + 1.5 * iqr df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)] return df class DuplicateHandler: def __call__(self, df): return df.drop_duplicates()

2.3 特征存储

import feast class FeatureStore: def __init__(self, repo_path): self.repo_path = repo_path self.store = feast.FeatureStore(repo_path=repo_path) def register_feature(self, feature_def): self.store.apply([feature_def]) def materialize_features(self, start_date, end_date): self.store.materialize(start_date, end_date) def get_features(self, entity_df, features): return self.store.get_historical_features( entity_df=entity_df, features=features ).to_df() class FeatureRegistry: def __init__(self): self.features = {} def register(self, name, feature): self.features[name] = feature def get(self, name): return self.features.get(name) def list_features(self): return list(self.features.keys()) class OnlineFeatureStore: def __init__(self): self.features = {} def update(self, entity_id, features): if entity_id not in self.features: self.features[entity_id] = {} self.features[entity_id].update(features) def get(self, entity_id): return self.features.get(entity_id, {}) def batch_get(self, entity_ids): return {eid: self.get(eid) for eid in entity_ids}

3. 性能对比

3.1 数据存储对比

存储读取速度写入速度查询灵活性适用场景
CSV小规模
Parquet大规模
PostgreSQL事务性
FeastML特征

3.2 数据清洗方法对比

方法效果计算开销适用场景
删除缺失值简单缺失率低
均值填充保持分布数值特征
中位数填充抗异常值偏态分布

3.3 特征存储对比

工具在线服务离线服务版本管理部署复杂度
Feast
Tecton
Hopsworks

4. 最佳实践

4.1 数据管理流程

def build_data_pipeline(config): collector = DataCollector() for source_config in config['sources']: if source_config['type'] == 'database': source = DatabaseSource(source_config['connection_string'], source_config['query']) elif source_config['type'] == 'api': source = APIDataSource(source_config['url'], source_config.get('params')) elif source_config['type'] == 'file': source = FileDataSource(source_config['path']) collector.add_source(source_config['name'], source) cleaner = DataCleaner() cleaner.add_rule(MissingValueHandler(config.get('missing_strategy', 'drop'))) cleaner.add_rule(OutlierHandler()) cleaner.add_rule(DuplicateHandler()) return collector, cleaner class DataManagementWorkflow: def __init__(self, config): self.collector, self.cleaner = build_data_pipeline(config) self.feature_store = FeatureStore(config.get('feature_store_path', '.')) def run(self): print("Collecting data...") raw_data = self.collector.collect() print("Cleaning data...") cleaned_data = self.cleaner.clean(raw_data) print("Storing features...") self.feature_store.materialize_features( cleaned_data['timestamp'].min(), cleaned_data['timestamp'].max() ) return cleaned_data

4.2 数据质量检查

class DataQualityChecker: def __init__(self): pass def check_completeness(self, df): completeness = (1 - df.isnull().sum() / len(df)) * 100 return completeness.to_dict() def check_uniqueness(self, df, unique_columns): results = {} for col in unique_columns: results[col] = df[col].nunique() == len(df) return results def check_range(self, df, column_ranges): results = {} for col, (min_val, max_val) in column_ranges.items(): if col in df.columns: within_range = ((df[col] >= min_val) & (df[col] <= max_val)).all() results[col] = within_range return results def run_all_checks(self, df, unique_columns=[], column_ranges={}): return { 'completeness': self.check_completeness(df), 'uniqueness': self.check_uniqueness(df, unique_columns), 'range_check': self.check_range(df, column_ranges) }

5. 总结

数据管理是机器学习工程的基础:

  1. 数据采集:从多种来源获取数据
  2. 数据清洗:确保数据质量
  3. 特征存储:管理和复用特征
  4. 数据质量:持续监控数据质量

对比数据如下:

  • Feast 是最佳的开源特征存储工具
  • Parquet 是大规模数据的最佳格式
  • 推荐使用 Feast 管理 ML 特征
  • 数据质量检查应自动化
http://www.jsqmd.com/news/813891/

相关文章:

  • Skeleton UI组件库:现代Web开发的框架无关设计系统实践
  • 2026亲测:知网/维普AI率从60%降到5%!5款降AIGC工具深度测评(附免费手改技巧) - 降AI实验室
  • 使用curl命令直接测试taotoken聊天补全接口的配置与排错方法
  • NotebookLM如何3天完成文献综述初稿:清华/中科院团队实证的7步学术工作流
  • Umi-CUT:批量图片去黑边与裁剪的终极免费解决方案
  • 芯片巨头与创客运动:从生态博弈到商业共赢的十年演进
  • 还在问CTF是啥?这篇“网安扫盲贴”,带你从入门到入坑!小白收藏这篇就够了
  • 2026年北极绒费用分析,哪家更实惠 - mypinpai
  • 芯片设计RTL到GDSII流程演进:从物理感知到多物理域签核
  • 技术沟通中的语义陷阱:识别与清理“僵尸表达”的工程实践
  • IT行业年龄歧视的法律边界与合规实践:从招聘到解雇的风险防范指南
  • Amazon 内部金融团队的 RAG 实战:用 Bedrock 把监管审查从人工翻文件变成 AI 对话
  • 从波音737 MAX看复杂系统安全设计:冗余、验证与工程伦理
  • ARM调试寄存器详解:EDITCTRL与EDPRCR实战指南
  • 开源API逆向工程:豆包大模型免费接口实现与部署指南
  • 2026年专精特新申报机构口碑靠谱吗 - mypinpai
  • 使用Python配合Taotoken快速构建一个多模型对话测试脚本
  • 借贷纠纷还是刑事诈骗?太原刑事律师胡晓颐如何为企业主洗刷“诈骗”嫌疑? - 品牌排行榜
  • README工匠技能:从模块化到自动化,打造高质量开源项目文档
  • Webiny无头CMS深度解析:Serverless架构与插件化设计实战
  • 2026年开关什么品牌好?行业口碑品牌推荐及选择参考 - 品牌排行榜
  • 通用型数据采集系统选型指南:从原理到实战的七维评估
  • UI/UX设计协作平台核心功能与技术架构深度解析
  • WarcraftHelper技术方案:游戏兼容性修复工具的现代化适配实践
  • 构建具备上下文感知的智能对话机器人:从记忆管理到主动服务
  • Ubuntu: Suites: noble noble-updates noble-backports noble-security noble-proposed
  • 目前正规的邓州旧房全屋改造公司推荐排行榜2026 - 品牌排行榜
  • 深度拆解GPT-Realtime-2:从“能听会说”到“听懂人话”,靠的是什么?
  • 2026年开关有什么牌子?五大热门品牌推荐 - 品牌排行榜
  • 如何快速解密RPG Maker加密文件:新手必看的完整解密指南