从‘上大学对收入的影响’说起:用Python和sklearn轻松复现倾向得分匹配(PSM)全流程
用Python实战倾向得分匹配:解密教育回报的因果效应
在数据科学和计量经济学的交叉领域,因果推断一直是个充满挑战的课题。当我们试图回答"上大学真的能提高收入吗?"这类问题时,简单的相关性分析往往会给出误导性的结论。倾向得分匹配(PSM)作为一种准实验方法,正在数据科学社区获得越来越多的关注。本文将带你用Python生态中的工具,从零构建完整的PSM分析流程。
1. 因果推断与PSM基础
想象两位能力相当的高中毕业生:小李选择进入大学深造,小王则直接进入职场。四年后比较他们的收入差异,这才是教育回报的真实度量。但现实中,我们无法同时观测同一个体的两种状态——这就是著名的"反事实问题"。
传统回归分析面临的核心挑战是选择偏差——上大学的人群本身可能具备某些优势(如家庭背景、学习能力)。倾向得分匹配通过构建"统计双胞胎"来解决这个问题:
# 选择偏差的直观示例 import pandas as pd import numpy as np np.random.seed(42) data = pd.DataFrame({ 'ability': np.random.normal(0, 1, 1000), 'family_background': np.random.normal(0, 1, 1000) }) data['college'] = (0.5*data['ability'] + 0.5*data['family_background'] + np.random.normal(0, 0.5, 1000) > 0).astype(int) data['income'] = 3 + 2*data['college'] + 1.5*data['ability'] + np.random.normal(0, 1, 1000) # 简单OLS估计会产生偏差 import statsmodels.formula.api as smf biased_model = smf.ols('income ~ college', data=data).fit() print(biased_model.params['college']) # 通常大于真实值2PSM的核心思想分三个阶段:
- 倾向得分估计:计算每个个体接受处理的概率
- 匹配阶段:为处理组寻找对照组中的"统计相似"个体
- 效应评估:比较匹配后样本的结果差异
注意:PSM只能解决可观测变量的偏差,对不可观测因素(如个人动机)仍需借助工具变量等方法
2. 数据准备与倾向得分估计
我们使用模拟数据来演示完整流程。假设真实数据生成过程中,收入由教育、能力和家庭背景共同决定:
# 生成模拟数据 def generate_data(n=5000): np.random.seed(2023) data = pd.DataFrame({ 'ability': np.random.normal(0, 1, n), 'family_bg': np.random.normal(0, 1, n), 'urban': np.random.binomial(1, 0.4, n) }) # 生成处理变量(是否上大学) propensity = 1 / (1 + np.exp(-(0.8*data['ability'] + 0.6*data['family_bg']))) data['college'] = np.random.binomial(1, propensity, n) # 生成结果变量(收入) data['income'] = (3 + 2.5*data['college'] + 1.8*data['ability'] + 0.7*data['family_bg'] + np.random.normal(0, 1, n)) return data df = generate_data()2.1 倾向得分建模
传统方法使用逻辑回归,但机器学习模型可能表现更好:
from sklearn.linear_model import LogisticRegression from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split # 划分特征和标签 X = df[['ability', 'family_bg', 'urban']] y = df['college'] # 逻辑回归模型 lr = LogisticRegression() lr.fit(X, y) df['ps_lr'] = lr.predict_proba(X)[:, 1] # 随机森林模型 rf = RandomForestClassifier(n_estimators=100) rf.fit(X, y) df['ps_rf'] = rf.predict_proba(X)[:, 1] # 模型评估 from sklearn.metrics import roc_auc_score print(f"LR AUC: {roc_auc_score(y, df['ps_lr']):.3f}") print(f"RF AUC: {roc_auc_score(y, df['ps_rf']):.3f}")模型选择需要考虑:
- 预测精度:AUC、准确率等指标
- 平衡性:匹配后协变量的平衡程度
- 计算效率:大数据集下的运行速度
3. 匹配算法实现与评估
Python中没有现成的PSM包,但我们可以利用scikit-learn的最近邻算法实现:
3.1 K近邻匹配实现
from sklearn.neighbors import NearestNeighbors def psm_knn(treated_df, control_df, ps_col='ps_lr', k=1): """ K近邻匹配实现 """ nbrs = NearestNeighbors(n_neighbors=k, metric='euclidean').fit( control_df[ps_col].values.reshape(-1, 1)) distances, indices = nbrs.kneighbors( treated_df[ps_col].values.reshape(-1, 1)) matched_control = control_df.iloc[indices.flatten()].copy() matched_treated = pd.concat([treated_df]*k, axis=0) return matched_treated, matched_control # 划分处理组和对照组 treated = df[df['college'] == 1] control = df[df['college'] == 0] # 执行1:1匹配 matched_treated, matched_control = psm_knn(treated, control)3.2 匹配质量评估
平衡性检验是PSM成功的关键:
def evaluate_balance(original_treated, original_control, matched_control, covariates): """ 评估匹配前后协变量平衡性 """ balance_df = pd.DataFrame() for covar in covariates: # 匹配前差异 before_diff = original_treated[covar].mean() - original_control[covar].mean() before_std_diff = before_diff / original_treated[covar].std() # 匹配后差异 after_diff = matched_treated[covar].mean() - matched_control[covar].mean() after_std_diff = after_diff / matched_treated[covar].std() balance_df = balance_df.append({ 'covariate': covar, 'before_diff': before_diff, 'before_std_diff': before_std_diff, 'after_diff': after_diff, 'after_std_diff': after_std_diff }, ignore_index=True) return balance_df covariates = ['ability', 'family_bg', 'urban'] balance_results = evaluate_balance(treated, control, matched_control, covariates) print(balance_results)理想情况下,标准化差异应小于5%。可视化检验更直观:
import matplotlib.pyplot as plt plt.figure(figsize=(10, 6)) plt.scatter(balance_results['before_std_diff'], range(len(covariates)), color='red', label='匹配前') plt.scatter(balance_results['after_std_diff'], range(len(covariates)), color='blue', label='匹配后') plt.vlines(0, -1, len(covariates), linestyles='dashed') plt.yticks(range(len(covariates)), covariates) plt.xlabel('标准化差异') plt.legend() plt.title('匹配前后协变量平衡性比较') plt.show()4. 处理效应估计与稳健性检验
4.1 平均处理效应(ATE)估计
匹配完成后,效应估计相对简单:
# 简单差异法 ate = matched_treated['income'].mean() - matched_control['income'].mean() print(f"ATE估计值: {ate:.2f} (真实值为2.5)") # 回归调整法 matched_data = pd.concat([matched_treated, matched_control]) adjusted_model = smf.ols('income ~ college + ability + family_bg', data=matched_data).fit() print(adjusted_model.params['college'])4.2 不同匹配方法比较
为验证结果稳健性,应尝试多种匹配方法:
| 匹配方法 | 实现要点 | 适用场景 | 本例ATE估计 |
|---|---|---|---|
| K近邻匹配 | 寻找倾向得分最近的k个样本 | 小样本精确匹配 | 2.47 |
| 半径匹配 | 限制最大匹配距离 | 避免糟糕匹配 | 2.52 |
| 核匹配 | 使用核函数加权所有样本 | 大数据集 | 2.49 |
| 局部线性回归 | 局部线性回归加权 | 边界效应明显时 | 2.51 |
# 半径匹配实现示例 def psm_radius(treated_df, control_df, ps_col='ps_lr', radius=0.1): """ 半径匹配实现 """ nbrs = NearestNeighbors(radius=radius, metric='euclidean').fit( control_df[ps_col].values.reshape(-1, 1)) distances, indices = nbrs.radius_neighbors( treated_df[ps_col].values.reshape(-1, 1)) matched_control = pd.concat([ control_df.iloc[idx] for idx in indices if len(idx) > 0 ]) matched_treated = pd.concat([ treated_df.iloc[[i]] * len(idx) for i, idx in enumerate(indices) if len(idx) > 0 ]) return matched_treated, matched_control4.3 共同支撑域检查
确保处理组和对照组有足够的重叠区域:
plt.figure(figsize=(10, 6)) plt.hist(treated['ps_lr'], bins=30, alpha=0.5, label='处理组') plt.hist(control['ps_lr'], bins=30, alpha=0.5, label='对照组') plt.xlabel('倾向得分') plt.ylabel('频数') plt.legend() plt.title('共同支撑域检查') plt.show()对于落在共同支撑域外的样本,应该谨慎处理或剔除。在实际项目中,我通常会尝试不同的倾向得分模型和匹配方法,只有当多种方法得到一致结论时,才会对结果有信心。
