当前位置: 首页 > news >正文

温湿度传感器

一段Python代码来分析温度数据中的异常值。这里使用了多种常见的异常检测方法:

我来帮你写一段Python代码来分析温度数据中的异常值。这里使用了多种常见的异常检测方法:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体(如果使用中文标签)
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial']
plt.rcParams['axes.unicode_minus'] = False

# 1. 生成模拟温度数据(包含一些异常值)
np.random.seed(42)
n_samples = 1000

# 正常温度数据(假设冬季气温,大部分在-5°C到10°C之间)
normal_temps = np.random.normal(loc=2, scale=4, size=int(n_samples*0.95))

# 添加一些异常值
outliers = np.array([
25, # 异常高温
30, # 异常高温
-20, # 异常低温
-25, # 异常低温
15, 15, 15, # 重复的较高温度
-15, -15, # 重复的较低温度
])

# 合并数据
all_temps = np.concatenate([normal_temps, outliers])
dates = pd.date_range(start='2024-01-01', periods=len(all_temps), freq='H')

# 创建DataFrame
df = pd.DataFrame({
'timestamp': dates,
'temperature': all_temps
})

print("数据概览:")
print(f"总数据点: {len(df)}")
print(f"温度统计:")
print(df['temperature'].describe())
print("\n" + "="*50)

# 2. 使用Z-score方法检测异常值
def detect_outliers_zscore(data, threshold=3):
"""使用Z-score方法检测异常值"""
z_scores = np.abs(stats.zscore(data))
outliers = np.where(z_scores > threshold)[0]
return outliers, z_scores

# 3. 使用IQR方法检测异常值
def detect_outliers_iqr(data, k=1.5):
"""使用IQR方法检测异常值"""
Q1 = np.percentile(data, 25)
Q3 = np.percentile(data, 75)
IQR = Q3 - Q1
lower_bound = Q1 - k * IQR
upper_bound = Q3 + k * IQR
outliers = np.where((data < lower_bound) | (data > upper_bound))[0]
return outliers, (lower_bound, upper_bound)

# 4. 使用Isolation Forest
def detect_outliers_isolation_forest(data, contamination=0.05):
"""使用孤立森林检测异常值"""
iso_forest = IsolationForest(
contamination=contamination,
random_state=42,
n_estimators=100
)
data_reshaped = data.values.reshape(-1, 1)
predictions = iso_forest.fit_predict(data_reshaped)
outliers = np.where(predictions == -1)[0]
return outliers, iso_forest.decision_function(data_reshaped)

# 5. 使用LOF(局部离群因子)
def detect_outliers_lof(data, contamination=0.05):
"""使用LOF检测异常值"""
lof = LocalOutlierFactor(
contamination=contamination,
novelty=False,
n_neighbors=20
)
data_reshaped = data.values.reshape(-1, 1)
predictions = lof.fit_predict(data_reshaped)
outliers = np.where(predictions == -1)[0]
return outliers, lof.negative_outlier_factor_

# 应用各种方法检测异常值
print("异常值检测结果:")
print("-" * 30)

# Z-score方法
zscore_outliers, z_scores = detect_outliers_zscore(df['temperature'], threshold=3)
print(f"Z-score方法 (threshold=3):")
print(f" 检测到异常值数量: {len(zscore_outliers)}")
print(f" 异常值索引: {zscore_outliers[:10]}{'...' if len(zscore_outliers) > 10 else ''}")

# IQR方法
iqr_outliers, (lower_bound, upper_bound) = detect_outliers_iqr(df['temperature'], k=1.5)
print(f"\nIQR方法 (k=1.5):")
print(f" 检测到异常值数量: {len(iqr_outliers)}")
print(f" 正常范围: [{lower_bound:.2f}, {upper_bound:.2f}]")
print(f" 异常值索引: {iqr_outliers[:10]}{'...' if len(iqr_outliers) > 10 else ''}")

# Isolation Forest
iso_outliers, iso_scores = detect_outliers_isolation_forest(df['temperature'], contamination=0.05)
print(f"\nIsolation Forest (contamination=0.05):")
print(f" 检测到异常值数量: {len(iso_outliers)}")
print(f" 异常值索引: {iso_outliers[:10]}{'...' if len(iso_outliers) > 10 else ''}")

# LOF方法
lof_outliers, lof_scores = detect_outliers_lof(df['temperature'], contamination=0.05)
print(f"\nLOF方法 (contamination=0.05):")
print(f" 检测到异常值数量: {len(lof_outliers)}")
print(f" 异常值索引: {lof_outliers[:10]}{'...' if len(lof_outliers) > 10 else ''}")

# 6. 统计异常值
def get_consensus_outliers(methods_outliers, min_agreement=2):
"""获取多种方法都认为的异常值(共识)"""
outlier_counts = {}
for outliers in methods_outliers:
for idx in outliers:
outlier_counts[idx] = outlier_counts.get(idx, 0) + 1

# 返回至少被min_agreement种方法认为是异常值的索引
consensus_outliers = [idx for idx, count in outlier_counts.items()
if count >= min_agreement]
return sorted(consensus_outliers), outlier_counts

all_methods = [zscore_outliers, iqr_outliers, iso_outliers, lof_outliers]
consensus_outliers, outlier_counts = get_consensus_outliers(all_methods, min_agreement=2)

print(f"\n共识异常值 (至少2种方法认同):")
print(f" 数量: {len(consensus_outliers)}")
print(f" 索引和温度值:")
for idx in consensus_outliers[:15]: # 显示前15个
print(f" 索引 {idx}: {df['temperature'].iloc[idx]:.1f}°C")

# 7. 可视化
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
fig.suptitle('温度异常值分析', fontsize=16, fontweight='bold')

# 子图1: 温度分布直方图
axes[0, 0].hist(df['temperature'], bins=50, edgecolor='black', alpha=0.7)
axes[0, 0].axvline(df['temperature'].mean(), color='red', linestyle='--',
label=f'均值: {df["temperature"].mean():.2f}°C')
axes[0, 0].axvline(df['temperature'].median(), color='green', linestyle='--',
label=f'中位数: {df["temperature"].median():.2f}°C')
axes[0, 0].set_xlabel('温度 (°C)')
axes[0, 0].set_ylabel('频数')
axes[0, 0].set_title('温度分布直方图')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# 子图2: 箱线图
axes[0, 1].boxplot(df['temperature'], vert=True, patch_artist=True)
axes[0, 1].set_title('温度箱线图')
axes[0, 1].set_ylabel('温度 (°C)')
axes[0, 1].grid(True, alpha=0.3)

# 子图3: 时间序列图(标出异常值)
axes[1, 0].plot(df['timestamp'], df['temperature'], 'b-', alpha=0.6, label='正常值')
consensus_temps = df['temperature'].iloc[consensus_outliers]
consensus_timestamps = df['timestamp'].iloc[consensus_outliers]
axes[1, 0].scatter(consensus_timestamps, consensus_temps,
color='red', s=50, zorder=5, label='异常值')
axes[1, 0].set_xlabel('时间')
axes[1, 0].set_ylabel('温度 (°C)')
axes[1, 0].set_title('温度时间序列(标出异常值)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
axes[1, 0].tick_params(axis='x', rotation=45)

# 子图4: 异常值检测方法比较
methods = ['Z-score', 'IQR', 'Isolation Forest', 'LOF']
outlier_counts = [len(zscore_outliers), len(iqr_outliers), len(iso_outliers), len(lof_outliers)]
x_pos = np.arange(len(methods))
axes[1, 1].bar(x_pos, outlier_counts, color=['skyblue', 'lightgreen', 'salmon', 'gold'])
axes[1, 1].set_xlabel('检测方法')
axes[1, 1].set_ylabel('检测到的异常值数量')
axes[1, 1].set_title('不同方法检测到的异常值数量')
axes[1, 1].set_xticks(x_pos)
axes[1, 1].set_xticklabels(methods, rotation=45)
for i, v in enumerate(outlier_counts):
axes[1, 1].text(i, v, str(v), ha='center', va='bottom', fontweight='bold')
axes[1, 1].grid(True, alpha=0.3, axis='y')

# 子图5: QQ图(检验正态分布)
stats.probplot(df['temperature'], dist="norm", plot=axes[2, 0])
axes[2, 0].set_title('QQ图(检验正态性)')
axes[2, 0].grid(True, alpha=0.3)

# 子图6: 共识异常值详细分析
temperature_values = df['temperature'].values
normal_mask = np.ones(len(temperature_values), dtype=bool)
normal_mask[consensus_outliers] = False

axes[2, 1].hist(temperature_values[normal_mask], bins=30, alpha=0.7,
label='正常值', color='blue', edgecolor='black')
axes[2, 1].hist(temperature_values[consensus_outliers], bins=10, alpha=0.7,
label='异常值', color='red', edgecolor='black')
axes[2, 1].set_xlabel('温度 (°C)')
axes[2, 1].set_ylabel('频数')
axes[2, 1].set_title('正常值与异常值分布对比')
axes[2, 1].legend()
axes[2, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 8. 异常值处理建议
print("\n" + "="*50)
print("异常值处理建议:")
print("="*50)

# 分析异常值特征
if len(consensus_outliers) > 0:
outlier_temps = df['temperature'].iloc[consensus_outliers]
mean_temp = df['temperature'].mean()
std_temp = df['temperature'].std()

print(f"\n异常值统计信息:")
print(f" 异常值平均温度: {outlier_temps.mean():.2f}°C")
print(f" 异常值温度范围: [{outlier_temps.min():.2f}°C, {outlier_temps.max():.2f}°C]")
print(f" 高温异常值: {sum(outlier_temps > mean_temp + 3*std_temp)} 个")
print(f" 低温异常值: {sum(outlier_temps < mean_temp - 3*std_temp)} 个")

print(f"\n处理建议:")
print("1. 数据清洗:")
print(f" - 考虑移除 {len(consensus_outliers)} 个共识异常值")
print(f" - 或使用中位数填充: {df['temperature'].median():.2f}°C")

print("\n2. 可能的原因分析:")
if any(outlier_temps > 20):
print(" - 高温异常可能是传感器故障或夏季数据混入")
if any(outlier_temps < -15):
print(" - 低温异常可能是传感器故障或极端天气")

print("\n3. 下一步行动:")
print(" - 验证异常值是否真实(检查传感器日志)")
print(" - 考虑使用移动平均或指数平滑处理")
print(" - 建立温度异常预警系统")
else:
print("未检测到显著的异常值。")

# 9. 保存清洗后的数据(可选)
def clean_data(df, outlier_indices, method='remove'):
"""清理异常值"""
if method == 'remove':
cleaned_df = df.drop(outlier_indices).reset_index(drop=True)
elif method == 'median_fill':
cleaned_df = df.copy()
median_val = df['temperature'].median()
cleaned_df.loc[outlier_indices, 'temperature'] = median_val
elif method == 'mean_fill':
cleaned_df = df.copy()
mean_val = df['temperature'].mean()
cleaned_df.loc[outlier_indices, 'temperature'] = mean_val
elif method == 'interpolate':
cleaned_df = df.copy()
# 标记异常值为NaN,然后用插值填充
cleaned_df.loc[outlier_indices, 'temperature'] = np.nan
cleaned_df['temperature'] = cleaned_df['temperature'].interpolate()
else:
raise ValueError("不支持的清理方法")

return cleaned_df

# 示例:使用中位数填充异常值
cleaned_df = clean_data(df, consensus_outliers, method='median_fill')
print(f"\n数据清洗结果:")
print(f" 原始数据形状: {df.shape}")
print(f" 清洗后数据形状: {cleaned_df.shape}")
print(f" 移除/修改的异常值数量: {len(consensus_outliers)}")

# 保存清洗后的数据
cleaned_df.to_csv('cleaned_temperature_data.csv', index=False)
print("清洗后的数据已保存为 'cleaned_temperature_data.csv'")

这段代码提供了完整的温度异常值分析流程:

主要功能:

1. 数据生成:创建包含正常温度和异常值的模拟数据
2. 多种异常检测方法:
- Z-score方法(基于标准差)
- IQR方法(基于四分位距)
- Isolation Forest(孤立森林)
- LOF(局部离群因子)
3. 可视化分析:
- 温度分布直方图
- 箱线图
- 时间序列图
- 不同方法对比
- QQ图
- 正常值与异常值对比
4. 数据处理建议:
- 移除异常值
- 中位数/均值填充
- 插值填充
5. 结果保存:保存清洗后的数据

使用方法:

1. 安装所需库:

pip install numpy pandas matplotlib seaborn scikit-learn scipy

2. 替换模拟数据为你的实际温度数据
3. 根据需要调整参数(阈值、污染率等)
4. 运行代码获取分析结果

这段代码可以帮你快速识别温度数据中的异常值,并提供多种处理方案。

http://www.jsqmd.com/news/277513/

相关文章:

  • TrollInstallerX完整配置指南:iOS 14-16系统化安装与专业调试
  • Qwen-Image-2512-ComfyUI成本分析:月度GPU费用节省实测数据
  • DLSS指示器完整配置教程:5步实现游戏性能可视化监控
  • 小白也能用!BSHM镜像保姆级教程,人像抠图零基础入门
  • Glyph部署耗时太久?镜像加速优化实战教程
  • 如何快速掌握VDA5050协议:AGV智能调度终极指南
  • 网盘下载加速神器:5分钟掌握免登录直链解析技巧
  • GitHub加速神器:告别龟速下载,体验极速开发新境界
  • DLSS状态监控系统:专业玩家的性能可视化解决方案
  • 2026年评价高的大连散杂船出口品牌怎么选
  • Zotero-Better-Notes终极指南:5个技巧让文献管理变高效
  • 万物识别项目集成建议:API封装与系统对接方法
  • Amlogic S905L3-B设备Armbian系统部署终极指南
  • Live Avatar benchmark性能基准:4×4090与5×80GB实测对比表
  • Live Avatar适合中小企业吗?硬件门槛与替代方案建议
  • 设计师必备工具:Qwen-Image-Layered让创意自由编辑
  • Bilibili-Old:一键恢复经典B站界面,重拾怀旧播放体验
  • 闲置电视盒子终极改造指南:从娱乐设备到专业Linux服务器
  • MGeo+Jupyter:边调试边看结果超方便
  • 2026年可靠的DCMM价格公司哪家便宜?最新排行
  • Z-Image-Turbo镜像优势解析:为何要选预置权重版本?入门必看
  • fft npainting lama修复效果差?标注技巧与参数调优详解
  • 终极指南:8大云盘免登录高速下载神器完全解析
  • Topit:重塑Mac多任务体验的窗口管理艺术
  • MGeo实时地址校验系统搭建:高并发场景下的性能调优技巧
  • Amlogic设备Armbian系统改造终极指南:从闲置硬件到专业应用平台
  • 远程办公利器!Fun-ASR助力会议纪要生成
  • Z-Image-Turbo_UI体验报告:界面设计与用户体验点评
  • Bilibili旧版界面恢复终极指南:快速找回经典观影体验
  • MGeo在电信客户管理中的应用:多渠道地址信息融合实战