ml_edm:基于成本敏感的时间序列早期分类Python工具包详解
1. 项目概述
在工业监控、医疗诊断和金融风控这些领域,我们常常面对一个共同的困境:数据是随着时间一点点“流”进来的,但决策却不能等到所有数据都齐备了再做。比如,一台设备传感器传回的振动信号刚开始出现异常,你是立刻停机检修,还是再等等看,以防是误报?等,可能错过最佳维修窗口,导致灾难性故障;不等,又可能因为信息不足而“误诊”,造成不必要的生产中断。这个“等”与“不等”的权衡,就是早期决策(Early Decision Making)的核心。它不是一个简单的二选一,而是一个在决策准确性(Accuracy)和决策及时性(Earliness)之间寻找最优解的持续博弈。
传统的时序分析或机器学习模型,往往假设我们拥有完整的、固定长度的序列数据。但在真实世界里,这种“奢侈”的情况很少。更多时候,我们需要在数据流还在进行中,就做出尽可能靠谱的判断。这就是早期时间序列分类(Early Classification of Time Series, ECTS)要解决的难题。它要求模型不仅学得好,还要“学得快”,能在数据到达的早期阶段就触发决策,同时将误判和延迟带来的综合成本降到最低。
然而,ECTS的研究虽然火热,算法层出不穷,从经典的EDSC、ECTS到近年的ECONOMY-γ、CALIMERA,但实际应用的门槛却不低。每个算法都有自己的实现逻辑、评估标准,代码散落在各个论文的附录或个人仓库里,缺乏统一的框架。研究者想复现对比,工程师想落地应用,都得从头造轮子,费时费力。ml_edm这个Python工具包的出现,正是为了终结这种混乱。它像一个精心整理的“兵器库”,将主流的ECTS算法收纳其中,并用一套清晰、模块化、且与scikit-learn高度兼容的API呈现出来。无论你是想快速验证一个新想法,还是需要在生产系统中集成一个可靠的早期预警模块,ml_edm都试图为你提供一条捷径。
2. 核心设计理念与架构解析
ml_edm的设计哲学非常明确:模块化、兼容性和成本敏感驱动。它不是把某个算法黑盒化地丢给你,而是把早期决策这个复杂过程拆解成几个逻辑清晰的组件,让你既能开箱即用,也能自由组合甚至自定义。
2.1 模块化设计:分离“学”与“判”
绝大多数ECTS算法都可以抽象为两个核心阶段:
- 分类阶段:在任意一个可能的时间点
t,根据到目前为止观测到的部分时间序列X[:, :t],模型需要给出对各个类别的置信度(通常是概率分布)。这本质上是一个对可变长度序列的分类问题。 - 触发阶段:根据当前时刻的分类置信度、历史信息以及一个预设的成本模型,决定是立即做出最终决策,还是继续等待更多数据。
ml_edm将这两个阶段解耦,分别对应classification和trigger两大模块。这种设计带来了巨大的灵活性:
- 可替换的分类器:你可以使用任何scikit-learn兼容的分类器(如
RandomForestClassifier,HistGradientBoostingClassifier)或管道(Pipeline)作为基础分类器。ml_edm的ClassifiersCollection会为每个你关注的时间戳t克隆并训练一个独立的分类器。 - 可插拔的触发策略:你可以从内置的多种触发模型(如
EconomyGamma,EDSC)中选择,也可以基于统一的接口实现自己的触发逻辑。这让你可以轻松对比不同触发策略在相同分类基础上的效果。 - 支持端到端模型:对于像TEASER这类本身就将分类和触发联合优化的端到端模型,ml_edm将其整体实现为一个
trigger_model,此时分类模块可以省略。
2.2 Scikit-learn API兼容性:无缝融入现有工作流
如果你熟悉scikit-learn,那么上手ml_edm几乎零成本。它的核心对象,如EarlyClassifier,严格遵循了fit()、predict()、score()的范式。
fit(X_train, y_train): 训练模型。predict(X_test): 对测试序列进行早期分类预测,并返回每个序列的预测类别和做出决策的时刻。score(X_test, y_test): 评估模型,默认返回一个三元组(平均成本, 准确率, 平均决策时间)。这是评估早期决策模型的黄金标准。
这种设计意味着ml_edm可以轻松嵌入到基于scikit-learn的机器学习管道中,进行网格搜索超参数优化、交叉验证等,极大提升了实验和工程化的效率。
2.3 成本矩阵:驱动决策的“指挥棒”
这是ml_edm区别于其他时序库最核心、也最精髓的部分。早期决策的本质是一个序列决策问题,而任何决策都需要一个衡量标准。ml_edm通过CostMatrices对象将这一标准显式化和量化。
CostMatrices需要你定义两个核心成本:
- 误分类成本:一个
n_classes x n_classes的矩阵。misclf_cost[i, j]表示真实类别为i但预测为j的成本。对角线元素通常为0(预测正确无成本)。这允许你定义非对称成本,例如在医疗诊断中,将重症误判为轻症的成本,远高于将轻症误判为重症的成本。 - 延迟成本:一个关于时间
t的函数delay_cost(t)。它量化了仅仅因为等待到时间t才做决策所带来的损失。通常,这是一个关于t的单调递增函数,比如线性函数t / T_max,其中T_max是序列的最大可能长度。
注意:在实际项目中,精确获取这两个成本函数非常困难。ml_edm的作者建议,即使没有真实成本,也应基于业务逻辑进行合理假设(例如,使用对称的误分类成本和线性延迟成本)来构建一个模拟的“地面真理”。因为只有这样,模型的优化目标才是明确且一致的。没有成本矩阵,早期决策就失去了优化的方向。
3. 核心模块深度使用指南
理解了设计理念,我们来深入看看每个模块的具体使用方法和背后的考量。
3.1 成本矩阵的构建与实战意义
构建CostMatrices是使用ml_edm的第一步,也是最需要结合业务思考的一步。
import numpy as np from ml_edm.cost_matrices import CostMatrices # 假设一个三分类问题(例如:设备状态:正常、预警、故障) n_classes = 3 max_T = 100 # 序列最大长度 timestamps = np.arange(10, max_T + 1, 10) # 我们关心第10, 20, ..., 100时刻的决策 # 1. 定义误分类成本矩阵 # 这里我们定义一个非对称成本:将“故障”误判为“正常”的成本最高。 misclf = np.array([ [0, 1, 5], # 真实为“正常”:误判为预警成本1,误判为故障成本5 [1, 0, 3], # 真实为“预警”:误判为正常成本1,误判为故障成本3 [5, 3, 0] # 真实为“故障”:误判为正常成本5,误判为预警成本3 ]) # 2. 定义延迟成本函数 # 采用线性延迟,假设每等待一个单位时间,成本增加 1/max_T。 # 也可以使用指数函数模拟成本加速增长,如 `lambda t: (t / max_T) ** 2` def linear_delay(t): return t / max_T # 3. 实例化成本矩阵对象 cost_matrices = CostMatrices( timestamps=timestamps, n_classes=n_classes, misclf_cost=misclf, delay_cost=linear_delay )实操心得:
timestamps参数至关重要。它定义了模型在哪些时间点“被允许”做出决策。通常,我们不会在每一个时间步都做决策,那样计算开销太大。选择关键的时间点(如等间隔采样,或基于业务知识的特定时间点)能极大提升效率。timestamps也直接决定了ClassifiersCollection需要训练多少个分类器。- 延迟成本函数的选择直接影响模型的“急躁”程度。线性成本下,模型对延迟的厌恶是均匀的。如果你希望模型更倾向于早期决策,可以使用凸函数(如二次函数)让延迟成本增长更快。
3.2 分类策略:应对可变长度序列
对于非端到端的ECTS算法,我们需要一个策略来处理不同长度的输入序列。ClassifiersCollection是最直接的方法。
from aeon.datasets import load_classification from sklearn.ensemble import HistGradientBoostingClassifier from sklearn.preprocessing import StandardScaler from sklearn.pipeline import make_pipeline from ml_edm.classification import ClassifiersCollection # 加载示例数据 X_train, y_train, _ = load_classification("GunPoint", split="train") X_test, y_test, _ = load_classification("GunPoint", split="test") # 创建一个scikit-learn管道作为基础分类器 # 注意:时间序列数据可能需要特征工程,这里简单使用标准化。 base_pipe = make_pipeline( StandardScaler(), HistGradientBoostingClassifier(random_state=42) ) # 实例化分类器集合 # 它会为 cost_matrices.timestamps 中的每个时间点训练一个独立的分类器 collection_clf = ClassifiersCollection( base_classifier=base_pipe, timestamps=cost_matrices.timestamps # 使用成本矩阵中定义的时间点 ) # 训练 # 可以传入 cost_matrices,某些分类策略可能会利用成本信息进行代价敏感学习 collection_clf.fit(X_train, y_train, cost_matrices=cost_matrices) # 验证每个时间点分类器的独立性能 for t in cost_matrices.timestamps: X_test_truncated = X_test[:, :t] # 截取到时间t的测试数据 acc = collection_clf.score(X_test_truncated, y_test, t=t) print(f"Time {t:3d}: Accuracy = {acc:.4f}")注意事项:
- 维度匹配:
X_train的形状通常是(n_samples, sequence_length, n_features)。对于单变量时间序列,n_features=1。ClassifiersCollection在内部会根据指定的t自动将(n_samples, sequence_length, ...)截取/处理为(n_samples, t, ...)以供基础分类器使用。你需要确保你的基础分类器能处理这种格式,或者通过自定义转换器将其展平为(n_samples, t * n_features)。 - 内存与计算:如果
timestamps很多(例如100个),且基础分类器很复杂(如深度学习模型),训练ClassifiersCollection会消耗大量资源和时间。prefit_classifiers=True参数允许你在构建EarlyClassifier时传入已训练好的collection_clf,方便模型缓存和复用。 - 特征工程:对于时间序列,直接使用原始点作为特征可能不够。
aeon工具包(ml_edm的依赖之一)提供了丰富的时序特征提取器(如Catch22、TSFresh特征),你可以将其集成到base_classifier的管道中,以提升分类性能。
3.3 触发模型:决策时机的掌控者
触发模型是早期决策的“大脑”,它根据实时信息决定何时停止等待。ml_edm实现了多种策略,下表对比了它们的关键特性:
| 触发模型 | 核心原理 | 是否端到端 | 支持并行 | 适用场景 |
|---|---|---|---|---|
| ProbabilityThreshold | 当任一类别概率超过固定阈值时触发。 | 否 | 是 | 基线方法,简单快速,可作为对比基准。 |
| EDSC | 基于“最早可预测时间”概念,寻找分类置信度足够高的最早点。 | 否 | 是 | 追求稳定性和可解释性,适用于置信度明确的场景。 |
| ECDIRE | 动态规划方法,直接优化整个序列决策过程的期望成本。 | 否 | 是 | 理论最优(在给定分类器和成本下),但计算量相对较大。 |
| ECONOMY-γ | 引入“γ-容忍度”概念,允许预测概率在一定范围内波动,平衡准确性与时效性。 | 否 | 是 | 当前主流方法之一,在多个基准数据集上表现鲁棒,是很好的默认选择。 |
| CALIMERA | 较新的方法,专注于学习类别在时间上的判别性模式。 | 否 | 是 | 当不同类别的判别性特征出现在不同时间阶段时,可能有优势。 |
| TEASER | 端到端神经网络,联合学习特征表示、分类和触发策略。 | 是 | 否 | 数据量充足时,可能获得更好的性能,但可解释性较差,训练复杂。 |
from ml_edm.trigger import EconomyGamma, EDSC, ProbabilityThreshold from ml_edm.early_classifier import EarlyClassifier # 方案一:使用 ECONOMY-γ 触发策略 trigger_eg = EconomyGamma(gamma=0.1) # gamma是容忍度参数,需调优 # 方案二:使用经典的 EDSC 策略 trigger_edsc = EDSC(threshold=0.95) # threshold是置信度阈值 # 方案三:使用简单的概率阈值策略 trigger_pt = ProbabilityThreshold(threshold=0.9) # 构建早期分类器 early_clf = EarlyClassifier( chronological_classifiers=collection_clf, # 使用我们之前训练好的分类器集合 trigger_model=trigger_eg, # 选择触发模型 cost_matrices=cost_matrices, # 传入成本矩阵 prefit_classifiers=True # 指明分类器已预训练 ) # 如果分类器未预训练,也可以直接调用 fit,它会自动训练分类器和触发模型 # early_clf.fit(X_train, y_train) # 进行预测 predictions, decision_times = early_clf.predict(X_test, return_decision_time=True) print(f"预测结果: {predictions[:5]}") print(f"决策时刻: {decision_times[:5]}") # 综合评估 avg_cost, accuracy, earliness = early_clf.score(X_test, y_test) print(f"\n综合评估:") print(f" 平均决策成本: {avg_cost:.4f}") print(f" 最终准确率: {accuracy:.4f}") print(f" 平均决策时间(占全长比): {earliness:.4f}")参数调优经验:
EconomyGamma的gamma参数是关键。gamma越大,模型对概率波动越“容忍”,倾向于更早做出决策(但可能牺牲准确率);gamma越小,则越保守,倾向于等待更确定的信息。通常需要通过交叉验证在验证集上寻找最优值。EDSC和ProbabilityThreshold的threshold参数直观但敏感。过高的阈值可能导致模型迟迟不决策(延迟成本激增),过低的阈值则导致过早的误判。建议结合成本矩阵进行网格搜索。- 对于端到端模型如
TEASER,你需要关注网络结构、学习率等深度学习超参数,其训练方式也与传统方法不同,通常直接调用fit。
4. 完整项目实战:工业设备故障早期预警
我们用一个模拟的工业设备振动信号预警场景,串联起ml_edm的完整使用流程。假设我们有3种设备状态:正常(Normal)、轻微磨损(Warning)、严重故障(Failure)。
4.1 数据准备与模拟
我们使用aeon或sktime来生成或加载一个多变量时间序列分类数据集进行模拟。这里为了演示,我们创建一个简单的合成数据集。
import numpy as np from sklearn.model_selection import train_test_split from aeon.datasets import make_example_3d_numpy # 生成一个模拟的3分类时间序列数据集 # X形状: (n_instances, n_timestamps, n_features) # y形状: (n_instances,) X, y = make_example_3d_numpy( n_cases=1000, # 1000个样本 n_timepoints=150, # 每个序列150个时间点 n_channels=5, # 5个特征通道(模拟5个传感器) n_classes=3, # 3个类别 random_state=42, return_y=True ) # 划分训练集和测试集 X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) print(f"训练集形状: {X_train.shape}, 测试集形状: {X_test.shape}")4.2 定义业务导向的成本矩阵
这是最具业务色彩的一步。假设业务规则如下:
- 将“严重故障”误判为“正常”代价最高(成本10),因为这会导致未预警的停机。
- 将“正常”误判为“故障”代价次之(成本5),因为这会导致不必要的停机检查。
- 延迟决策的成本随时间线性增长,最大延迟成本为1。
max_T = X.shape[1] # 序列最大长度 150 timestamps = np.arange(30, max_T + 1, 15) # 从第30个时间点开始,每15个点评估一次 n_classes = 3 # 定义误分类成本矩阵 (真实类别为行,预测类别为列) # 顺序: [Normal, Warning, Failure] misclf_cost = np.array([ [0, 2, 5], # 真实Normal: 误判Warning成本2,误判Failure成本5 [3, 0, 4], # 真实Warning: 误判Normal成本3,误判Failure成本4 [10, 6, 0] # 真实Failure: 误判Normal成本10,误判Warning成本6 ]) # 定义线性延迟成本 def delay_cost(t): return t / max_T # 标准化到[0,1]区间 from ml_edm.cost_matrices import CostMatrices cost_matrices = CostMatrices( timestamps=timestamps, n_classes=n_classes, misclf_cost=misclf_cost, delay_cost=delay_cost )4.3 构建并训练早期分类管道
我们选择HistGradientBoostingClassifier作为基础分类器,并使用EconomyGamma作为触发策略。
from sklearn.ensemble import HistGradientBoostingClassifier from sklearn.preprocessing import StandardScaler from sklearn.pipeline import make_pipeline from ml_edm.classification import ClassifiersCollection from ml_edm.trigger import EconomyGamma from ml_edm.early_classifier import EarlyClassifier # 1. 创建基础分类管道(包含标准化) base_clf = make_pipeline( StandardScaler(), HistGradientBoostingClassifier( max_iter=200, learning_rate=0.05, max_depth=5, random_state=42 ) ) # 2. 创建分类器集合 print("训练分类器集合...") collection_clf = ClassifiersCollection( base_classifier=base_clf, timestamps=cost_matrices.timestamps ) # 训练可能需要一些时间,因为要训练 len(timestamps) 个分类器 collection_clf.fit(X_train, y_train, cost_matrices=cost_matrices) # 3. 创建触发模型并组装早期分类器 print("配置早期分类器...") trigger = EconomyGamma(gamma=0.15) # 初始化一个gamma值 early_clf = EarlyClassifier( chronological_classifiers=collection_clf, trigger_model=trigger, cost_matrices=cost_matrices, prefit_classifiers=True ) # EarlyClassifier 的 fit 方法主要用来训练触发模型的参数(如果需要) # 由于我们用了预训练的分类器且EconomyGamma无需额外训练,这里可以简单调用或不调用 # early_clf.fit(X_train, y_train) # 如果触发模型需要训练,则调用4.4 模型评估与结果分析
现在,我们在测试集上评估这个早期预警系统的综合性能。
print("在测试集上进行评估...") avg_cost, accuracy, earliness = early_clf.score(X_test, y_test) print("="*50) print("评估报告") print("="*50) print(f"平均决策成本: {avg_cost:.4f}") print(f"最终决策准确率: {accuracy:.4f}") print(f"平均决策时间点 (标准化): {earliness:.4f} (约在第 {int(earliness * max_T)} 个时间点)") print("="*50) # 查看前几个样本的具体决策情况 preds, decision_times = early_clf.predict(X_test[:5], return_decision_time=True) print("\n前5个测试样本的决策详情:") for i, (true_label, pred_label, d_time) in enumerate(zip(y_test[:5], preds, decision_times)): status = "正确" if true_label == pred_label else "错误" print(f"样本{i}: 真实[{true_label}] -> 预测[{pred_label}] ({status}) | 决策于时间点 {d_time}")结果解读:
- 平均决策成本:这是核心指标,综合了误判和延迟的代价。我们的优化目标就是最小化这个值。你可以通过调整成本矩阵的定义或触发模型的参数(如
gamma)来优化它。 - 最终决策准确率:与等到序列结束再做决策的传统分类器准确率可比。在早期决策框架下,这个值通常会略低,因为我们用一定的准确率换取了时间。
- 平均决策时间:反映了系统的“敏捷度”。值越小,说明系统平均做出决策的时间越早。
4.5 与“等到最后”策略的对比
为了凸显早期决策的价值,我们最好有一个基线对比。最直接的基线就是等到所有数据都接收完(t = max_T)再做决策的传统分类器。
from sklearn.metrics import accuracy_score, classification_report # 训练一个使用完整序列的传统分类器 full_seq_clf = make_pipeline( StandardScaler(), HistGradientBoostingClassifier(max_iter=200, random_state=42) ) # 需要将3D数据重塑为2D以供标准分类器使用 (n_samples, n_timestamps * n_features) n_samples_train, n_timestamps, n_features = X_train.shape X_train_flat = X_train.reshape(n_samples_train, -1) X_test_flat = X_test.reshape(X_test.shape[0], -1) full_seq_clf.fit(X_train_flat, y_train) y_pred_full = full_seq_clf.predict(X_test_flat) acc_full = accuracy_score(y_test, y_pred_full) print("\n与‘等到最后’策略对比:") print(f" 传统分类器(使用完整序列)准确率: {acc_full:.4f}") print(f" 早期分类器最终准确率: {accuracy:.4f}") print(f" 准确率差异: {accuracy - acc_full:.4f}") print(f" 但早期分类器平均在 {int(earliness * max_T)}/{max_T} 时间点就做出了决策!")这个对比能清晰地展示,我们牺牲了可能的一点点准确率(有时甚至可能更高,因为早期决策避免了尾部噪声),但换来了大幅提前的决策时间,这对于故障预警、金融交易等场景的价值是巨大的。
5. 高级技巧、避坑指南与常见问题
在实际使用ml_edm的过程中,你会遇到一些挑战。以下是我从实战中总结的经验。
5.1 性能优化与并行计算
训练多个时间点的分类器是计算密集型的。ml_edm的ClassifiersCollection和大多数Trigger模型都支持并行化。
# 在实例化时指定 n_jobs 参数来启用并行 collection_clf_parallel = ClassifiersCollection( base_classifier=base_clf, timestamps=timestamps, n_jobs=-1 # 使用所有可用的CPU核心 ) # 对于触发模型,如EDSC,也支持并行训练 trigger_parallel = EDSC(threshold=0.95, n_jobs=4)注意:并行化会显著增加内存消耗。如果数据集很大或分类器很复杂,建议先在小样本或部分时间戳上测试,或者使用
n_jobs=2或4而非-1来控制资源使用。
5.2 处理多变量与不规则时间序列
当前版本的ml_edm主要面向规整的、单变量时间序列。如果你的数据是多变量的(例如多个传感器),需要确保你的基础分类器能够处理3D输入(n_samples, n_timestamps, n_features)。像HistGradientBoostingClassifier这样的模型不能直接处理,你需要:
- 展平:在分类管道开始处加入一个
ReshapeTransformer或自定义转换器,将(n_samples, n_timestamps, n_features)变为(n_samples, n_timestamps * n_features)。 - 使用专门模型:使用能够处理3D输入的模型,例如通过
aeon库中的TimeSeriesForestClassifier,或者使用深度学习框架(如PyTorch)构建网络,并将其包装成scikit-learn接口。
对于不规则采样的时间序列,ml_edm目前没有内置处理功能。一个常见的预处理方法是将其插值到规整的时间网格上,或者使用能够处理不规则序列的模型(如基于RNN或Attention的模型)作为基础分类器。
5.3 触发模型的选择与调参
没有“最好”的触发模型,只有“最适合”的。
- 追求简单和可解释:从
ProbabilityThreshold或EDSC开始。它们的参数(阈值)有明确的业务含义。 - 追求综合性能:
ECONOMY-γ和CALIMERA是很好的默认选择,它们在学术基准测试中表现稳健。ECONOMY-γ的gamma参数需要通过交叉验证调整。 - 数据充足且追求极致性能:可以考虑端到端的
TEASER,但要做好应对更长训练时间和更高调参复杂度的准备。 - 理论研究或需要理论保证:
ECDIRE提供了在给定分类器性能下的理论最优解,尽管计算开销大,但作为性能上界很有参考价值。
调参建议:使用GridSearchCV或RandomizedSearchCV对触发模型的超参数(如gamma,threshold)进行搜索。关键是要使用与最终评估一致的成本矩阵来定义搜索的评分标准。你可以自定义一个评分函数,使其返回early_clf.score()中的avg_cost(平均成本),因为这才是早期决策的终极优化目标。
5.4 常见错误与排查
维度错误:
ValueError: Found array with dim 3. Estimator expected <= 2.- 原因:基础分类器(如sklearn的SVM、GBDT)无法直接处理3D时间序列数据。
- 解决:在构建分类管道时,第一步必须是降维/展平操作,例如使用
FunctionTransformer进行展平,或者使用aeon的特征提取器。
时间戳不匹配:
ValueError: The timestamps ...- 原因:
CostMatrices、ClassifiersCollection和EarlyClassifier中使用的timestamps不一致。 - 解决:始终使用同一个
timestamps数组,最好将其定义为一个变量并传递给各个组件。
- 原因:
成本矩阵未定义:
TypeError: __init__() missing 1 required positional argument: 'cost_matrices'- 原因:实例化
EarlyClassifier时忘记传入cost_matrices参数。 - 解决:确保在构建
EarlyClassifier时提供了定义好的CostMatrices对象。
- 原因:实例化
预测结果全是NaN或决策时间异常:
- 原因:可能触发了模型的“拒绝决策”机制(如果实现的话),或者延迟成本设置得极低,导致模型倾向于无限期等待(现实中,通常会设置一个最大等待时间约束)。
- 排查:检查成本矩阵,确保延迟成本函数是单调递增的。检查触发模型的参数(如阈值是否设得过高)。可以打印出模型在预测过程中各个时间点的置信度来调试。
5.5 模型保存与部署
ml_edm的模型对象(EarlyClassifier,ClassifiersCollection)基于scikit-learn的基类,因此可以使用标准工具进行序列化。
import joblib # 保存模型 joblib.dump(early_clf, 'early_fault_detector.pkl') # 加载模型 loaded_clf = joblib.load('early_fault_detector.pkl') # 在线预测(模拟数据流) def online_predict(model, data_stream): """模拟在线数据流预测""" accumulated_data = [] for i, new_data_point in enumerate(data_stream): accumulated_data.append(new_data_point) current_sequence = np.array(accumulated_data).reshape(1, -1, n_features) # 构造成3D # 注意:在线使用时,模型会基于当前累计序列长度,在内部选择合适的时间戳分类器进行预测和触发判断。 prediction, decision_flag = model.predict(current_sequence, return_early_decision_flag=True) if decision_flag: return prediction[0], i+1 # 返回预测结果和决策时刻 return None, len(data_stream) # 如果始终未触发,返回None和最终长度 # 模拟一个测试样本的数据流 test_stream = X_test[0] # 形状 (150, 5) pred_label, pred_time = online_predict(loaded_clf, test_stream) print(f"在线预测: 在第 {pred_time} 个时间点触发决策,预测类别为 {pred_label}")这个在线预测函数展示了如何将训练好的早期分类器部署到一个实时数据流系统中。模型会持续监控累积的数据,并在其内部触发机制认为时机成熟时立即输出预测,从而实现真正的早期预警。
