当前位置: 首页 > news >正文

ETDataset 数据集预处理实战:从原始CSV到PyTorch DataLoader的5个关键步骤

ETDataset 数据集预处理实战:从原始CSV到PyTorch DataLoader的5个关键步骤

电力变压器温度预测是能源管理领域的重要课题,ETDataset作为业内广泛使用的时间序列数据集,为研究者提供了丰富的实战素材。本文将手把手带你完成从原始CSV文件到可投入模型训练的PyTorch DataLoader的全流程,重点解决实际工程中的五个核心挑战。

1. 数据加载与初步探索

任何数据预处理流程的第一步都是理解数据结构和内容。ETDataset通常以CSV格式存储,包含分钟级(ETTm)和小时级(ETTh)两种时间粒度。我们先使用pandas加载数据并检查基本特征:

import pandas as pd # 加载小时级数据示例 df = pd.read_csv('ETTh1.csv') print(f"数据集形状: {df.shape}") print(df.head()) # 检查数据类型和缺失值 print("\n数据类型统计:") print(df.dtypes) print("\n缺失值统计:") print(df.isnull().sum())

典型输出会显示8个特征列:

  • date: 时间戳
  • HUFL/HULL: 高使用率有效/无效负载
  • MUFL/MULL: 中等使用率有效/无效负载
  • LUFL/LULL: 低使用率有效/无效负载
  • OT: 油温(预测目标)

注意:实际项目中建议使用pathlib代替直接字符串路径,增强代码可移植性。例如:

from pathlib import Path data_path = Path('data/ETTh1.csv')

时间序列数据的探索性分析(EDA)至关重要。快速绘制特征分布和趋势图:

import matplotlib.pyplot as plt df['OT'].plot(title='Oil Temperature Trend') plt.xlabel('Time') plt.ylabel('Temperature') plt.show()

2. 时间戳解析与索引处理

正确处理时间特征是时间序列分析的基础。ETDataset中的date列需要转换为datetime类型并设为索引:

df['date'] = pd.to_datetime(df['date']) df.set_index('date', inplace=True) # 检查时间序列连续性 time_diff = df.index.to_series().diff() print(f"时间间隔统计:\n{time_diff.value_counts()}")

对于存在不规则间隔的数据,我们需要进行重采样。例如将分钟级数据统一为15分钟间隔:

# 只适用于ETTm数据集 df_resampled = df.resample('15T').mean()

处理大型数据集时,可以考虑使用dask库进行分布式加载:

import dask.dataframe as dd ddf = dd.read_csv('ETTh1.csv', parse_dates=['date']) ddf = ddf.set_index('date')

3. 特征工程与归一化

电力负载特征通常存在量纲差异,需要进行标准化处理。我们使用Scikit-learn的ColumnTransformer实现差异化处理:

from sklearn.preprocessing import StandardScaler, MinMaxScaler from sklearn.compose import ColumnTransformer # 定义特征分组 load_features = ['HUFL', 'HULL', 'MUFL', 'MULL', 'LUFL', 'LULL'] target_feature = ['OT'] # 创建转换器 preprocessor = ColumnTransformer( transformers=[ ('load', StandardScaler(), load_features), ('temp', MinMaxScaler(), target_feature) ], remainder='passthrough' ) # 应用转换 df_scaled = pd.DataFrame( preprocessor.fit_transform(df), columns=load_features + target_feature, index=df.index )

对于时间序列数据,我们通常还需要创建滞后特征:

# 创建24小时滞后特征 for feature in load_features: df_scaled[f'{feature}_lag24'] = df_scaled[feature].shift(24) # 删除因滞后产生的缺失值 df_scaled.dropna(inplace=True)

4. 缺失值处理与异常检测

尽管ETDataset已经过预处理,实际项目中仍需处理数据质量问题。以下是常见的处理方法:

缺失值处理策略对比表

方法适用场景优点缺点
线性插值少量连续缺失保持趋势可能低估波动
前向填充设备短暂故障简单快速延长异常影响
季节性均值周期性数据保留周期特征忽略近期变化
模型预测大量缺失精度高计算成本高

异常值检测可以使用滑动窗口统计法:

def detect_anomalies(series, window=24, sigma=3): rolling_mean = series.rolling(window).mean() rolling_std = series.rolling(window).std() return series[(series - rolling_mean).abs() > sigma * rolling_std] anomalies = detect_anomalies(df_scaled['OT']) print(f"检测到异常值数量: {len(anomalies)}")

5. 构建时序样本与DataLoader

时间序列预测需要将数据转换为监督学习格式。我们实现一个灵活的滑动窗口生成器:

import torch from torch.utils.data import Dataset, DataLoader class ETDataset(Dataset): def __init__(self, data, window_size=24, horizon=12): self.data = torch.FloatTensor(data.values) self.window_size = window_size self.horizon = horizon def __len__(self): return len(self.data) - self.window_size - self.horizon + 1 def __getitem__(self, idx): x = self.data[idx:idx+self.window_size, :-1] # 除最后一列外的所有特征 y = self.data[idx+self.window_size:idx+self.window_size+self.horizon, -1] # 最后一列是目标 return x, y # 示例使用 dataset = ETDataset(df_scaled, window_size=24*7, horizon=24) # 使用一周数据预测下一天 dataloader = DataLoader(dataset, batch_size=32, shuffle=True) # 检查一个批次 for x, y in dataloader: print(f"输入形状: {x.shape}, 输出形状: {y.shape}") break

提示:对于多GPU训练,可以使用DistributedSampler优化数据加载:

from torch.utils.data.distributed import DistributedSampler sampler = DistributedSampler(dataset) dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

完整的数据处理流程还应包括数据集分割。时间序列需要按时间顺序划分:

# 按时间划分训练/验证/测试集 train_size = int(0.7 * len(df_scaled)) val_size = int(0.2 * len(df_scaled)) train_data = df_scaled.iloc[:train_size] val_data = df_scaled.iloc[train_size:train_size+val_size] test_data = df_scaled.iloc[train_size+val_size:] print(f"数据集划分: 训练集 {len(train_data)}, 验证集 {len(val_data)}, 测试集 {len(test_data)}")

高级技巧与性能优化

处理大型时间序列数据集时,内存和计算效率至关重要。以下是几个实用技巧:

内存映射技术:对于超大型数据集,可以使用numpy的memmap功能:

import numpy as np # 将数据保存为memmap格式 fp = np.memmap('temp.mmap', dtype='float32', mode='w+', shape=df_scaled.values.shape) fp[:] = df_scaled.values[:] del fp # 刷新到磁盘 # 后续加载 data = np.memmap('temp.mmap', dtype='float32', mode='r', shape=df_scaled.values.shape)

并行预处理:使用joblib加速特征工程:

from joblib import Parallel, delayed def process_feature(col): return df[col].rolling(24).mean() results = Parallel(n_jobs=4)(delayed(process_feature)(col) for col in load_features)

自定义DataLoader:实现更高效的时间序列采样:

class SequentialSampler(torch.utils.data.Sampler): def __init__(self, data_source, stride=1): self.data_source = data_source self.stride = stride def __iter__(self): n = len(self.data_source) return iter(range(0, n, self.stride)) def __len__(self): return len(range(0, len(self.data_source), self.stride)) sampler = SequentialSampler(dataset, stride=12) dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

工程化部署考虑

将预处理流程产品化时,需要关注以下方面:

  1. 可复现性:使用joblib保存预处理管道

    from joblib import dump dump(preprocessor, 'preprocessor.joblib')
  2. 监控数据漂移:定期检查特征分布变化

    from scipy import stats def detect_drift(reference, current, alpha=0.05): pvalues = [] for col in reference.columns: _, p = stats.ks_2samp(reference[col], current[col]) pvalues.append(p > alpha) return sum(pvalues) / len(pvalues)
  3. 自动化测试:为数据质量添加断言检查

    assert df_scaled.isnull().sum().sum() == 0, "存在缺失值未处理" assert (df_scaled.index == pd.Series(df_scaled.index).drop_duplicates()).all(), "索引不唯一"
  4. 性能基准:记录各步骤耗时

    from time import perf_counter class Timer: def __enter__(self): self.start = perf_counter() return self def __exit__(self, *args): self.end = perf_counter() self.duration = self.end - self.start with Timer() as t: preprocessor.fit_transform(df) print(f"预处理耗时: {t.duration:.2f}秒")
http://www.jsqmd.com/news/1131511/

相关文章:

  • AI率总超标?2026年AI论文网站排行榜权威发布,一次过审不是梦!
  • 短信验证码接口防刷实战:Redis 限流 3 策略与 5 分钟 10 次拦截
  • 从黑客角度解释:Rust 是系统级语言,而Go 却不是
  • 74HC32与PIC18F45K50实现高效键盘管理方案
  • 工业控制系统安全漏洞深度解析:从原理到防护的实战指南
  • 把抽象的价值、理想、能力,持续转化为现实世界中能够被观察、被验证、被影响的存在。
  • ADS 2022 威尔金森功分器设计:从原理图到版图仿真的 3 个关键步骤与 Delta 参数调优
  • LiteSeg 2019 模型 C++ OpenCV DNN 部署:RTX 3080 实测 512x512 图像 15ms 推理
  • Kali Linux:从渗透测试工具到专业安全审计平台的深度解析
  • COCO 2017 数据集实战:PyTorch DataLoader 构建与 80 类目标检测数据加载
  • ELK Stack 安全加固:Kibana 7.6.1 启用 X-Pack 认证的 5 个关键步骤
  • ResNet-50 迁移学习实战:CIFAR-10 数据集 95%+ 准确率调优(PyTorch 1.13)
  • 深度解析WeChatMsg:微信聊天记录数据资产化的技术实现方案
  • openEuler/QoS-Deployment-Test:从零开始编写自定义测试用例的完整指南
  • XXL-Job执行器默认AccessToken漏洞在不出网环境下的深度利用与防御
  • 高密度 PCB 维修:2种防护方案(绝缘纸/铜丝)避免热风枪损伤邻件
  • Linux上运行Windows软件与游戏的终极解决方案:Bottles完整指南
  • 终极指南:如何用MoeVoiceStudio实现高质量二次元语音合成
  • 如何快速将音频转文字:AsrTools智能语音识别终极指南
  • DIP封装转面包板:从2.54mm标准到7.62mm间距的5种适配方案解析
  • LLM 输出格式约束:JSON 模式不是万能保险
  • 故障复盘——让失败“变成财富“
  • 抖音无水印下载神器:5分钟搞定批量下载难题
  • Docker 镜像签名:能拉取不代表能运行
  • Apriori 算法 Python 实战:mlxtend 库处理 9835 条购物篮数据,挖掘 26 条强规则
  • mRemoteNG终极指南:一站式管理所有远程连接的免费神器
  • LSTM 股票预测实战:PyTorch 2.3 多特征工程与 3 种归一化方法对比
  • Python实现国密SM4算法:从核心原理到ECB/CBC模式实战
  • GAIL 2016 算法实战:PyTorch 复现 9 个 Gym 任务,3 种基线对比
  • 告别卡顿:用Winhance中文版让Windows系统重获流畅体验