超效率SBM模型Python实战:用scipy.optimize处理含非期望产出的政府数据效率排名
超效率SBM模型Python实战:用scipy.optimize处理含非期望产出的政府数据效率排名
在公共管理领域,政府开放数据的效率评估一直是学术界和实务界关注的焦点。传统的DEA模型虽然被广泛应用,但在处理包含非期望产出(如污染排放、能源消耗)的复杂场景时,往往显得力不从心。超效率SBM(Slack-Based Measure)模型作为一种非径向、非角度的效率评估方法,不仅能够有效处理期望产出与非期望产出并存的情况,还能对效率值为1的决策单元进行进一步区分排序,为政策制定者提供更精细的决策依据。
本文将带您深入超效率SBM模型的数学原理和Python实现细节,使用scipy.optimize从零构建完整的求解流程。不同于简单的库调用,我们将重点关注:
- 如何将数学模型精确转化为线性规划问题
- 处理多类型产出数据时的reshape和拼接技巧
- 约束条件矩阵(Aeq)的构建逻辑与边界条件(bounds)设置
- 实际政府数据评估中的常见陷阱与解决方案
1. 超效率SBM模型的核心原理
1.1 从传统DEA到SBM的演进
传统DEA模型(如CCR、BCC)存在两个主要局限:
- 径向性限制:要求所有投入或产出按相同比例增减
- 无法区分有效单元:当多个DMU效率值为1时无法进一步排序
超效率SBM模型通过引入松弛变量解决了这些问题:
# 关键改进对比 traditional_dea = { "径向性": "是", "效率值范围": "[0,1]", "非期望产出处理": "困难" } super_sbm = { "径向性": "否", "效率值范围": "[0,∞)", "非期望产出处理": "原生支持" }1.2 数学建模要点
考虑有n个决策单元(DMU),每个DMU有:
- m种投入:x ∈ R^m
- s1种期望产出:y^g ∈ R^s1
- s2种非期望产出:y^b ∈ R^s2
超效率SBM模型的目标函数为:
min ρ = (1 - (1/m)∑(s_i^-/x_ik)) / (1 + (1/(s1+s2))(∑(s_r^g/y_rk^g) + ∑(s_r^b/y_rk^b)))其中:
- s_i^- 为投入松弛量
- s_r^g 为期望产出松弛量
- s_r^b 为非期望产出松弛量
注意:非期望产出的处理需要特别注意方向性,其松弛量应被视为需要减少的部分
2. 数据准备与预处理
2.1 政府开放数据的典型结构
以16个城市的政府数据为例,常见指标包括:
| 指标类型 | 具体指标示例 |
|---|---|
| 投入指标 | 财政投入、人力投入、IT基础设施 |
| 期望产出 | 数据开放数量、API调用次数、公众满意度 |
| 非期望产出 | 数据泄露事件、响应延迟、能源消耗 |
import pandas as pd import numpy as np # 模拟数据加载 data = pd.read_excel("government_data.xlsx", index_col=0) raw_values = data.values # 数据拆分 x = raw_values[:, 0:2].T # 2个投入指标 y_g = raw_values[:, 2:4].T # 2个期望产出 y_b = raw_values[:, 4:5].T # 1个非期望产出2.2 非期望产出的特殊处理
非期望产出需要反向处理,常见方法包括:
- 取倒数法:1/y_b
- 线性变换:max(y_b) + 1 - y_b
- 方向性距离函数:直接纳入模型约束
# 方法1:取倒数处理 y_b_processed = 1 / y_b # 方法2:线性变换 y_b_max = np.max(y_b) y_b_processed = y_b_max + 1 - y_b提示:实际应用中建议测试不同处理方法对结果的影响
3. scipy.optimize实现详解
3.1 线性规划问题构建
将超效率SBM转化为标准LP问题:
min c^T * v s.t. Aeq * v = beq v >= 0其中决策变量v包含:
- n个λ权重变量
- m个投入松弛量
- s1个期望产出松弛量
- s2个非期望产出松弛量
- 1个ρ效率变量
from scipy import optimize m, n = x.shape s1 = y_g.shape[0] s2 = y_b.shape[0] theta = [] for i in range(n): # 对每个DMU单独求解 # 目标函数系数 c = np.concatenate([ np.zeros(n), # λ -1/(m * x[:, i]), # s_i^- (投入松弛) np.zeros(s1 + s2), # s_r^g, s_r^b (产出松弛) np.array([1]) # ρ ]) # 等式约束矩阵 Aeq1 = np.hstack([ x, # λ*x np.eye(m), # s_i^- np.zeros((m, s1 + s2)), # 产出松弛占位 -x[:, i, None] # -x_k ]) Aeq2 = np.hstack([ y_g, # λ*y^g np.zeros((s1, m)), # 投入松弛占位 -np.eye(s1), # s_r^g np.zeros((s1, s2)), # 非期望产出松弛占位 -y_g[:, i, None] # -y_k^g ]) Aeq3 = np.hstack([ y_b, # λ*y^b np.zeros((s2, m)), # 投入松弛占位 np.zeros((s2, s1)), # 期望产出松弛占位 np.eye(s2), # s_r^b -y_b[:, i, None] # -y_k^b ]) Aeq4 = np.hstack([ np.zeros(n), # λ np.zeros(m), # s_i^- 1/((s1+s2)*y_g[:, i]), # s_r^g系数 1/((s1+s2)*y_b[:, i]), # s_r^b系数 np.array([1]) # ρ ]).reshape(1, -1) Aeq = np.vstack([Aeq1, Aeq2, Aeq3, Aeq4]) beq = np.concatenate([ np.zeros(m + s1 + s2), # 前三个约束 np.array([1]) # 最后一个约束 ]) bounds = [(0, None)] * (n + m + s1 + s2 + 1) result = optimize.linprog(c=c, A_eq=Aeq, b_eq=beq, bounds=bounds) theta.append(result.fun)3.2 关键实现技巧
矩阵拼接技巧:
- 使用
np.hstack进行水平拼接 - 使用
np.vstack进行垂直拼接 - 单列向量需用
[:, None]确保二维结构
- 使用
稀疏矩阵优化: 对于大规模问题,可转换为稀疏矩阵:
from scipy.sparse import csr_matrix # 将Aeq转换为稀疏矩阵 Aeq_sparse = csr_matrix(Aeq) result = optimize.linprog(c=c, A_eq=Aeq_sparse, b_eq=beq, bounds=bounds)- 并行计算加速: 使用
joblib并行化DMU循环:
from joblib import Parallel, delayed def solve_dmu(i): # 构建并求解单个DMU的问题 return result.fun theta = Parallel(n_jobs=4)(delayed(solve_dmu)(i) for i in range(n))4. 结果分析与可视化
4.1 效率排名解读
计算完成后,我们可以得到各DMU的超效率值:
efficiency_results = pd.DataFrame({ 'City': data.index, 'SuperEfficiency': theta }).sort_values('SuperEfficiency', ascending=False)典型结果特征:
- 效率值>1:超高效单元
- 效率值=1:有效但非超高效
- 效率值<1:无效单元
4.2 改进方向分析
对于无效单元,可以从松弛变量找出改进空间:
# 获取最终解的所有变量 full_solution = result.x # 提取关键部分 lambda_weights = full_solution[:n] # λ权重 input_slacks = full_solution[n:n+m] # 投入松弛 good_output_slacks = full_solution[n+m:n+m+s1] # 期望产出松弛 bad_output_slacks = full_solution[n+m+s1:n+m+s1+s2] # 非期望产出松弛改进建议表:
| 改进类型 | 计算方式 | 政策含义 |
|---|---|---|
| 投入过剩 | s_i^- / x_ik | 可减少的投入比例 |
| 期望产出不足 | s_r^g / y_rk^g | 可增加的产出比例 |
| 非期望产出过多 | s_r^b / y_rk^b | 需减少的负面产出 |
4.3 可视化呈现
使用matplotlib绘制效率分布:
import matplotlib.pyplot as plt plt.figure(figsize=(10, 6)) efficiency_results['SuperEfficiency'].plot(kind='barh') plt.axvline(x=1, color='r', linestyle='--') plt.xlabel('Super-efficiency Score') plt.title('Government Data Openness Efficiency Ranking') plt.tight_layout() plt.show()对于地理数据,可使用geopandas绘制空间分布:
import geopandas as gpd # 假设有城市边界shp文件 cities = gpd.read_file('city_boundaries.shp') results_geo = cities.merge(efficiency_results, on='City') fig, ax = plt.subplots(1, 1, figsize=(12, 8)) results_geo.plot(column='SuperEfficiency', legend=True, scheme='quantiles', cmap='RdYlGn', ax=ax) ax.set_title('Spatial Distribution of Efficiency Scores') plt.show()5. 高级应用与扩展
5.1 窗口分析技术
对于面板数据,可以引入窗口分析:
window_size = 3 # 3年窗口 for year in range(len(years) - window_size + 1): window_data = data[(data['Year'] >= years[year]) & (data['Year'] <= years[year + window_size - 1])] # 对每个窗口计算效率5.2 网络SBM模型
当存在多阶段生产过程时,可扩展为网络SBM:
# 阶段1投入产出 x1 = data[:, 0:2].T y1 = data[:, 2:4].T # 阶段2投入产出 x2 = data[:, 4:6].T y2 = data[:, 6:8].T # 构建网络约束矩阵 Aeq_network = np.block([ [Aeq1_stage1, np.zeros_like(Aeq1_stage1)], [np.zeros_like(Aeq1_stage2), Aeq1_stage2], [linkage_constraints] # 阶段间连接约束 ])5.3 敏感性分析
评估指标选择对结果的影响:
sensitivity_results = {} for indicator in optional_indicators: modified_data = data.drop(indicator, axis=1) # 重新计算效率 sensitivity_results[indicator] = calculate_efficiency(modified_data)6. 实际应用中的挑战与解决方案
6.1 数据质量问题
常见问题及处理方法:
| 问题类型 | 检测方法 | 解决方案 |
|---|---|---|
| 零值数据 | describe()查看min | 微小值替换(如1e-6) |
| 极端离群值 | 箱线图分析 | Winsorize处理 |
| 指标负相关 | 相关系数矩阵 | 重新审视指标合理性 |
# Winsorize处理示例 from scipy.stats import mstats winsorized = mstats.winsorize(data, limits=[0.05, 0.05])6.2 模型选择验证
验证SBM模型适用性:
相关性检验:
from scipy.stats import spearmanr # 比较SBM与传统DEA结果 corr, p_value = spearmanr(sbm_scores, dea_scores)超效率必要性检验:
- 检查有效DMU数量占比
- 若>30%则需要超效率模型
6.3 计算效率优化
大规模问题加速策略:
问题分解:
# 按地区分组并行计算 regions = data['Region'].unique() regional_results = {} for region in regions: regional_data = data[data['Region'] == region] regional_results[region] = solve_sbm(regional_data)热启动技巧:
# 使用上一个DMU的解作为初始值 initial_guess = None for i in range(n): result = optimize.linprog(..., x0=initial_guess) initial_guess = result.xGPU加速:
# 使用cupy替代numpy import cupy as cp x_gpu = cp.asarray(x) # ...后续矩阵运算在GPU执行
7. 完整案例:省级政府数据开放评估
以某省16个地市为例,演示端到端分析流程:
数据收集:
- 投入:信息化投资额、专职人员数
- 期望产出:开放数据集数量、API调用量
- 非期望产出:数据安全事件数
预处理脚本:
def preprocess_data(raw_df): # 缺失值处理 df = raw_df.fillna(raw_df.median()) # 非期望产出处理 df['security_incidents'] = 1 / (df['security_incidents'] + 1) # 标准化 return (df - df.mean()) / df.std()效率计算:
# 使用前文实现的SBM求解器 efficiency_scores = solve_sbm(preprocessed_data)结果解读:
- 效率最高城市:A市(1.25)
- 效率最低城市:B市(0.68)
- 改进建议:
- B市应重点减少信息化投资冗余(松弛量23%)
- 提高API开放程度(可增加35%)
政策模拟:
# 模拟投入增加10%的效果 modified_input = data['investment'] * 1.1 new_efficiency = solve_sbm(modified_input, data[outputs])
8. 前沿扩展方向
8.1 结合机器学习方法
效率预测模型:
from sklearn.ensemble import RandomForestRegressor # 使用城市特征预测效率值 X = data[['GDP', 'Population', 'Education']] y = efficiency_scores model = RandomForestRegressor().fit(X, y)异常DMU检测:
from sklearn.svm import OneClassSVM # 检测异常效率模式 clf = OneClassSVM().fit(efficiency_features) anomalies = clf.predict(efficiency_features)
8.2 动态SBM模型
引入Malmquist指数分析效率变化:
def malmquist_index(data_year1, data_year2): eff1 = solve_sbm(data_year1) eff2 = solve_sbm(data_year2) # 计算技术效率变化和技术进步 return mi_results8.3 空间SBM模型
考虑地理空间效应:
# 构建空间权重矩阵 from libpysal.weights import Queen w = Queen.from_dataframe(geo_data) spatial_lag = weights.spatial_lag.lag_spatial(w, efficiency_scores)