用Python+NetworkX复现经典交通分配:手把手教你从零搭建Frank-Wolfe算法求解UE模型
用Python+NetworkX复现经典交通分配:手把手教你从零搭建Frank-Wolfe算法求解UE模型
交通网络分析是城市规划与智能交通系统的核心课题。当我们在导航软件中看到实时路况预测,或参与城市道路规划方案评估时,背后都离不开交通分配模型的支持。用户均衡(User Equilibrium)作为Wardrop第一原理的数学表达,描述了出行者在路径选择中追求个人最优的群体行为模式。本文将带您用Python的NetworkX库,从零实现Frank-Wolfe算法求解UE模型的全过程。
1. 理论基础与工具准备
1.1 Wardrop均衡原理的工程意义
1952年提出的Wardrop第一原理指出:在均衡状态下,所有被使用的路径具有相同且最小的行驶时间。这一原理看似简单,却揭示了交通流自组织行为的本质特征。例如:
- 早晚高峰时,驾驶员不断切换路线导致各替代路径时间趋于一致
- 导航App的实时推荐实际上在推动系统向均衡状态演化
- 新开通道路初期可能无法分流,直到部分驾驶员发现时间优势
1.2 Beckmann变换与BPR函数
Beckmann在1956年将Wardrop原理转化为数学规划问题:
minimize Z(x) = ∑∫₀ˣᵃ tₐ(w)dw subject to: ∑ₖ fₖʳˢ = qʳˢ ∀(r,s) fₖʳˢ ≥ 0其中路阻函数常采用BPR(Bureau of Public Roads)公式:
def BPR(FFT, flow, capacity, alpha=0.15, beta=4.0): return FFT * (1 + alpha * (flow / capacity) ** beta)参数典型值:
| 参数 | 物理意义 | 典型值 |
|---|---|---|
| FFT | 自由流时间 | 路段属性 |
| α | 拥堵敏感度 | 0.15 |
| β | 非线性程度 | 4.0 |
1.3 开发环境配置
推荐使用Python 3.8+环境,主要依赖库:
pip install networkx pandas numpy matplotlib scipy数据文件结构示例:
/SiouxFalls ├── Link.csv # 路段属性 ├── Node.csv # 节点坐标 └── ODPairs.csv # OD需求矩阵2. 网络构建与数据预处理
2.1 拓扑结构可视化
利用NetworkX构建有向图并可视化:
def build_network(link_path, node_path): G = nx.from_pandas_edgelist( pd.read_csv(link_path), source='O', target='D', edge_attr=['FFT', 'Capacity'], create_using=nx.DiGraph() ) # 设置节点位置属性 pos = {row['id']:(row['pos_x'], row['pos_y']) for _,row in pd.read_csv(node_path).iterrows()} nx.set_node_attributes(G, pos, 'pos') return G常见问题处理:
- 节点ID不连续时需重建索引
- 缺失值建议用相邻路段均值填充
- 通行能力为0的路段应移除
2.2 流量初始化技巧
全有全无分配法(All-or-Nothing)实现要点:
def all_none_initialize(G, od_df): for _, (o, d, demand) in od_df.iterrows(): path = nx.shortest_path(G, o, d, weight='FFT') # 零流最短路 for u, v in zip(path[:-1], path[1:]): G[u][v]['flow_real'] += demand # 流量累积 # 更新初始阻抗 for u, v, data in G.edges(data=True): data['weight'] = BPR(data['FFT'], data['flow_real'], data['Capacity'])注意:NetworkX的shortest_path默认使用Dijkstra算法,对于大型网络可考虑A*算法加速
3. Frank-Wolfe算法实现细节
3.1 算法流程分解
Frank-Wolfe算法迭代过程可分为四个关键步骤:
- 辅助流量分配:在当前阻抗下执行全有全无分配
- 方向确定:计算辅助流量与当前流量的差值向量
- 步长搜索:沿下降方向进行一维线性搜索
- 流量更新:合并当前流量与辅助流量
3.2 核心代码实现
辅助流量生成:
def all_none_temp(G, od_df): nx.set_edge_attributes(G, 0, 'flow_temp') # 重置临时流量 for o, d, demand in od_df.values: path = nx.shortest_path(G, o, d, weight='weight') for u, v in zip(path[:-1], path[1:]): G[u][v]['flow_temp'] += demand最优步长搜索(使用SciPy优化器):
def get_best_step(G): res = minimize_scalar( lambda a: sum( data['FFT'] * (x := data['flow_real'] + a*data['descent']) * (1 + 0.15/(4+1)*(x/data['Capacity'])**4) for *_, data in G.edges(data=True) ), bounds=(0, 1), method='bounded' ) return res.x3.3 收敛性优化技巧
实践中我们采用相对误差作为停止准则:
def compute_error(G): flows = np.array(list(nx.get_edge_attributes(G, 'flow_real').values())) new_flows = np.array(list(nx.get_edge_attributes(G, 'flow_temp').values())) return np.linalg.norm(new_flows - flows) / (np.sum(flows) + 1e-6)常见收敛问题解决方案:
- 震荡现象:引入Nesterov动量加速
- 早熟收敛:自适应调整步长下限
- 数值不稳定:对BPR函数进行平滑处理
4. 完整实现与结果分析
4.1 主程序架构
def main(max_iter=100, tol=1e-4): G = build_network("Link.csv", "Node.csv") od_df = pd.read_csv("ODPairs.csv") # 初始化 all_none_initialize(G, od_df) errors = [] # 迭代过程 for epoch in range(max_iter): all_none_temp(G, od_df) get_descent(G) update_flow_real(G) err = compute_error(G) errors.append(err) if err < tol: break # 结果输出 plot_convergence(errors) save_results(G)4.2 结果可视化方法
收敛曲线绘制:
def plot_convergence(errors): plt.figure(figsize=(10, 5)) plt.semilogy(errors, marker='o', linestyle='--') plt.xlabel('Iteration') plt.ylabel('Relative Gap') plt.grid(True)流量热力图生成:
def plot_heatmap(G): pos = nx.get_node_attributes(G, 'pos') flows = [d['flow_real'] for *_, d in G.edges(data=True)] nx.draw_networkx_edges( G, pos, width=[f/max(flows)*5 for f in flows], edge_color=flows, edge_cmap=plt.cm.Reds )4.3 性能优化建议
对于大规模网络(>1000节点)可考虑:
并行计算:将OD对分配到不同进程处理
from multiprocessing import Pool def parallel_all_none(args): G, od_chunk = args local_G = G.copy() all_none_temp(local_G, od_chunk) return nx.get_edge_attributes(local_G, 'flow_temp')稀疏矩阵存储:使用CSR格式存储大规模OD矩阵
增量更新:仅对受影响子图重新计算最短路
5. 工程实践中的扩展应用
5.1 多类用户均衡
考虑不同车辆类型(如货车/客车)的阻抗差异:
class MultiClassUE: def __init__(self, vehicle_classes): self.classes = vehicle_classes # 各车型占比及BPR参数 def combined_BPR(self, flow): return sum( p * BPR(FFT, flow, cap, a, b) for p, (FFT, cap, a, b) in zip( self.classes['proportion'], self.classes['params'] ) )5.2 动态交通分配
引入时间维度后的实现要点:
- 将路网扩展为时空网络
- 阻抗函数加入排队延迟项
- 使用动态规划求解最优出发时间
5.3 与微观仿真结合
宏观均衡结果可作为微观仿真的输入:
def generate_od_matrix(G, interval=15): return { (o, d): G.nodes[o]['population'] * G.nodes[d]['attraction'] for o, d in product(G.nodes, repeat=2) }实际项目中,我们常先用UE模型进行快速方案评估,再通过微观仿真验证细节。这种组合方法在深圳某交通枢纽改造中,将方案评估周期从2周缩短到3天。
