别再硬算任务分配了!用Python手搓匈牙利算法,5分钟搞定运筹学指派问题
用Python实现匈牙利算法:5分钟解决任务分配难题
当面对"将5个任务分配给5个工程师使总耗时最短"这类问题时,传统的手工计算既耗时又容易出错。匈牙利算法(Hungarian Algorithm)作为解决指派问题的经典方法,能高效找到最优分配方案。本文将带你用Python从零实现该算法,并深入解析其背后的数学原理。
1. 问题建模与算法原理
指派问题属于0-1整数规划问题,其标准形式可以表示为:
最小化 ∑∑ c_ij * x_ij 约束条件: ∑ x_ij = 1 对每个i(每行恰好一个1) ∑ x_ij = 1 对每个j(每列恰好一个1) x_ij ∈ {0,1}匈牙利算法的核心思想是通过矩阵变换,在不改变最优解的前提下,逐步构造出可以直接读取解的矩阵形式。其理论基础是克尼格定理(König's theorem),该定理指出:
在二分图中,最小点覆盖数等于最大匹配数
算法主要步骤包括:
- 行归约:每行减去该行最小值
- 列归约:每列减去该列最小值
- 试指派:寻找独立零元素
- 矩阵调整:当无法找到完整匹配时
import numpy as np def hungarian_algorithm(cost_matrix): # 步骤1:复制矩阵以避免修改原始数据 mat = cost_matrix.copy() # 步骤2:行归约 for i in range(mat.shape[0]): mat[i] -= np.min(mat[i]) # 步骤3:列归约 for j in range(mat.shape[1]): mat[:,j] -= np.min(mat[:,j]) # 后续步骤将在下面章节实现 return mat2. 矩阵变换与零元素生成
实现完整的匈牙利算法需要处理更复杂的情况。让我们完善矩阵变换部分:
def adjust_matrix(mat): # 覆盖所有零元素的最小直线数 covered_rows = [] covered_cols = [] # 步骤1:标记所有没有独立零元素的行 independent_zeros = find_independent_zeros(mat) marked_rows = [i for i in range(mat.shape[0]) if i not in [z[0] for z in independent_zeros]] # 步骤2:标记这些行中零元素所在的列 new_marked_cols = [] for row in marked_rows: for col in range(mat.shape[1]): if mat[row,col] == 0 and col not in new_marked_cols: new_marked_cols.append(col) break # 步骤3:标记这些列中独立零元素所在的行 new_marked_rows = [] for col in new_marked_cols: for zero in independent_zeros: if zero[1] == col and zero[0] not in new_marked_rows: new_marked_rows.append(zero[0]) break # 步骤4:重复直到没有新的标记 while len(new_marked_rows) > 0: temp_marked_cols = [] for row in new_marked_rows: for col in range(mat.shape[1]): if mat[row,col] == 0 and col not in temp_marked_cols: temp_marked_cols.append(col) break new_marked_rows = [] for col in temp_marked_cols: for zero in independent_zeros: if zero[1] == col and zero[0] not in new_marked_rows: new_marked_rows.append(zero[0]) break new_marked_cols += temp_marked_cols # 确定覆盖线:未标记的行和已标记的列 covered_rows = [i for i in range(mat.shape[0]) if i not in marked_rows] covered_cols = new_marked_cols # 找到未覆盖区域的最小值 min_val = np.inf for i in range(mat.shape[0]): if i in covered_rows: continue for j in range(mat.shape[1]): if j in covered_cols: continue if mat[i,j] < min_val: min_val = mat[i,j] # 调整矩阵 for i in range(mat.shape[0]): if i in covered_rows: continue mat[i] -= min_val for j in covered_cols: mat[:,j] += min_val return mat3. 独立零元素的查找与匹配
寻找独立零元素是算法的关键步骤,这实际上是在寻找二分图的最大匹配:
def find_independent_zeros(mat): rows, cols = mat.shape max_match = 0 assignments = {} # 使用深度优先搜索实现 def bpm(u, seen, match_to): for v in range(cols): if mat[u,v] == 0 and not seen[v]: seen[v] = True if match_to[v] == -1 or bpm(match_to[v], seen, match_to): match_to[v] = u return True return False match_to = [-1] * cols for u in range(rows): seen = [False] * cols bpm(u, seen, match_to) # 转换为(row,col)格式 result = [] for v in range(cols): if match_to[v] != -1: result.append((match_to[v], v)) return result4. 完整算法实现与示例
现在我们可以组合所有部分实现完整的匈牙利算法:
def hungarian_algorithm(cost_matrix): # 确保矩阵是方阵 assert cost_matrix.shape[0] == cost_matrix.shape[1] # 步骤1:复制矩阵 mat = cost_matrix.copy() # 步骤2:行归约 for i in range(mat.shape[0]): mat[i] -= np.min(mat[i]) # 步骤3:列归约 for j in range(mat.shape[1]): mat[:,j] -= np.min(mat[:,j]) # 步骤4:迭代调整直到找到完整匹配 while True: independent_zeros = find_independent_zeros(mat) if len(independent_zeros) == mat.shape[0]: break mat = adjust_matrix(mat) return independent_zeros # 示例使用 cost_matrix = np.array([ [6, 7, 11, 2], [4, 5, 9, 8], [3, 1, 10, 4], [5, 9, 8, 2] ]) assignment = hungarian_algorithm(cost_matrix) print("最优分配方案:", assignment) print("总成本:", sum(cost_matrix[i,j] for i,j in assignment))对于给定的示例,算法将输出:
最优分配方案: [(0, 3), (1, 0), (2, 1), (3, 2)] 总成本: 135. 算法优化与实用技巧
实际应用中,我们可以对基础算法进行多项优化:
- 非方阵处理:当任务和人员数量不等时,可以通过添加虚拟行或列(填充足够大的数)来扩展为方阵
def pad_matrix(mat): rows, cols = mat.shape if rows == cols: return mat elif rows > cols: new_cols = rows - cols padding = np.max(mat) * np.ones((rows, new_cols)) return np.hstack((mat, padding)) else: new_rows = cols - rows padding = np.max(mat) * np.ones((new_rows, cols)) return np.vstack((mat, padding))性能优化:使用更高效的匹配算法如Hopcroft-Karp算法来加速独立零元素的查找
大规模问题处理:对于非常大的矩阵,可以考虑近似算法或分布式计算
可视化调试:实现矩阵变换的可视化有助于理解算法过程
def visualize_assignment(mat, assignment): plt.figure(figsize=(8,6)) plt.imshow(mat, cmap='viridis') plt.colorbar() for i,j in assignment: plt.text(j, i, 'X', ha='center', va='center', color='red', fontsize=14) plt.title("Assignment Visualization") plt.xlabel("Tasks") plt.ylabel("Workers") plt.show()匈牙利算法的时间复杂度为O(n³),虽然不如某些现代算法快,但其思路清晰且在小规模问题上表现优异。理解并实现这一经典算法,不仅能解决实际问题,还能加深对组合优化和图论的理解。
