当前位置: 首页 > news >正文

基于图神经网络的交通流量预测与信号灯协同控制,当图神经网络遇上交通信号灯:我们如何用AI让城市告别堵车

目录

为什么交通网络天然就是一张图

从数据到图:我们需要准备什么

图神经网络的几个关键变体

从零搭建预测系统

第一步:模拟路网和流量数据

第二步:构建时序样本

第三步:设计时空图神经网络

第四步:训练模型

第五步:从预测到信号灯协同控制

第六步:可视化与结果分析


为什么交通网络天然就是一张图

我们先停下来想一想。城市道路交叉口之间是什么关系?

相邻路口之间会相互影响。一个路口放行,车流会在几分钟内到达下一个路口。这种影响沿着道路传播,形成一张有向图。每个路口是图上的一个节点,道路是连接节点的边。

这个结构太适合用图神经网络来建模了。

传统的卷积神经网络处理的是规则的网格数据——图像就是典型的网格结构。但交通网络不是网格,它的拓扑结构是不规则的:有的路口连接3条路,有的连接5条路,高速公路出入口更是形态各异。

图神经网络的核心思想很简单:每个节点的信息,由它自己和邻居节点的信息共同决定。翻译成交通语言就是:某个路口的交通状态,取决于这个路口本身的状态,以及周边几个路口的状态

这个直觉跟现实完全吻合。路口A的拥堵,很快就会波及路口B和路口C。如果我们想预测一个路口未来5分钟的流量,只看这个路口的历史数据是不够的,还必须看它上游路口放了多少车出来。

从数据到图:我们需要准备什么

在动手写代码之前,先理清楚我们需要什么数据。

路网拓扑结构是最基础的。每个路口的ID,以及路口之间的连接关系。如果有方向信息就更好了——左转、直行、右转对应的下游路口可能是三个不同的节点。

历史流量数据是驱动模型的燃料。常见的采集方式包括地磁线圈、雷达微波、卡口电警,现在越来越多的城市开始用雷视一体机。数据格式通常是:时间戳、路口ID、车道方向、流量值(比如每5分钟通过多少辆车)。

信号配时方案如果也能拿到,效果会更好。不过很多公开数据集不包含这部分信息,初期可以先把信号灯的影响当作隐变量来处理。

对于新手来说,可以先从公开数据集入手。METR-LA和PEMS-BAY是非常经典的两个交通速度数据集,由加州交通局收集。PEMS系列还有多个版本(PEMS03、PEMS04、PEMS07、PEMS08),覆盖了加州不同的高速公路和动脉道路。国内的话,深圳、杭州、上海都有一些公开的交通流数据集,不过需要申请。

为了这次演示,我们会模拟一个简单但完整的数据结构,让你看清楚每一步是怎么做的。模拟不是糊弄,而是因为真实数据的处理流程动辄几百G,放在一篇文章里根本讲不完。你把模拟数据替换成真实数据,代码一行都不用改。

图神经网络的几个关键变体

先快速梳理一下图神经网络的主流模型,理解它们各自的侧重点,对我们后面选模型有帮助。

GCN(图卷积网络)是最经典的版本。它的本质是把邻居节点的特征加权求和,然后通过一个线性变换。你可以理解为:每个路口的新特征 = 自己特征 × 权重 + 邻居特征平均 × 权重。GCN假设所有邻居的重要性是一样的,这一点在交通场景下不一定合理——直行过来的车流和左转过来的车流对路口的影响显然不同。

GAT(图注意力网络)解决了这个问题。它为每个邻居节点学习一个注意力权重,让模型自己决定哪个邻居更重要。在交通场景中,主干道方向的邻居路口显然应该获得更高的注意力权重。GAT近年来在交通预测任务上表现很好。

Gated GCN引入了门控机制,可以更好地控制信息的流动。它在某些数据集上表现优于GCN和GAT,但模型参数更多,训练也更慢。

还有一些专门为交通预测设计的模型,比如DCRNN(扩散卷积循环神经网络)将交通流建模为扩散过程,既考虑上游到下游的影响(正向扩散),也考虑下游到上游的影响(反向扩散),加上序列模型捕捉时间依赖,是交通预测领域的baseline级模型。STGCN(时空图卷积网络)用图卷积处理空间维度,用一维卷积处理时间维度,结构简洁高效。Graph WaveNet比较新,引入了自适应邻接矩阵,可以自动学习节点之间隐藏的空间依赖——这对真实交通网络来说非常实用,因为有些路口虽然没有道路直接相连,但在功能上高度相关(比如绕城高速上的两个远端出入口)。

这篇博客我们选择GAT作为主要模型,因为它的注意力机制特别适合交通场景,而且代码相对清晰易懂。后续你完全可以用DCRNN或Graph WaveNet替换它,框架不变。

从零搭建预测系统

我们把整个过程拆成六个步骤。每个步骤都有完整可运行的代码和逐行解释。

第一步:模拟路网和流量数据

python

import numpy as np import pandas as pd import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import Dataset, DataLoader from torch_geometric.nn import GATConv from torch_geometric.data import Data as GeometricData import matplotlib.pyplot as plt from sklearn.preprocessing import StandardScaler from sklearn.metrics import mean_absolute_error, mean_squared_error import warnings warnings.filterwarnings('ignore') # 设置随机种子保证可复现 np.random.seed(42) torch.manual_seed(42) # 1. 模拟路网:10个路口,形成一条主干道加几条分支 # 为了让图变得更真实,我们构建一个类似真实城市片区的拓扑 n_nodes = 10 # 定义边:从节点i到节点j(有向) # 主干道:0->1->2->3->4->5 # 分支:6->2, 7->3, 8->4, 9->5 # 反向也有车流(双向道路) edges = [ # 主干道正向 (0,1), (1,2), (2,3), (3,4), (4,5), # 主干道反向 (5,4), (4,3), (3,2), (2,1), (1,0), # 分支汇入主干道 (6,2), (7,3), (8,4), (9,5), # 主干道分流到分支(假设分支也可以驶出) (2,6), (3,7), (4,8), (5,9) ] edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous() # 2. 模拟时间序列数据 # 时间步长:5分钟一个点,一周的数据(2016个点) n_timesteps = 2016 # 7天 * 24小时 * 12个5分钟 n_features = 3 # 流量、平均速度、占有率 # 每个路口的特征:流量(veh/5min)、速度(km/h)、占有率(%) # 用正弦波模拟早晚高峰 + 随机噪声 + 上下游相关性 t = np.arange(n_timesteps) # 早晚高峰模式:早上8点(第96个5分钟)和下午6点(第216个5分钟)出现峰值 hour_of_day = (t % 288) / 12 # 一天288个5分钟,转换成小时 morning_peak = np.exp(-((hour_of_day - 8) ** 2) / 8) evening_peak = np.exp(-((hour_of_day - 18) ** 2) / 10) daily_pattern = morning_peak + evening_peak X = np.zeros((n_nodes, n_timesteps, n_features)) for node in range(n_nodes): # 主干道节点(0-5)流量更大 base_flow = 50 if node <= 5 else 20 # 加入日模式 flow_pattern = base_flow * daily_pattern # 加入上下游影响:上游节点车流到达下游会有延时 if node > 0 and node <= 5: # 上游节点的车流延时2-3个时间步到达 delay = np.random.randint(2, 4) upstream_flow = np.roll(flow_pattern, delay) upstream_flow[:delay] = 0 flow_pattern = flow_pattern + 0.3 * upstream_flow # 添加随机噪声 noise = np.random.normal(0, 5, n_timesteps) flow = np.maximum(0, flow_pattern + noise) # 速度与流量成反比 speed = np.maximum(20, 60 - flow / 3 + np.random.normal(0, 3, n_timesteps)) # 占有率与流量成正比 occupancy = np.minimum(100, flow / 1.5 + np.random.normal(0, 5, n_timesteps)) X[node, :, 0] = flow X[node, :, 1] = speed X[node, :, 2] = occupancy

稍微解释一下这段模拟逻辑。我们用正弦波模拟了一天之内早晚两个高峰,主干道的基础流量是分支的两倍多。为了让数据符合交通常识,上游节点产生的车流经过几个时间步的延时会影响下游——这就是交通流传播的本质。速度跟流量成反比,车越多开得越慢,这也是基本交通流理论。

实际项目中你不会需要这段模拟代码,直接加载真实数据即可。格式只需要是 (节点数, 时间步数, 特征数) 的三维数组。

第二步:构建时序样本

python

# 处理数据:使用过去12个时间步(1小时)预测未来6个时间步(30分钟) history_len = 12 future_len = 6 def create_sequences(data, history_len, future_len): """ data: (n_nodes, n_timesteps, n_features) 返回: X (样本数, 历史步数, 节点数, 特征数) y (样本数, 未来步数, 节点数) # 预测流量 """ n_nodes, n_timesteps, n_features = data.shape X_list, y_list = [], [] for i in range(n_timesteps - history_len - future_len + 1): X_seq = data[:, i:i+history_len, :] # (n_nodes, history_len, n_features) # 预测的是未来每个节点的流量 y_seq = data[:, i+history_len:i+history_len+future_len, 0] # (n_nodes, future_len) X_list.append(X_seq) y_list.append(y_seq) return np.array(X_list), np.array(y_list) X_seq, y_seq = create_sequences(X, history_len, future_len) # X_seq shape: (样本数, 节点数, 历史步数, 特征数) # y_seq shape: (样本数, 节点数, 未来步数) # 重排维度便于输入模型 # 我们希望每个样本的维度是 (节点数, 历史步数, 特征数) # 但pytorch习惯将batch放在第一维,所以我们保持现在的维度 # 不过GAT期望的输入是 (所有节点, 特征),我们需要进一步处理 # 归一化 node_wise_scalers = {} X_normalized = np.zeros_like(X_seq) for node in range(n_nodes): scaler = StandardScaler() # 提取该节点所有样本的所有历史步和所有特征 node_data = X_seq[:, node, :, :].reshape(-1, n_features) scaler.fit(node_data) node_wise_scalers[node] = scaler X_normalized[:, node, :, :] = scaler.transform(node_data).reshape(-1, history_len, n_features) # 对y也做归一化,但用同一个scaler(只针对流量特征) y_normalized = np.zeros_like(y_seq) for node in range(n_nodes): # 流量是第0个特征 node_y_data = y_seq[:, node, :].reshape(-1, 1) # 使用对应的X的scaler(流量特征的均值和标准差) y_normalized[:, node, :] = node_wise_scalers[node].transform(node_y_data).reshape(-1, future_len) # 转换为torch tensor X_tensor = torch.FloatTensor(X_normalized) # (样本数, 节点数, 历史步数, 特征数) y_tensor = torch.FloatTensor(y_normalized) # (样本数, 节点数, 未来步数) print(f"数据形状: {X_tensor.shape}, {y_tensor.shape}")

标准化这一步很多人容易踩坑。注意我们是对每个节点分别做标准化,而不是全局统一。因为不同路口的流量量级差别很大——主干道可能有1000辆车,支路可能只有100辆,混在一起标准化会让小流量的路口损失信号。同时,y要用相同的scaler转换,这样预测出来的值才能反向变换回真实的车辆数。

第三步:设计时空图神经网络

这是整个系统的核心。我们需要同时处理空间依赖(路口之间的关系)和时间依赖(历史流量如何影响未来)。

python

class SpatialTemporalGAT(nn.Module): """ 时空图注意力网络 - 空间维度:GAT捕捉路口间关系 - 时间维度:GRU捕捉时序依赖 """ def __init__(self, n_nodes, in_features, hidden_dim, out_features, n_heads=4, dropout=0.3): super(SpatialTemporalGAT, self).__init__() self.n_nodes = n_nodes self.hidden_dim = hidden_dim self.in_features = in_features # 用于将历史时间步编码的MLP self.time_encoder = nn.Linear(history_len, hidden_dim) # 图注意力层(空间聚合) # 输入特征维度是 hidden_dim,输出也是 hidden_dim self.gat1 = GATConv(hidden_dim, hidden_dim, heads=n_heads, dropout=dropout, concat=True) self.gat2 = GATConv(hidden_dim * n_heads, hidden_dim, heads=1, dropout=dropout, concat=False) # 时序建模:GRU self.gru = nn.GRU( input_size=hidden_dim, hidden_size=hidden_dim, num_layers=2, dropout=dropout, batch_first=True ) # 输出层:预测未来时间步 self.output_layer = nn.Sequential( nn.Linear(hidden_dim, hidden_dim // 2), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_dim // 2, future_len) ) # 残差连接 self.residual = nn.Linear(in_features, future_len) if in_features != future_len else nn.Identity() def forward(self, x, edge_index): """ x: (batch_size, n_nodes, history_len, n_features) edge_index: (2, n_edges) """ batch_size = x.shape[0] n_nodes = self.n_nodes # 第一步:将每个节点上的历史信息编码成特征向量 # 把 (history_len, n_features) 展平,或者用MLP聚合 # 更优雅的做法:先对每个时间步分别做GAT,但效率太低 # 我们采用:将历史时间步看做时间通道,用1x1卷积或线性层压缩 # 将最后两维合并:(batch, n_nodes, history_len * n_features) x_reshaped = x.view(batch_size, n_nodes, -1) # 降维到 hidden_dim x_encoded = torch.relu(self.time_encoder(x_reshaped.transpose(1,2)).transpose(1,2)) # x_encoded: (batch, n_nodes, hidden_dim) # 空间图传播:对每个batch独立处理,但GAT需要节点特征矩阵 # 我们将batch和节点合并,或者逐个batch处理 # torch_geometric的GATConv期望输入 (所有节点, 特征) # 这里我们逐个batch处理(更清晰) spatial_outputs = [] for b in range(batch_size): h = x_encoded[b] # (n_nodes, hidden_dim) # GAT层1 h = F.elu(self.gat1(h, edge_index)) # GAT层2 h = F.elu(self.gat2(h, edge_index)) spatial_outputs.append(h) x_spatial = torch.stack(spatial_outputs, dim=0) # (batch, n_nodes, hidden_dim) # 时序建模:将每个节点看作一个时间序列 # 我们需要在时间维度上建模,但现在没有时间步了,我们只有聚合后的hidden_dim # 真正的时序建模应该在GAT之前或之后? # 改进:先对每个时间步独立做GAT,然后将时间序列输入GRU # 上面简化了,让我们实现一个更正确的版本: # 对每个时间步分别应用GAT,得到序列 (batch, n_nodes, history_len, hidden_dim) # 然后对每个节点用GRU建模时序 return self.forward_v2(x, edge_index) def forward_v2(self, x, edge_index): """ 更合理的时空建模: 1. 对每个时间步独立做GAT,提取空间特征 2. 对每个节点将时间序列输入GRU 3. 输出预测 """ batch_size, n_nodes, hist_len, n_feats = x.shape # 对每个时间步独立做GAT spatial_seq = [] for t in range(hist_len): x_t = x[:, :, t, :] # (batch, n_nodes, n_feats) # 逐个batch处理 gat_outputs = [] for b in range(batch_size): h = x_t[b] # (n_nodes, n_feats) h = F.elu(self.gat1(h, edge_index)) h = F.elu(self.gat2(h, edge_index)) gat_outputs.append(h) x_t_spatial = torch.stack(gat_outputs, dim=0) # (batch, n_nodes, hidden_dim) spatial_seq.append(x_t_spatial) # (hist_len, batch, n_nodes, hidden_dim) -> (batch, n_nodes, hist_len, hidden_dim) spatial_seq = torch.stack(spatial_seq, dim=2) # (batch, n_nodes, hist_len, hidden_dim) # 对每个节点,将其时间序列输入GRU # 调整维度为 (batch * n_nodes, hist_len, hidden_dim) batch_nodes = batch_size * n_nodes spatial_seq_reshaped = spatial_seq.view(batch_nodes, hist_len, -1) gru_out, _ = self.gru(spatial_seq_reshaped) # (batch_nodes, hist_len, hidden_dim) # 取最后一个时间步的输出 last_out = gru_out[:, -1, :] # (batch_nodes, hidden_dim) # 预测未来流量序列 predictions = self.output_layer(last_out) # (batch_nodes, future_len) predictions = predictions.view(batch_size, n_nodes, future_len) return predictions # 初始化模型 model = SpatialTemporalGAT( n_nodes=n_nodes, in_features=history_len * n_features, # 实际在forward_v2中没这么用,但为了接口一致保留 hidden_dim=64, out_features=future_len, n_heads=4, dropout=0.2 ) # 计算参数量 total_params = sum(p.numel() for p in model.parameters()) print(f"模型参数量: {total_params:,}")

这个模型设计有一个关键点:我们对每个时间步独立运行GAT,而不是先压缩时间维度。为什么?因为路口之间的空间关系可能随着交通流量的变化而动态改变。早高峰期间,某些连接(比如从住宅区到主干道的入口)变得更加重要;晚高峰则反过来。如果先把时间步压缩成一个向量,这些动态信息就丢失了。

当然,这样做的代价是计算量变大了。history_len=12,就要跑12次GAT。实践中可以用Graph WaveNet那种方式,用扩张卷积同时处理时间和空间,效率更高。但作为教学示例,我们的版本更容易理解。

注意力头数n_heads=4的意思是对每个邻居节点,我们用4组独立的注意力权重去计算,然后把结果拼接起来。多头机制让模型能捕捉不同类型的空间依赖——有些头可能专注于上游方向的车流,有些头专注于下游方向的反压。

第四步:训练模型

python

# 划分训练集、验证集、测试集 train_ratio = 0.7 val_ratio = 0.15 n_samples = X_tensor.shape[0] train_end = int(n_samples * train_ratio) val_end = int(n_samples * (train_ratio + val_ratio)) train_X, train_y = X_tensor[:train_end], y_tensor[:train_end] val_X, val_y = X_tensor[train_end:val_end], y_tensor[train_end:val_end] test_X, test_y = X_tensor[val_end:], y_tensor[val_end:] print(f"训练集: {train_X.shape}, 验证集: {val_X.shape}, 测试集: {test_X.shape}") # 创建DataLoader batch_size = 32 train_dataset = torch.utils.data.TensorDataset(train_X, train_y) val_dataset = torch.utils.data.TensorDataset(val_X, val_y) test_dataset = torch.utils.data.TensorDataset(test_X, test_y) train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) # 损失函数和优化器 criterion = nn.MSELoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5) scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, verbose=True) # 训练函数 def train_epoch(model, loader, optimizer, criterion, edge_index): model.train() total_loss = 0 for batch_X, batch_y in loader: optimizer.zero_grad() # 前向传播 predictions = model.forward_v2(batch_X, edge_index) loss = criterion(predictions, batch_y) loss.backward() # 梯度裁剪防止梯度爆炸 torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) optimizer.step() total_loss += loss.item() return total_loss / len(loader) def evaluate(model, loader, criterion, edge_index): model.eval() total_loss = 0 all_preds = [] all_targets = [] with torch.no_grad(): for batch_X, batch_y in loader: predictions = model.forward_v2(batch_X, edge_index) loss = criterion(predictions, batch_y) total_loss += loss.item() all_preds.append(predictions) all_targets.append(batch_y) avg_loss = total_loss / len(loader) all_preds = torch.cat(all_preds, dim=0) all_targets = torch.cat(all_targets, dim=0) return avg_loss, all_preds, all_targets # 训练循环 epochs = 100 best_val_loss = float('inf') patience_counter = 0 train_losses = [] val_losses = [] for epoch in range(epochs): train_loss = train_epoch(model, train_loader, optimizer, criterion, edge_index) val_loss, _, _ = evaluate(model, val_loader, criterion, edge_index) train_losses.append(train_loss) val_losses.append(val_loss) scheduler.step(val_loss) if val_loss < best_val_loss: best_val_loss = val_loss torch.save(model.state_dict(), 'best_model.pth') patience_counter = 0 else: patience_counter += 1 if (epoch + 1) % 10 == 0: print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.6f} | Val Loss: {val_loss:.6f} | LR: {optimizer.param_groups[0]['lr']:.6f}") if patience_counter >= 20: print(f"Early stopping at epoch {epoch+1}") break # 加载最优模型 model.load_state_dict(torch.load('best_model.pth')) # 测试集评估 test_loss, test_preds, test_targets = evaluate(model, test_loader, criterion, edge_index) print(f"\n测试集MSE损失: {test_loss:.6f}") # 反标准化后计算真实尺度下的误差 test_preds_np = test_preds.numpy() test_targets_np = test_targets.numpy() # 对每个节点分别反标准化 test_preds_original = np.zeros_like(test_preds_np) test_targets_original = np.zeros_like(test_targets_np) for node in range(n_nodes): scaler = node_wise_scalers[node] node_pred = test_preds_np[:, node, :].reshape(-1, 1) node_target = test_targets_np[:, node, :].reshape(-1, 1) test_preds_original[:, node, :] = scaler.inverse_transform(node_pred).reshape(-1, future_len) test_targets_original[:, node, :] = scaler.inverse_transform(node_target).reshape(-1, future_len) # 计算MAE和RMSE mae = mean_absolute_error(test_targets_original.flatten(), test_preds_original.flatten()) rmse = np.sqrt(mean_squared_error(test_targets_original.flatten(), test_preds_original.flatten())) print(f"测试集MAE(车辆数): {mae:.2f}") print(f"测试集RMSE(车辆数): {rmse:.2f}")

运行这段代码,在模拟数据上你应该能得到MAE在3-5辆车左右。真实数据集上这个数字会大很多,但MAE能控制在5%以内就算不错了。

早停耐心值设20,ReduceLROnPlateau学习率调度器在验证损失10个epoch不下降时减半,这些技巧在实际项目中都很实用。梯度裁剪max_norm=1.0可以防止GAT训练时常见的梯度爆炸问题。

第五步:从预测到信号灯协同控制

预测只是手段,控制才是目的。有了对未来5-30分钟的交通流预测,信号灯可以怎么做?

最简单的方案是基于预测的绿信比分配。假设我们知道下一个15分钟每个方向会来多少辆车,那么绿灯时间应该跟这个方向的流量成正比。但这个方案没有考虑排队长度——如果某个方向已经积压了很多车,即使预测流量不大,也应该多给绿灯时间清空排队。

更好的方案是MPC(模型预测控制)。它的逻辑是:信号灯控制器有一个目标,比如最小化所有路口的平均等待时间。在每个决策时刻,控制器用我们的GAT模型预测未来一段时间(比如15分钟)的交通状态,然后搜索最优的信号配时方案。搜索空间很大,通常用遗传算法或强化学习来做。

多智能体强化学习是更前沿的方向。每个路口是一个智能体,它观察局部状态(本路口和邻居路口的排队长度、流量),然后选择相位。智能体之间通过一个中央协调器或者通过图神经网络交换信息。这种方法的优点是完全端到端,不需要手动设计目标函数,但训练非常不稳定。

我们实现一个基于预测的简单协调控制器,让你看到二者的结合如何工作:

python

class PredictiveSignalController: """ 基于流量预测的协同信号控制 每个路口根据预测的到达流量动态调整绿灯时间 """ def __init__(self, model, edge_index, node_wise_scalers, history_len, future_len, min_green=15, max_green=60, cycle_len=120): self.model = model self.edge_index = edge_index self.node_wise_scalers = node_wise_scalers self.history_len = history_len self.future_len = future_len self.min_green = min_green # 最短绿灯时间(秒) self.max_green = max_green # 最长绿灯时间(秒) self.cycle_len = cycle_len # 信号周期长度(秒) def get_action(self, current_observation, node_id): """ current_observation: (n_nodes, history_len, n_features) 最近的观测 node_id: 要控制的路口ID 返回:该路口东西向绿灯时间(秒),南北向绿灯时间自动为 cycle_len - 东西时间 """ with torch.no_grad(): # 转换为模型输入格式 obs_tensor = torch.FloatTensor(current_observation).unsqueeze(0) # (1, n_nodes, history_len, n_features) # 预测未来流量 predictions = self.model.forward_v2(obs_tensor, self.edge_index) # (1, n_nodes, future_len) # 反标准化 pred_np = predictions.squeeze(0).numpy() # (n_nodes, future_len) pred_original = np.zeros_like(pred_np) for node in range(n_nodes): scaler = node_wise_scalers[node] pred_original[node, :] = scaler.inverse_transform(pred_np[node, :].reshape(-1, 1)).flatten() # 获取未来流量的总和作为权重 node_future_flow = pred_original[node_id, :].sum() # 简化:根据流量分配绿灯时间(实际中需要考虑进口道方向) # 假设每个路口有4个进口方向,我们根据总到达流量分配东西向和南北向的时间 # 这里为了演示,假设node_id所在的交叉口东西向流量占比为 # 实际应该根据历史数据或实时检测确定,这里用模拟比例 east_west_ratio = 0.6 # 简化假设 # 根据流量调节:流量越大,绿灯时间越长,但受最大最小限制 base_green = self.min_green + (node_future_flow / 200) * (self.max_green - self.min_green) east_west_green = np.clip(base_green * east_west_ratio * 2, self.min_green, self.max_green) north_south_green = self.cycle_len - east_west_green north_south_green = np.clip(north_south_green, self.min_green, self.max_green) return east_west_green, north_south_green # 演示控制器使用 controller = PredictiveSignalController( model=model, edge_index=edge_index, node_wise_scalers=node_wise_scalers, history_len=history_len, future_len=future_len ) # 使用验证集最后一个样本作为当前观测 sample_obs = X_normalized[-1] # (n_nodes, history_len, n_features) for node in [0, 2, 5]: # 演示几个关键路口 ew_green, ns_green = controller.get_action(sample_obs, node) print(f"路口 {node}: 东西向绿灯 = {ew_green:.1f}秒, 南北向绿灯 = {ns_green:.1f}秒")

这个控制器虽然简单,但它体现了预测驱动控制的核心思想:用GAT模型预判未来15分钟每个方向的车流到达量,然后动态分配绿灯时间。

实际部署时还有很多细节需要考虑。相位差协调就是其中之一:主干道上一串路口的绿灯起始时间需要错开,形成一个绿波带。你从第一个路口绿灯启动,以接近限速的速度行驶,到达下一个路口时刚好赶上绿灯。我们的GAT模型天然支持这种需求,因为图结构里包含了相邻路口的连接关系,模型能学到上游路口放行的车流会在多长时间后到达下游路口。

第六步:可视化与结果分析

训练完模型后,可视化是理解模型行为的最佳方式。

python

# 1. 画出训练曲线 plt.figure(figsize=(12, 4)) plt.subplot(1, 2, 1) plt.plot(train_losses, label='Train Loss') plt.plot(val_losses, label='Val Loss') plt.xlabel('Epoch') plt.ylabel('MSE Loss') plt.legend() plt.title('Training and Validation Loss') # 2. 预测vs真实对比(选一个路口和一个未来时间步) plt.subplot(1, 2, 2) node_to_plot = 2 time_step_to_plot = 0 # 预测的第一个时间步(5分钟后) # 取测试集前100个样本 sample_size = min(100, test_preds_np.shape[0]) test_preds_node = test_preds_original[:sample_size, node_to_plot, time_step_to_plot] test_targets_node = test_targets_original[:sample_size, node_to_plot, time_step_to_plot] plt.scatter(test_targets_node, test_preds_node, alpha=0.5) plt.plot([0, max(test_targets_node)], [0, max(test_targets_node)], 'r--', label='Perfect Prediction') plt.xlabel('True Flow (veh/5min)') plt.ylabel('Predicted Flow (veh/5min)') plt.title(f'Node {node_to_plot} - {5 * (time_step_to_plot+1)} minutes ahead') plt.legend() plt.tight_layout() plt.show() # 3. 时空热力图:展示所有节点在未来6个时间步的预测误差 errors = np.abs(test_preds_original - test_targets_original) # (样本, 节点, 未来步) mean_errors = errors.mean(axis=0) # (节点, 未来步) plt.figure(figsize=(12, 6)) im = plt.imshow(mean_errors, cmap='YlOrRd', aspect='auto') plt.colorbar(im, label='Mean Absolute Error (veh/5min)') plt.xlabel('Prediction Horizon (5-min intervals)') plt.ylabel('Node ID') plt.title('Spatio-Temporal Prediction Error Pattern') plt.xticks(range(future_len), [f'{5*(i+1)}min' for i in range(future_len)]) plt.yticks(range(n_nodes)) plt.show() # 分析哪个节点最难预测 node_errors = mean_errors.mean(axis=1) print("\n各节点平均预测误差:") for node in range(n_nodes): print(f" 节点 {node}: {node_errors[node]:.2f} veh/5min")

可视化结果通常会揭示几个有趣的模式:主干道上的节点(0-5)预测误差可能比分支节点(6-9)更大,因为主干道流量波动更剧烈。越往未来预测,误差累积越大,这是正常的。还可以观察误差的空间传播:如果节点2的预测误差很大,节点3的误差往往也偏大,因为误差沿着车流方向传播。

在真实项目中,还会做误差分解实验:对比GAT和GCN的性能,看注意力机制带来了多少提升。对比有无空间信息(即只使用GRU)的模型,证明图结构确实有用。这些消融实验是论文的标准套路,在实际工程中也能帮你判断是否值得增加模型的复杂度。

http://www.jsqmd.com/news/764318/

相关文章:

  • 在微服务架构中利用Taotoken统一管理多模型API调用与成本
  • 上海泽固新型建材:奉贤聚合物砂浆批发厂家推荐 - LYL仔仔
  • Taotoken 用量看板与账单追溯功能如何帮助控制项目预算
  • 5分钟快速上手:memtest_vulkan终极GPU显存稳定性测试完整指南
  • 2026佛山意式轻奢家具推荐 - 真知灼见33
  • 5分钟终极解决方案:macOS上Navicat Premium试用期重置完整指南
  • 如何用EASY-HWID-SPOOFER解决硬件限制:3步实战指南
  • 终极二维码修复指南:如何用QRazyBox三步恢复损坏的二维码
  • FPGA光口通信实战:如何利用GT Wizard IP核的示例工程快速搭建你的第一个收发链路
  • 免费开源!Win11Debloat:三步打造纯净高效的Windows系统
  • 企业级国产龙虾智能体怎么挑?主流管理平台推荐与替代方 - 品牌2025
  • 如何用深度强化学习+图神经网络解决3大路由难题?完整实战指南
  • 使用 Python 快速上手 Taotoken 调用 Claude 系列模型教程
  • ADSP充电框架里的‘邮局’与‘快递员’:深入剖析LPM、DPM、PPM模块的分工与通信机制
  • 《QGIS快速入门与应用基础》326:附录C:配套资源下载地址(数据包/模板/视频)
  • 从零构建生产级AI智能体:架构设计、框架选型与实战指南
  • 重庆速洁家政:北碚区靠谱的窗帘清洗公司有哪些 - LYL仔仔
  • 2026年美国EB-5移民公司推荐及选择参考 - 品牌排行榜
  • 首驱Y3值不值得买?不同版本、通勤需求、空间动力和智能配置怎么选 - Top品牌推荐官
  • 从参数到服务:深度解析巨亚仪器JY-H-100L-40HX高低温箱 - 品牌推荐大师
  • 2026AI大模型API聚合平台榜单揭晓
  • 从倒立摆到无人机:李雅普诺夫稳定性在实际工程中的‘隐形守护’与设计误区
  • 长期使用Taotoken服务后对其API稳定性和故障切换机制的体会
  • 睿家诚家具维修:吴江可靠的沙发翻新公司选哪家 - LYL仔仔
  • 首驱电动车售后怎么样?客服入口、质保政策、维修网点和体验边界全解析 - Top品牌推荐官
  • 告别AT指令抓瞎:手把手教你用ESP-01S和EC03-DNC实现远程网络点灯(附完整C51代码)
  • 利用Taotoken的模型广场为特定任务选择性价比最优的模型
  • 企业版OpenClaw管理平台选型必看,国产龙虾智能体安全自主更懂运维 - 品牌2025
  • 2026最新叉车租赁维修服务商推荐!广东优质权威榜单发布,专业靠谱广州白云等地服务商优选 - 十大品牌榜
  • 本地大模型联网搜索实战:LLocalSearch架构解析与部署指南