PEMS-BAY交通速度数据HDF5文件解析全攻略:用Pandas和h5py库搞定时空数据预处理
PEMS-BAY交通速度数据HDF5文件解析全攻略:用Pandas和h5py库搞定时空数据预处理
时空预测模型如DCRNN的输入数据质量直接决定了模型性能上限。作为交通预测领域的基准数据集,PEMS-BAY以HDF5格式存储了325个监测站半年内每5分钟采样的速度数据,总计超过1600万条记录。面对这种高维时空数据,如何高效解析并转换为模型可用的结构化格式,成为每个数据工程师必须掌握的技能。
本文将带您深入HDF5文件结构,从数据加载、维度解析到时间戳转换,逐步拆解预处理全流程。我们不仅会用到h5py库直接操作二进制数据,还会结合Pandas进行高级时间序列处理,最终生成带站点ID和时间戳的规整DataFrame。无论您正在构建交通流量预测系统,还是处理其他时空数据集,这些技术都能直接迁移复用。
1. HDF5文件结构与数据加载
HDF5(Hierarchical Data Format)是一种支持海量数据存储的二进制文件格式,特别适合存储多维数组和复杂数据结构。PEMS-BAY数据集采用这种格式,将速度数据、站点信息和时间戳分层组织。
1.1 文件基础结构探查
使用h5py库打开文件后,首先需要理解数据的组织方式:
import h5py file_path = 'pems-bay.h5' with h5py.File(file_path, 'r') as f: print(list(f.keys())) # 查看根目录下的组 speed_group = f['speed'] print(list(speed_group.keys())) # 查看speed组下的数据集典型输出显示speed组包含四个关键数据集:
axis0: 325个监测站IDaxis1: 52116个时间戳(纳秒精度)block0_values: 速度值矩阵(52116×325)block0_items: 与axis0相同的站点ID(冗余字段)
1.2 数据维度验证
加载数据前验证维度一致性至关重要:
with h5py.File(file_path, 'r') as f: speed = f['speed'] print(f"站点数量: {speed['axis0'][:].shape[0]}") print(f"时间点数量: {speed['axis1'][:].shape[0]}") print(f"速度矩阵形状: {speed['block0_values'][:].shape}")关键检查点:
- 确保
block0_values的第一维度与axis1长度一致(时间维度) - 第二维度应与
axis0长度一致(空间维度) - 检查
axis0和block0_items是否确实包含相同站点ID
2. 时间戳解析与索引构建
原始时间戳采用纳秒精度的Unix时间戳,需要转换为人类可读格式并建立有效的时间索引。
2.1 纳秒时间戳转换
Pandas的to_datetime能自动处理纳秒级时间戳:
import pandas as pd with h5py.File(file_path, 'r') as f: timestamps_ns = f['speed']['axis1'][:] datetime_index = pd.to_datetime(timestamps_ns) print(datetime_index[:5]) # 检查前5个时间点常见问题排查:
- 时区处理:默认转换为本地时区,如需UTC可指定
utc=True - 时间范围验证:检查首尾时间是否符合预期(PEMS-BAY应为2017年上半年)
2.2 构建规整时间序列
验证时间序列的连续性和采样频率:
freq_check = pd.infer_freq(datetime_index[:100]) # 检查前100个点的频率 print(f"推断频率: {freq_check}") # 生成完整时间范围验证是否有缺失 expected_range = pd.date_range( start=datetime_index[0], end=datetime_index[-1], freq='5T' # 5分钟间隔 ) print(f"缺失点数: {len(expected_range) - len(datetime_index)}")提示:PEMS-BAY数据理论上应完全连续,如发现缺失点需检查数据加载过程是否出错。
3. 站点信息与空间维度处理
325个监测站的元数据需要与速度数据正确关联,为后续空间分析奠定基础。
3.1 站点ID匹配验证
确保速度数据中的站点顺序与元数据一致:
station_ids = pd.DataFrame({ 'speed_file_id': f['speed']['axis0'][:], 'block0_items_id': f['speed']['block0_items'][:] }) print(station_ids.head()) # 验证ID是否完全一致 assert (station_ids['speed_file_id'] == station_ids['block0_items_id']).all()3.2 站点地理位置加载
从配套CSV加载站点坐标数据:
locations = pd.read_csv( 'graph_sensor_locations_bay.csv', names=['station_id', 'latitude', 'longitude'] ) # 合并验证 station_meta = pd.merge( station_ids[['speed_file_id']], locations, left_on='speed_file_id', right_on='station_id', how='left' ) print(f"缺失坐标的站点数: {station_meta['latitude'].isna().sum()}")地理数据处理技巧:
- 使用平均坐标作为地图展示的中心点
- 将经纬度转换为Web墨卡托投影(EPSG:3857)用于距离计算
4. 构建时空DataFrame
将原始速度矩阵转换为"长格式"DataFrame,便于后续分析和模型输入。
4.1 矩阵转置与重组
使用Pandas的MultiIndex实现高效转换:
import numpy as np with h5py.File(file_path, 'r') as f: speed_matrix = f['speed']['block0_values'][:] # 创建多级索引 index = pd.MultiIndex.from_product( [datetime_index, station_ids['speed_file_id']], names=['timestamp', 'station_id'] ) # 展平速度矩阵并创建Series speed_series = pd.Series( data=speed_matrix.flatten('F'), # 按列展平 index=index, name='speed' ) # 转换为DataFrame并重置索引 speed_df = speed_series.reset_index() print(speed_df.head())性能优化:
- 对于大数据集,可考虑分块处理
- 使用
dask.dataframe实现并行转换
4.2 数据质量检查
转换完成后进行基本统计分析:
stats = speed_df['speed'].describe() print(stats) # 异常值检测 q1 = stats['25%'] q3 = stats['75%'] iqr = q3 - q1 lower_bound = q1 - 1.5 * iqr upper_bound = q3 + 1.5 * iqr outliers = speed_df[ (speed_df['speed'] < lower_bound) | (speed_df['speed'] > upper_bound) ] print(f"异常值比例: {len(outliers)/len(speed_df):.2%}")5. 高效数据切片与查询
处理后的DataFrame支持灵活的时间空间切片,满足不同分析需求。
5.1 时间范围筛选
利用Pandas的时间序列查询能力:
# 选择特定日期范围 jan_data = speed_df[ speed_df['timestamp'].between( '2017-01-01', '2017-01-31' ) ] # 选择特定时段(如早高峰) morning_peak = speed_df[ speed_df['timestamp'].dt.time.between( pd.to_datetime('07:00').time(), pd.to_datetime('09:00').time() ) ]5.2 空间筛选与聚合
结合站点元数据进行空间分析:
# 合并地理位置信息 enhanced_df = pd.merge( speed_df, station_meta, left_on='station_id', right_on='speed_file_id' ) # 区域筛选(示例:经度大于-122.4的区域) west_region = enhanced_df[enhanced_df['longitude'] > -122.4] # 按站点聚合统计 station_stats = enhanced_df.groupby('station_id')['speed'].agg( ['mean', 'std', 'count'] ).merge( station_meta[['station_id', 'latitude', 'longitude']], on='station_id' )6. 存储优化与格式转换
处理后的数据可选择多种格式存储,平衡访问效率和使用便利性。
6.1 格式对比与选择
| 格式 | 读取速度 | 写入速度 | 存储效率 | 查询灵活性 | 适用场景 |
|---|---|---|---|---|---|
| CSV | 慢 | 慢 | 低 | 高 | 数据交换 |
| Parquet | 快 | 中 | 高 | 中 | 分析型查询 |
| Feather | 很快 | 快 | 中 | 低 | 临时存储 |
| HDF5 | 快 | 慢 | 高 | 低 | 科学计算 |
6.2 分区存储策略
对于超大规模数据,可按时间分区存储:
# 按月份分区存储为Parquet for month in range(1, 7): month_data = speed_df[ speed_df['timestamp'].dt.month == month ] month_data.to_parquet( f'pems_bay_2017_{month:02d}.parquet', partition_cols=['station_id'] # 可选按站点进一步分区 )7. 可视化与探索分析
数据可视化是理解时空模式的重要手段。
7.1 时间趋势分析
绘制典型站点的速度变化:
import matplotlib.pyplot as plt sample_station = 400001 station_ts = speed_df[ speed_df['station_id'] == sample_station ].set_index('timestamp')['speed'] plt.figure(figsize=(12, 6)) station_ts.plot(title=f'Station {sample_station} Speed Trend') plt.ylabel('Speed (mph)') plt.grid(True)7.2 空间热力图
使用folium创建交互式地图:
import folium from folium.plugins import HeatMap # 计算各站点平均速度 avg_speed = speed_df.groupby('station_id')['speed'].mean().reset_index() geo_speed = pd.merge(avg_speed, locations, on='station_id') # 创建底图 m = folium.Map( location=[geo_speed['latitude'].mean(), geo_speed['longitude'].mean()], zoom_start=11 ) # 添加热力层 heat_data = geo_speed[['latitude', 'longitude', 'speed']].values.tolist() HeatMap(heat_data, radius=15).add_to(m)在实际项目中,处理PEMS-BAY数据时最常见的坑是混淆矩阵的维度顺序。记得检查block0_values的展平方向(Fortran风格按列),否则会导致站点与速度值错位。另一个实用技巧是将站点元数据预先合并到主DataFrame,这样在后续分析中可以省去频繁的merge操作。
