用Python手把手实现协同过滤推荐:从UserCF到ItemCF的完整代码与避坑指南
用Python手把手实现协同过滤推荐:从UserCF到ItemCF的完整代码与避坑指南
推荐系统早已渗透进我们数字生活的每个角落——从电商平台的"猜你喜欢"到视频网站的"推荐观看",背后都离不开协同过滤(Collaborative Filtering)这一经典算法。不同于复杂的深度学习模型,协同过滤以其直观的逻辑和可解释性,成为许多企业的首选推荐方案。本文将带你用Python完整实现UserCF和ItemCF算法,重点解决实际编码中的三大难题:相似度矩阵计算效率、数据稀疏性处理,以及如何将数学公式转化为可维护的工程代码。
1. 环境准备与数据加载
在开始编写推荐算法前,我们需要搭建一个可复现的Python环境。推荐使用Anaconda创建独立环境:
conda create -n recommender python=3.8 conda activate recommender pip install numpy pandas scikit-learn协同过滤的核心数据是用户-物品交互矩阵。我们模拟一个电影评分数据集,包含10位用户对50部电影的评分(1-5分),其中90%的评分为空值,模拟真实场景的数据稀疏性:
import numpy as np import pandas as pd from sklearn.model_selection import train_test_split def generate_movie_ratings(num_users=10, num_items=50, sparsity=0.9): np.random.seed(42) ratings = np.random.randint(1, 6, size=(num_users, num_items)) mask = np.random.random(size=(num_users, num_items)) < sparsity ratings[mask] = 0 # 0表示未评分 return pd.DataFrame(ratings, index=[f'User_{i}' for i in range(1, num_users+1)], columns=[f'Movie_{i}' for i in range(1, num_items+1)]) ratings_df = generate_movie_ratings() print(ratings_df.head())关键点注意:
- 实际项目中应该使用
scipy.sparse矩阵存储数据以节省内存 - 保留原始评分和归一化评分两个版本,便于不同相似度计算
- 建议将数据拆分为训练集和测试集,评估推荐质量:
train_df, test_df = train_test_split(ratings_df, test_size=0.2, random_state=42)2. UserCF实现与优化技巧
基于用户的协同过滤(UserCF)遵循"相似用户喜欢相似物品"的基本假设。其核心步骤可分为相似度计算、邻居选取和评分预测三个阶段。
2.1 相似度矩阵的高效计算
余弦相似度和皮尔逊相关系数是两种最常用的相似度度量。我们实现一个支持多种相似度计算的通用函数:
from sklearn.metrics.pairwise import cosine_similarity def calculate_similarity(df, method='cosine'): """计算用户相似度矩阵""" if method == 'cosine': # 填充未评分为用户平均分 filled = df.apply(lambda row: row.replace(0, row[row!=0].mean()), axis=1) return pd.DataFrame(cosine_similarity(filled), index=df.index, columns=df.index) elif method == 'pearson': # 皮尔逊相关系数 return df.T.corr() else: raise ValueError("method must be 'cosine' or 'pearson'") user_sim_matrix = calculate_similarity(train_df, method='pearson')性能优化技巧:
- 对于大型矩阵,使用
numba加速循环计算 - 并行化计算:将矩阵分块后使用
joblib并行处理 - 近似最近邻(ANN)算法如
faiss加速相似用户查找
2.2 评分预测与推荐生成
选取Top-K相似用户后,加权聚合他们的评分作为预测:
def predict_rating(user_sim, ratings, target_user, target_item, k=5): """预测目标用户对目标物品的评分""" if target_item in ratings.loc[target_user][ratings.loc[target_user] > 0].index: return ratings.loc[target_user, target_item] # 已评分则返回原值 # 获取相似用户 sim_users = user_sim[target_user].sort_values(ascending=False)[1:k+1] # 排除自己 # 计算加权平均 weighted_sum = 0 sim_sum = 0 mean_rating = ratings.loc[target_user][ratings.loc[target_user] > 0].mean() for user, sim in sim_users.items(): if ratings.loc[user, target_item] > 0: # 相似用户已评价该物品 user_mean = ratings.loc[user][ratings.loc[user] > 0].mean() weighted_sum += sim * (ratings.loc[user, target_item] - user_mean) sim_sum += sim if sim_sum == 0: return mean_rating # 无相似用户评价过该物品 return mean_rating + weighted_sum / sim_sum # 示例:预测User_1对Movie_25的评分 pred = predict_rating(user_sim_matrix, train_df, 'User_1', 'Movie_25') print(f"预测评分: {pred:.2f}")常见陷阱:
- 未处理冷启动用户(相似用户数为0的情况)
- 相似度计算时未考虑共同评分项数量
- 未对预测评分进行截断(应限制在评分范围内)
3. ItemCF实现与工程实践
基于物品的协同过滤(ItemCF)在实际系统中更为常用,因为它更稳定且可预先计算物品相似度。其核心思想是"用户喜欢与其历史物品相似的物品"。
3.1 物品相似度计算优化
物品相似度矩阵的计算是ItemCF的关键步骤,需要注意以下几点:
def item_similarity(ratings, method='adjusted_cosine'): """计算物品相似度矩阵""" if method == 'adjusted_cosine': # 调整余弦相似度:减去用户平均分 user_means = ratings.apply(lambda row: row[row!=0].mean(), axis=1) centered = ratings.apply(lambda row: row - user_means[row.name], axis=0) centered[ratings == 0] = 0 # 保持未评分为0 sim = cosine_similarity(centered.T) np.fill_diagonal(sim, 0) # 对角线置零 return pd.DataFrame(sim, index=ratings.columns, columns=ratings.columns) elif method == 'conditional_probability': # 基于条件概率的相似度(适合隐式反馈) cooccur = ratings.T @ ratings diag = np.diag(cooccur) return cooccur / (diag[:, None] + diag[None, :] - cooccur) else: raise ValueError("Unsupported similarity method") item_sim_matrix = item_similarity(train_df)工程实践建议:
- 物品相似度矩阵可离线计算并缓存
- 对长尾物品进行降权处理(如
log(1+count)) - 引入时间衰减因子,让近期交互物品获得更高权重
3.2 生成个性化推荐列表
结合用户历史行为和物品相似度生成推荐:
def generate_recommendations(user, ratings, item_sim, k=5, n_recommend=10): """为用户生成Top-N推荐""" rated_items = ratings.loc[user][ratings.loc[user] > 0].index if len(rated_items) == 0: return [] # 冷启动用户 # 聚合相似物品 recommendation_scores = {} for item in rated_items: similar_items = item_sim[item].sort_values(ascending=False)[:k] for similar_item, score in similar_items.items(): if similar_item not in rated_items: recommendation_scores[similar_item] = recommendation_scores.get(similar_item, 0) + score # 返回Top-N推荐 return sorted(recommendation_scores.items(), key=lambda x: x[1], reverse=True)[:n_recommend] # 为User_1生成推荐 recs = generate_recommendations('User_1', train_df, item_sim_matrix) print("推荐电影及相似度分数:", recs)关键改进点:
- 引入多样性控制:避免推荐过于相似的物品
- 考虑物品流行度偏差:对热门物品适当降权
- 实时更新:将用户最近交互快速纳入推荐计算
4. 评估与调优策略
推荐系统的评估需要从预测准确度、排序质量和业务指标三个维度考量。
4.1 离线评估指标实现
from sklearn.metrics import mean_squared_error def evaluate(ratings_true, ratings_pred): """评估预测准确度""" mask = ratings_true > 0 # 只评估有真实评分的项 rmse = np.sqrt(mean_squared_error(ratings_true[mask], ratings_pred[mask])) mae = np.mean(np.abs(ratings_true[mask] - ratings_pred[mask])) return {'RMSE': rmse, 'MAE': mae} # 在测试集上评估UserCF test_users = test_df.index test_items = test_df.columns user_cf_pred = np.zeros_like(test_df) for i, user in enumerate(test_users): for j, item in enumerate(test_items): user_cf_pred[i, j] = predict_rating(user_sim_matrix, train_df, user, item) metrics = evaluate(test_df.values, user_cf_pred) print(f"UserCF评估结果: {metrics}")4.2 线上AB测试关键指标
虽然离线指标重要,但真实效果还需线上验证:
| 指标类型 | 具体指标 | 测量方法 |
|---|---|---|
| 用户参与度 | CTR、观看时长、转化率 | 实验组vs对照组的均值比较 |
| 系统多样性 | 推荐覆盖率、新颖性 | 统计推荐物品的分布情况 |
| 商业目标 | GMV、客单价、留存率 | 漏斗分析和留存曲线对比 |
4.3 超参数调优策略
协同过滤中有几个关键超参数需要优化:
相似度计算方式选择:
- 余弦相似度:计算简单但对评分尺度敏感
- 皮尔逊系数:消除用户评分偏差
- 改进余弦相似度:考虑共同评分项数量
邻居数量K:
- 太小会导致推荐不稳定
- 太大会引入噪声
- 建议网格搜索:
[5, 10, 20, 50]
相似度阈值:
- 过滤低相似度的用户/物品
- 提高推荐相关性但可能减少覆盖
from itertools import product param_grid = { 'similarity': ['cosine', 'pearson'], 'k': [3, 5, 10], 'min_sim': [0, 0.2, 0.4] } best_score = float('inf') best_params = {} for params in product(*param_grid.values()): current_params = dict(zip(param_grid.keys(), params)) # 交叉验证评估... if score < best_score: best_score = score best_params = current_params5. 生产环境部署建议
将协同过滤模型投入生产需要考虑实时性、可扩展性和容错性。
5.1 系统架构设计
[实时日志] → [流处理] → [特征存储] ↓ [离线训练] → [模型存储] → [在线服务] → [API] ↑ [用户画像] ← [数据仓库]关键组件:
- 特征存储:保存用户/物品嵌入向量
- 模型服务:低延迟的相似度查询
- 缓存层:Redis缓存热门推荐结果
5.2 性能优化方案
当用户量超过百万级别时,需要特殊处理:
矩阵分解降维:
from sklearn.decomposition import TruncatedSVD svd = TruncatedSVD(n_components=100) user_embeddings = svd.fit_transform(ratings_df)近似最近邻搜索:
from annoy import AnnoyIndex index = AnnoyIndex(100, 'angular') # 100维向量 for i, vec in enumerate(user_embeddings): index.add_item(i, vec) index.build(10) # 10棵树分布式计算:
- 使用Spark MLlib的
ALS实现 - 相似度矩阵分块计算
- 使用Spark MLlib的
5.3 冷启动解决方案
对于新用户和新物品的推荐问题:
混合推荐策略:
- 新用户:基于人口统计学的推荐
- 新物品:基于内容相似度的推荐
探索-利用机制:
- Thompson Sampling
- Epsilon-Greedy
迁移学习:
from gensim.models import Word2Vec # 将用户行为序列视为句子 sequences = [[item for item in row[row>0].index] for _, row in ratings_df.iterrows()] model = Word2Vec(sequences, vector_size=64, window=5)
在实际项目中,协同过滤往往作为基础推荐模块,与深度学习模型(如NeuralCF、Two-Tower模型)结合使用。但无论如何组合,理解这些基础算法的实现细节和适用场景,都是构建高质量推荐系统的关键第一步。
