从贝叶斯到渠道归因:手把手教你用Python搞定几个小众但好用的归因模型
从贝叶斯到渠道归因:手把手教你用Python搞定几个小众但好用的归因模型
在数据驱动的营销和产品决策中,归因分析一直是核心难题。当用户从看到广告到最终购买,可能经历了搜索、点击广告、浏览官网、加入购物车等多个触点,如何公平地评估每个渠道的贡献?传统方法如末次点击归因或线性归因虽然简单,但往往忽略了用户旅程中的复杂互动。本文将带你用Python实现三种更精细的归因模型:贝叶斯概率归因、马尔科夫链归因和生存分析归因,并通过真实数据集对比它们与传统方法的差异。
1. 环境准备与数据加载
1.1 安装必要库
我们需要以下Python库来实现高级归因分析:
pip install numpy pandas matplotlib seaborn pymc3 lifelines1.2 加载并探索示例数据集
我们使用一个模拟的多渠道电商转化数据集,包含用户ID、接触渠道、时间戳和最终转化标志:
import pandas as pd # 加载数据集 df = pd.read_csv('customer_journey.csv') print(df.head()) # 数据概览 print(f"数据集包含 {df.shape[0]} 条用户路径记录") print(f"唯一用户数: {df['user_id'].nunique()}") print(f"渠道分布:\n{df['channel'].value_counts()}")典型的数据结构如下表所示:
| user_id | channel | timestamp | conversion |
|---|---|---|---|
| 1001 | paid_search | 2023-01-01 09:15:00 | 0 |
| 1001 | organic_social | 2023-01-03 14:22:00 | 0 |
| 1001 | 2023-01-05 11:05:00 | 1 |
2. 传统归因模型实现
2.1 末次点击归因
这是最简单的归因模型,将所有功劳归于转化前的最后一个渠道:
def last_click_attribution(df): # 获取每个用户最后接触的渠道 last_clicks = df.sort_values('timestamp').groupby('user_id').last() # 只保留转化用户的路径 converted_users = last_clicks[last_clicks['conversion'] == 1] # 计算各渠道的转化贡献 attribution = converted_users['channel'].value_counts(normalize=True) return attribution last_click = last_click_attribution(df) print("末次点击归因结果:\n", last_click)2.2 线性归因
线性归因将转化功劳平均分配给用户路径中的所有渠道:
def linear_attribution(df): # 获取转化用户的完整路径 converted_paths = df[df['user_id'].isin( df[df['conversion'] == 1]['user_id'].unique() )] # 计算每个渠道在转化路径中出现的频率 channel_counts = converted_paths['channel'].value_counts() total = channel_counts.sum() # 归一化为贡献比例 attribution = channel_counts / total return attribution linear = linear_attribution(df) print("线性归因结果:\n", linear)2.3 时间衰减归因
这种模型给予更接近转化时刻的渠道更多权重:
def time_decay_attribution(df, half_life=24*3600): # 半衰期设为24小时 # 计算每个用户路径中各渠道的时间权重 def calculate_weights(group): max_time = group['timestamp'].max() group['weight'] = 0.5**((max_time - group['timestamp']).dt.total_seconds()/half_life) return group weighted_paths = df.groupby('user_id').apply(calculate_weights) # 只保留转化用户的路径 converted = weighted_paths[weighted_paths['conversion'] == 1] # 计算各渠道的加权贡献 attribution = converted.groupby('channel')['weight'].sum() attribution = attribution / attribution.sum() return attribution time_decay = time_decay_attribution(df) print("时间衰减归因结果:\n", time_decay)3. 贝叶斯概率归因模型
3.1 贝叶斯归因原理
贝叶斯归因通过建模各渠道的转化概率分布,考虑先验知识和观测数据来估计渠道贡献。我们使用PyMC3实现:
import pymc3 as pm import numpy as np # 准备数据:计算每个渠道的展示次数和转化次数 channel_stats = df.groupby('channel').agg( impressions=('user_id', 'count'), conversions=('conversion', 'sum') ).reset_index() # 构建贝叶斯模型 with pm.Model() as bayesian_model: # 定义先验分布 alpha_prior = pm.HalfNormal('alpha', sigma=1) beta_prior = pm.HalfNormal('beta', sigma=1) # 各渠道的转化率 theta = pm.Beta('theta', alpha=alpha_prior, beta=beta_prior, shape=len(channel_stats)) # 似然函数 conv = pm.Binomial('conv', n=channel_stats['impressions'].values, p=theta, observed=channel_stats['conversions'].values) # 采样 trace = pm.sample(2000, tune=1000, cores=2) # 分析后验分布 pm.plot_posterior(trace, var_names=['theta'], ref_val=0)3.2 结果解读与应用
贝叶斯模型输出的不是单一数值,而是各渠道转化率的概率分布。我们可以计算各渠道的后验均值和高密度区间:
# 计算各渠道的后验均值 theta_means = trace['theta'].mean(axis=0) channel_stats['bayesian_attribution'] = theta_means / theta_means.sum() print("贝叶斯归因结果:\n", channel_stats[['channel', 'bayesian_attribution']])贝叶斯方法的优势在于:
- 能处理小样本渠道,通过共享先验信息避免极端估计
- 提供不确定性量化,可以计算各渠道贡献的可信区间
- 易于纳入业务先验知识,如已知某些渠道质量通常较高
4. 马尔科夫链归因模型
4.1 马尔科夫归因原理
马尔科夫归因将用户路径视为状态转移过程,通过计算移除某个渠道后整体转化概率的下降程度来评估其贡献。
首先需要构建转移矩阵:
from collections import defaultdict def build_transition_matrix(df): # 初始化转移计数字典 transitions = defaultdict(lambda: defaultdict(int)) total_counts = defaultdict(int) # 遍历所有用户路径 for user, group in df.groupby('user_id'): path = group.sort_values('timestamp')['channel'].tolist() # 添加开始和结束状态 path = ['(start)'] + path + ['(conversion)' if group['conversion'].iloc[-1] else '(null)'] # 统计转移次数 for i in range(len(path)-1): from_state, to_state = path[i], path[i+1] transitions[from_state][to_state] += 1 total_counts[from_state] += 1 # 计算转移概率 transition_matrix = {} for from_state, to_states in transitions.items(): transition_matrix[from_state] = { to_state: count / total_counts[from_state] for to_state, count in to_states.items() } return transition_matrix trans_matrix = build_transition_matrix(df)4.2 计算移除效应
def calculate_removal_effect(trans_matrix): # 计算基准转化率 def get_conversion_prob(matrix): prob = 1.0 current_state = '(start)' while current_state not in ['(conversion)', '(null)']: next_probs = matrix[current_state] if '(conversion)' in next_probs: prob *= next_probs['(conversion)'] / ( next_probs.get('(conversion)', 0) + next_probs.get('(null)', 0) ) break else: # 转移到下一个状态 next_state = max(next_probs.items(), key=lambda x: x[1])[0] prob *= next_probs[next_state] current_state = next_state return prob baseline = get_conversion_prob(trans_matrix) # 计算各渠道的移除效应 removal_effects = {} channels = [c for c in set(df['channel']) if c in trans_matrix] for channel in channels: # 创建移除该渠道后的转移矩阵 modified_matrix = {} for from_state, to_states in trans_matrix.items(): modified_matrix[from_state] = {} total = sum(v for k, v in to_states.items() if k != channel or from_state == channel) for to_state, prob in to_states.items(): if to_state == channel and from_state != channel: continue modified_matrix[from_state][to_state] = prob / total if total > 0 else 0 # 计算移除后的转化率 modified_conv = get_conversion_prob(modified_matrix) removal_effects[channel] = (baseline - modified_conv) / baseline # 归一化为贡献比例 total_effect = sum(removal_effects.values()) attribution = {k: v/total_effect for k, v in removal_effects.items()} return attribution markov_attribution = calculate_removal_effect(trans_matrix) print("马尔科夫归因结果:\n", markov_attribution)5. 模型对比与业务应用
5.1 四种模型结果对比
我们将不同归因模型的结果汇总比较:
attribution_comparison = pd.DataFrame({ 'Last Click': last_click, 'Linear': linear, 'Time Decay': time_decay, 'Bayesian': channel_stats.set_index('channel')['bayesian_attribution'], 'Markov': pd.Series(markov_attribution) }) print(attribution_comparison)典型对比结果可能如下表所示:
| channel | Last Click | Linear | Time Decay | Bayesian | Markov |
|---|---|---|---|---|---|
| paid_search | 0.45 | 0.28 | 0.32 | 0.30 | 0.35 |
| 0.25 | 0.22 | 0.24 | 0.25 | 0.20 | |
| organic_social | 0.15 | 0.20 | 0.18 | 0.18 | 0.22 |
| direct | 0.10 | 0.15 | 0.12 | 0.14 | 0.10 |
| display_ads | 0.05 | 0.15 | 0.14 | 0.13 | 0.13 |
5.2 如何选择适合的归因模型
选择归因模型应考虑以下因素:
数据特征:
- 用户路径长度:路径越长,简单模型偏差越大
- 渠道数量:渠道越多,需要考虑互动效应
- 转化周期:周期越长,时间衰减越重要
业务目标:
- 品牌认知:早期接触渠道更重要
- 转化优化:临近转化渠道更关键
- 全漏斗分析:需要平衡各阶段贡献
资源限制:
- 计算复杂度:贝叶斯和马尔科夫需要更多资源
- 实施成本:简单模型更容易解释和维护
提示:在实际应用中,可以先用简单模型建立基线,再逐步引入复杂模型,比较结果差异并分析原因。
6. 高级技巧与优化方向
6.1 处理归因窗口问题
用户转化可能发生在多天甚至多个月后,我们需要定义合适的归因窗口:
def apply_attribution_window(df, window_days=30): # 计算每个用户首次接触时间 first_touch = df.groupby('user_id')['timestamp'].min().reset_index() first_touch.columns = ['user_id', 'first_touch'] # 合并回原始数据 df = pd.merge(df, first_touch, on='user_id') # 计算时间差 df['days_since_first'] = (df['timestamp'] - df['first_touch']).dt.days # 应用归因窗口 df = df[df['days_since_first'] <= window_days] return df windowed_df = apply_attribution_window(df, window_days=14)6.2 跨设备归因处理
用户可能在不同设备上与品牌互动,需要识别同一用户:
def identify_users(df, fingerprint_cols=['ip_address', 'user_agent']): # 使用设备指纹创建临时用户ID df['temp_user_id'] = df[fingerprint_cols].apply( lambda x: hash(tuple(x)), axis=1) # 可以使用更复杂的算法如概率匹配 return df # 应用用户识别 df = identify_users(df)6.3 增量贡献分析
评估增加或减少某个渠道预算对整体转化的影响:
def incremental_contribution(df, channel, increase_pct=0.1): # 获取该渠道当前的展示次数和转化次数 channel_data = df[df['channel'] == channel] impressions = len(channel_data) conversions = channel_data['conversion'].sum() base_rate = conversions / impressions # 模拟增加展示量后的预期转化 new_impressions = impressions * (1 + increase_pct) expected_conversions = new_impressions * base_rate # 计算增量贡献 increment = expected_conversions - conversions return increment incr = incremental_contribution(df, 'paid_search', 0.1) print(f"增加10%付费搜索预算预计带来 {incr:.1f} 次额外转化")