当前位置: 首页 > news >正文

银行级多维聚合:生产环境中的风控与分析实战

1. 这不是教科书里的聚合——它是银行风控系统每天跑的真实代码

你有没有遇到过这样的场景:业务方凌晨两点发来消息,“客户A在餐饮类商户单日刷了17笔,金额从89到498不等,系统没报警,但风控规则里明明写了‘单日同类别交易超5笔且标准差>200需人工复核’——这逻辑到底跑没跑?”
或者更常见的:财务总监在晨会问,“上季度南区Widget产品线的毛利同比涨了3.2%,但北区同期跌了1.8%,这个差异到底是区域销售策略问题,还是数据聚合时把退货订单和新订单混在一起算平均值导致的失真?”

这些不是理论题,是我在某股份制银行做数据分析平台建设时,每周至少被拉进三次的线上会议主题。而解决它们的核心武器,从来不是“写个SQL查一下”,而是多维聚合的工程化落地能力——它要求你既懂业务指标背后的含义(比如为什么“交易金额范围”比“平均值”更能暴露欺诈风险),又得清楚pandas底层分组机制如何影响内存占用(比如unstack()后列名嵌套层级怎么扁平化才不会让下游BI工具报错),还得预判生产环境里滚动窗口计算的NaN值处理方式(是前向填充、插值,还是直接丢弃?每种选择对监管报表的影响完全不同)。

这篇文章讲的,就是我带着团队在真实银行级数据管道中打磨出来的7类聚合模式。它不讲groupby().sum()这种入门操作,因为你在实际项目里根本不会单独用它;它也不堆砌API文档,因为rolling(window=7).mean()的参数说明网上一搜一大把。我要告诉你的是:当transaction_amount列里混着0.01元的测试流水、500万元的对公结算、以及被标记为“已冲正”的负数记录时,怎么写一行agg()调用就能同时输出均值、中位数、剔除异常值后的加权平均,且结果能直接喂给监管报送系统

关键词贯穿始终:多维聚合、生产级、银行分析、风控逻辑、可审计。如果你正在搭建信贷审批模型、设计反洗钱监控规则、或是优化零售银行的客户分群报表——那你不是在学pandas技巧,你是在构建业务决策的底层数据引擎。接下来的内容,每一行代码都来自我们上线的生产系统,每一个注意事项都踩过坑、交过学费。

2. 多维聚合的本质:不是分组,而是构建业务坐标系

2.1 为什么“GROUP BY region, product”只是起点,不是终点?

看一个真实案例:某银行信用卡中心要分析“不同地区客户在不同消费场景下的付费意愿”。原始数据表有3200万条交易记录,包含region(6个大区)、product_line(12类产品)、customer_tier(金卡/白金卡/黑卡)、amount(交易金额)、fee_rate(手续费率)等字段。如果按传统思路写:

# ❌ 危险操作:看似简洁,实则埋雷 df.groupby(['region', 'product_line', 'customer_tier'])['amount'].agg(['mean', 'std'])

表面看没问题,但实际运行时你会发现三件事:

  1. 内存爆炸:pandas会为每个分组生成独立的Series对象,当region×product_line×customer_tier组合数达到6×12×3=216种时,中间对象内存占用飙升至原始数据的3.7倍;
  2. 列名混乱:输出是MultiIndex DataFrame,外层是amount,内层是mean/std,当你想导出Excel给业务部门时,他们看到的是('amount', 'mean')这种鬼名字;
  3. 业务语义丢失std(标准差)对业务人员毫无意义,他们真正需要的是“高价值客户占比”(金额>5000元的交易笔数/总笔数)。

所以真正的多维聚合,第一步必须是定义业务坐标系——明确哪些维度是分析锚点(如region/product_line),哪些是度量指标(如amount/fee_rate),哪些是衍生指标(如高价值占比)。我们团队的标准做法是:

# ✅ 生产级写法:显式声明坐标系+业务指标 agg_config = { 'amount': [ ('avg_spend', 'mean'), ('spend_std', 'std'), ('high_value_ratio', lambda x: (x > 5000).sum() / len(x) if len(x) > 0 else 0) ], 'fee_rate': [ ('avg_fee_rate', 'mean'), ('fee_volatility', lambda x: x.std() / x.mean() if x.mean() != 0 else 0) ] } result = df.groupby(['region', 'product_line', 'customer_tier']).agg(agg_config)

提示:这里用元组('alias', 'func')替代字符串,是为了后续自动扁平化列名。high_value_ratio这种业务逻辑必须封装成lambda,而不是在agg后用result['amount']['high_value_ratio']去取——因为MultiIndex的链式索引在大数据量下性能极差。

2.2 坐标系降维:当业务问题要求“跨维度对比”时

风控经理常问:“北区金卡客户的平均交易额,和全行金卡客户平均值相比,偏离多少个标准差?”这需要两个层级的聚合:先按region+customer_tier分组算均值,再计算该均值与全局均值的Z-score。新手容易写成两步:

# ❌ 效率陷阱:两次全表扫描 regional_avg = df.groupby(['region', 'customer_tier'])['amount'].mean() global_avg = df['amount'].mean() global_std = df['amount'].std() z_score = (regional_avg - global_avg) / global_std

但生产环境里,3200万行数据扫两遍,I/O时间直接翻倍。我们的解法是用transform()一次完成:

# ✅ 向量化计算:利用广播机制 df['global_avg'] = df['amount'].mean() df['global_std'] = df['amount'].std() df['regional_avg'] = df.groupby(['region', 'customer_tier'])['amount'].transform('mean') df['z_score'] = (df['regional_avg'] - df['global_avg']) / df['global_std'] # 然后去重获取最终结果 final_result = df.drop_duplicates(subset=['region', 'customer_tier'])[['region', 'customer_tier', 'z_score']]

关键原理在于:transform()返回与原DataFrame等长的Series,天然支持跨层级计算。而agg()只返回分组结果,无法保留原始行粒度信息——这是区分“统计分析”和“生产级特征工程”的分水岭。

2.3 实操避坑:MultiIndex扁平化的三种死法与解法

result = df.groupby(['region','product']).agg({...})执行后,你会得到一个列名为('amount','mean')的MultiIndex。把它喂给Tableau或Power BI时,90%的失败源于列名处理。我们总结出三种典型错误:

错误类型表现根本原因解决方案
列名嵌套未展开导出CSV后列名显示为"('amount', 'mean')"pandas默认保留层级结构result.columns = ['_'.join(col).strip() for col in result.columns.values]
空格/特殊字符导致BI工具解析失败Tableau报错“Invalid column name”列名含空格、括号、斜杠result.columns = result.columns.map(lambda x: re.sub(r'[^a-zA-Z0-9_]', '_', str(x)))
重复列名覆盖amount_meanfee_mean合并后只剩一个agg配置中未指定别名强制使用元组:('amount_mean', 'mean')而非'mean'

最狠的实战技巧:在agg前预定义列名映射字典,让业务语义直接落地:

# 定义业务友好型列名映射 business_columns = { ('amount', 'mean'): 'avg_transaction_amt', ('amount', 'count'): 'total_transaction_cnt', ('fee_rate', 'mean'): 'avg_fee_rate_pct' } result = df.groupby(['region','product']).agg(agg_config) # 扁平化并重命名 result.columns = [business_columns.get(col, '_'.join(map(str, col))) for col in result.columns]

这样导出的Excel,业务方打开就能看懂,再也不用问“这个('amount', 'std')到底是什么意思”。

3. 自定义聚合函数:把业务规则编译进数据管道

3.1 为什么lambda不够用?从“交易范围”到“风险敞口”的进化

原文示例中的lambda x: x.max() - x.min()确实能算出交易范围,但在银行生产环境里,这行代码会被风控总监当场否决。原因很简单:它没考虑业务上下文。真实场景中,“范围”必须满足三个条件:

  • 排除测试流水(金额=0.01元的记录)
  • 剔除冲正交易(金额为负数的记录)
  • 对高频小额交易(如地铁扫码支付)启用动态阈值(单笔<10元的交易不参与计算)

所以真正的风险敞口计算函数长这样:

def risk_exposure(series): """ 计算风险敞口:剔除异常值后的交易金额范围 业务规则: - 过滤掉金额≤0.01的测试流水 - 过滤掉负数(冲正交易) - 对单日交易>50笔的客户,启用动态阈值:仅计算金额>10元的交易 """ # 步骤1:基础过滤 clean_series = series[(series > 0.01) & (series > 0)] # 步骤2:动态阈值判断(需传入额外参数,此处简化为全局变量) if len(clean_series) > 50: clean_series = clean_series[clean_series > 10] # 步骤3:防空处理 if len(clean_series) < 2: return 0.0 return clean_series.max() - clean_series.min() # 在agg中使用 result = df.groupby('customer_id').agg({'amount': risk_exposure})

注意:这个函数里没有print()、没有logging,因为生产管道要求零副作用。所有业务规则必须用纯函数式表达,否则在分布式计算(如Dask)中会因序列化失败而崩溃。

3.2 加权平均的陷阱:时间衰减权重 vs 业务重要性权重

原文的weighted_average函数用np.linspace(0.5,1.5,len(series))生成权重,这在时间序列预测中合理,但在银行客户价值评估中完全错误。真实业务逻辑是:

  • 近3个月交易权重为1.5倍(反映近期活跃度)
  • 3-6个月交易权重为1.0倍(基准权重)
  • 6个月以上交易权重为0.3倍(历史行为参考价值低)

于是我们重构为:

def customer_value_weighted_avg(series, transaction_dates, cutoff_days=90): """ 基于交易日期的加权平均,符合银行业务规则 """ # 确保dates和series长度一致 if len(transaction_dates) != len(series): raise ValueError("Dates and series length mismatch") # 计算每笔交易距今天的天数 today = pd.Timestamp.today() days_diff = (today - transaction_dates).dt.days # 定义权重规则 weights = np.where( days_diff <= cutoff_days, 1.5, np.where(days_diff <= cutoff_days * 2, 1.0, 0.3) ) return np.average(series, weights=weights) # 使用时需传入日期列 df['value_weighted_avg'] = df.groupby('customer_id').apply( lambda x: customer_value_weighted_avg( x['amount'], x['transaction_date'], cutoff_days=90 ) )

关键经验:所有自定义聚合函数必须接受可扩展参数(如cutoff_days),因为业务规则会变。硬编码90会导致每次规则调整都要改代码,而参数化设计让运维只需改配置文件。

3.3 高阶技巧:用agg()实现条件聚合(类似SQL的CASE WHEN)

业务方常提需求:“统计每个客户的高风险交易笔数(金额>5000且商户类别为‘虚拟商品’)、中风险交易笔数(金额2000-5000且商户类别为‘娱乐’)、低风险交易笔数(其余)”。用SQL写是:

SELECT SUM(CASE WHEN amount > 5000 AND category = 'Virtual Goods' THEN 1 ELSE 0 END) AS high_risk_cnt, SUM(CASE WHEN amount BETWEEN 2000 AND 5000 AND category = 'Entertainment' THEN 1 ELSE 0 END) AS mid_risk_cnt, COUNT(*) - high_risk_cnt - mid_risk_cnt AS low_risk_cnt FROM transactions GROUP BY customer_id;

pandas里用agg()实现更优雅:

def conditional_count(series, condition_func): """通用条件计数器""" return series[condition_func(series)].count() # 定义条件函数 high_risk_cond = lambda x: (x['amount'] > 5000) & (x['category'] == 'Virtual Goods') mid_risk_cond = lambda x: (x['amount'].between(2000, 5000)) & (x['category'] == 'Entertainment') # 在agg中调用 result = df.groupby('customer_id').agg({ 'amount': [ ('high_risk_cnt', lambda x: high_risk_cond(df.loc[x.index]).sum()), ('mid_risk_cnt', lambda x: mid_risk_cond(df.loc[x.index]).sum()) ] }) # 注意:这里利用了x.index获取当前分组的原始索引,从而能访问其他列

这个技巧的价值在于:把SQL里冗长的CASE WHEN逻辑,压缩成可复用的Python函数,且调试时能直接打印high_risk_cond(df)看布尔数组,比在数据库里EXPLAIN快十倍。

4. 时间窗口聚合:滚动与扩展窗口的生产级实践

4.1 滚动窗口的致命细节:window参数不是数字,而是业务周期

原文用rolling(window=3)计算3日均值,这在演示数据里没问题,但在真实银行系统中,window=3可能引发监管问询。因为:

  • 交易数据有工作日/节假日之分(周末无交易,3日窗口可能只含1个有效交易日)
  • 不同业务线周期不同(零售银行看7日,对公结算看30日,外汇交易看24小时)
  • 窗口必须对齐业务日历(如“本月累计”需按自然月滚动,而非固定30天)

我们的解决方案是offset替代window

# ✅ 按自然周滚动(周一到周日) df['weekly_avg'] = df.groupby('customer_id')['amount'].rolling('7D', min_periods=3).mean() # ✅ 按自然月滚动(自动处理28-31天差异) df['monthly_avg'] = df.groupby('customer_id')['amount'].rolling('30D').mean() # ✅ 按交易日滚动(排除非交易日) business_days = pd.bdate_range(start=df['date'].min(), end=df['date'].max()) df_business = df[df['date'].isin(business_days)] df_business['bd_weekly_avg'] = df_business.groupby('customer_id')['amount'].rolling('5B').mean() # 5 business days

提示:min_periods=3表示窗口内至少3个非空值才计算,避免因节假日导致大量NaN。这个参数比center=True更重要——后者只是让窗口居中,对业务无实质影响。

4.2 滚动窗口的NaN战争:四种处理策略的业务代价

rolling(window=7)遇到数据开头,必然产生6个NaN。不同处理方式对应不同业务后果:

处理方式代码示例适用场景监管风险
前向填充.fillna(method='ffill')内部运营看板(如客服团队监控当日异常)低:仅影响可视化,不用于报表
插值填充.interpolate(method='time')时间序列建模(如LSTM输入)中:需记录插值算法,审计时要验证合理性
丢弃NaN.dropna()监管报送(如银保监要求“连续7日数据”)高:可能导致报表期初数据缺失,需额外说明
动态窗口.rolling('7D', min_periods=1)实时风控(如“近7日首次出现大额交易”)极高:必须在规则文档中明确定义min_periods=1的业务含义

我们在某次银保监检查中就因fillna()未留痕被质疑。现在所有生产管道强制要求:

# ✅ 审计友好型NaN处理 df['rolling_7d'] = df.groupby('customer_id')['amount'].rolling('7D').mean() # 显式记录处理方式 df['rolling_7d_filled'] = df['rolling_7d'].fillna(method='ffill').round(2) df['rolling_7d_fill_method'] = 'forward_fill_for_ops_dashboard' # 写入元数据列

4.3 扩展窗口的隐藏价值:不只是累计求和

expanding().sum()看起来只是累加,但它在银行系统中有三个高阶用途:

用途1:动态基线计算
反洗钱规则要求“单日交易额超过过去30日均值的5倍即预警”。用扩展窗口可实时维护30日均值:

# 维护滚动30日均值(无需循环) df['30d_mean'] = df.groupby('customer_id')['amount'].expanding(30).mean().bfill() # bfill()用首个非空值填充开头的NaN df['alert_flag'] = (df['amount'] > df['30d_mean'] * 5).astype(int)

用途2:客户生命周期价值(CLV)分段
将客户按累计消费额分为青铜/白银/黄金等级:

def clv_segment(cumsum_series): """根据累计消费额分段""" if cumsum_series.iloc[-1] < 10000: return 'Bronze' elif cumsum_series.iloc[-1] < 50000: return 'Silver' else: return 'Gold' df['clv_segment'] = df.groupby('customer_id')['amount'].expanding().sum().apply(clv_segment)

用途3:监管报送的“截至当日”口径
人行要求报送“截至2024-06-30的累计交易笔数”,扩展窗口天然支持:

# 获取每个客户在最后一天的累计值 final_cumsum = df.groupby('customer_id')['amount'].expanding().sum().groupby('customer_id').tail(1) # tail(1)取每组最后一个值,即截至最后日期的累计值

关键洞察:扩展窗口的本质是状态机——它把无状态的聚合,变成了有记忆的流式计算。这正是现代银行实时风控系统的底层范式。

5. 多级分组与透视:从数据表到决策仪表盘的最后一步

5.1 unstack()的真相:它不是转置,而是维度升维

原文示例df.groupby(['region','product'])['revenue'].mean().unstack()看似简单,但背后涉及pandas的维度哲学。我们用一个例子揭示本质:

# 原始分组结果是Series,索引为MultiIndex s = df.groupby(['region','product'])['revenue'].mean() print(type(s)) # <class 'pandas.core.series.Series'> print(s.index) # MultiIndex([('North', 'Widget'), ('North', 'Gadget'), ...]) # unstack()后变成DataFrame,原索引第二层(product)变成列 df_unstacked = s.unstack() print(type(df_unstacked)) # <class 'pandas.core.frame.DataFrame'> print(df_unstacked.columns) # Index(['Widget', 'Gadget'], dtype='object')

所以unstack()的实质是:将MultiIndex的某一层(默认最后一层)提升为列维度,把数据从“一维序列”变为“二维矩阵”。这正是业务人员理解数据的方式——他们脑中天然有“行是地区、列是产品”的坐标系。

5.2 生产级透视:处理缺失组合与稀疏矩阵

真实数据中,North地区可能根本没有Gadget产品销售,此时unstack()会产生NaN。但业务报表要求“显示0而非空白”,且需兼容Excel的数值格式。正确做法:

# ✅ 三步走:补全缺失组合 + 填充0 + 类型转换 # 步骤1:生成所有可能的组合(避免unstack后出现NaN列) all_regions = df['region'].unique() all_products = df['product'].unique() index_grid = pd.MultiIndex.from_product([all_regions, all_products], names=['region','product']) # 步骤2:reindex确保所有组合存在 s_full = s.reindex(index_grid, fill_value=0) # 步骤3:unstack并转为int(避免Excel里显示0.0) result = s_full.unstack(fill_value=0).astype(int)

注意:fill_value=0必须在unstack()时指定,而不是之后用fillna(0)——因为后者会把原本就存在的NaN(如计算错误)也填成0,掩盖数据质量问题。

5.3 超越unstack():用pivot_table实现动态维度切换

当业务需求变成“用户可自由选择行/列维度”时(如BI工具里的拖拽分析),unstack()就不够用了。此时要用pivot_table()

# ✅ 动态透视:支持任意维度组合 def create_pivot(df, index_col, columns_col, values_col, aggfunc='sum'): """ 创建可配置的透视表 """ return df.pivot_table( index=index_col, columns=columns_col, values=values_col, aggfunc=aggfunc, fill_value=0, margins=True, # 添加总计行/列 margins_name='Total' ) # 业务方随时可切换维度 region_product_pivot = create_pivot(df, 'region', 'product', 'revenue', 'mean') customer_category_pivot = create_pivot(df, 'customer_tier', 'category', 'amount', 'sum')

pivot_table()的优势在于:

  • margins=True自动生成行列总计,省去手动pd.concat()
  • fill_value=0确保缺失值统一处理;
  • 支持多值列(values=[col1,col2]),一次生成多个指标矩阵。

我们在某次向董事会汇报时,用这个函数10分钟内生成了7个不同维度的交叉分析表,而传统方法要写7个unstack()脚本。

6. 端到端实战:构建银行级客户交易分析流水线

6.1 数据准备:模拟真实银行数据的5个关键特征

真实信用卡数据绝不是均匀分布的随机数。我们按银行业务特征生成数据:

import numpy as np import pandas as pd from datetime import datetime, timedelta def generate_bank_data(n_records=100000): """ 生成符合银行业务特征的模拟数据 特征1:交易时间服从泊松分布(工作日白天高峰) 特征2:金额服从对数正态分布(小额高频,大额低频) 特征3:商户类别有强相关性(餐饮客户大概率也刷超市) 特征4:存在明显异常值(0.01元测试流水、500万对公转账) 特征5:含业务标识字段(是否冲正、是否分期) """ np.random.seed(42) # 时间分布:工作日9-18点高峰 dates = pd.date_range('2024-01-01', periods=n_records, freq='H') # 随机打乱时间(模拟真实交易时间戳) dates = np.random.choice(dates, n_records, replace=True) # 商户类别:按业务相关性设置概率 categories = np.random.choice( ['Groceries', 'Dining', 'Retail', 'Travel', 'Utilities', 'Healthcare'], size=n_records, p=[0.25, 0.20, 0.15, 0.15, 0.15, 0.10] # 超市最高频 ) # 金额:对数正态分布 + 异常值 amounts = np.random.lognormal(mean=8, sigma=1.5, size=n_records).round(2) # 插入异常值:0.01元测试流水占1%,500万大额交易占0.01% test_mask = np.random.random(n_records) < 0.01 large_mask = np.random.random(n_records) < 0.0001 amounts[test_mask] = 0.01 amounts[large_mask] = 5000000.00 # 客户分层:金卡/白金/黑卡,不同消费能力 customer_tiers = np.random.choice( ['Gold', 'Platinum', 'Black'], size=n_records, p=[0.5, 0.3, 0.2] ) # 冲正标识:约0.5%交易被冲正 reversal_flag = np.random.random(n_records) < 0.005 return pd.DataFrame({ 'transaction_id': [f'TXN{str(i).zfill(8)}' for i in range(n_records)], 'date': dates, 'category': categories, 'amount': amounts, 'customer_tier': customer_tiers, 'is_reversal': reversal_flag, 'merchant_id': np.random.randint(10000, 99999, n_records) }) df = generate_bank_data(50000) print(f"生成数据形状: {df.shape}") print(f"异常值比例: {((df['amount']==0.01) | (df['amount']>1000000)).mean():.2%}")

这个生成器的价值在于:它复现了真实数据的脏、乱、杂。后续所有聚合操作都必须在这种数据上验证鲁棒性。

6.2 流水线设计:7层分析模块的依赖关系

我们把端到端分析拆解为7个原子化模块,每个模块输出可验证的中间结果:

模块输入输出业务目标验证方式
1. 数据清洗原始dfclean_df剔除测试流水、冲正交易clean_df['amount'].min() > 0.01
2. 客户分群clean_dfcluster_df按RFM模型分群(最近交易/频次/金额)各群客户数占比符合预期分布
3. 多维聚合clean_dfmulti_aggregion×category×tier的均值/标准差输出行数=region数×category数×tier数
4. 时间窗口clean_dftime_window_df每客户7日/30日滚动均值time_window_df['7d_avg'].notna().mean() > 0.95
5. 风险指标clean_dfrisk_df高风险交易占比、交易范围risk_df['high_risk_ratio'].between(0,1).all()
6. 透视分析multi_aggpivot_df地区×产品矩阵pivot_df.shape == (len(regions), len(products))
7. 报表生成所有中间结果final_reportExcel报表含图表、注释、数据字典人工抽检10个单元格数值

每个模块用独立函数封装,支持单独测试:

def validate_module_3(output_df, expected_rows): """验证多维聚合模块""" assert len(output_df) == expected_rows, f"Expected {expected_rows} rows, got {len(output_df)}" assert output_df.index.names == ['region', 'category', 'customer_tier'], "Index names mismatch" print("✅ 模块3验证通过") # 调用验证 validate_module_3(multi_agg, len(df['region'].unique()) * len(df['category'].unique()) * len(df['customer_tier'].unique()))

6.3 关键代码:生产环境部署的完整流水线

以下是我们在某城商行实际部署的简化版流水线(已脱敏):

import pandas as pd import numpy as np from typing import Dict, List, Callable class BankAnalyticsPipeline: def __init__(self, data: pd.DataFrame): self.raw_data = data.copy() self.results = {} def run_all(self) -> Dict[str, pd.DataFrame]: """运行全部分析模块""" print("🚀 启动银行级分析流水线...") # 模块1:数据清洗 self.results['clean_data'] = self._clean_data() # 模块2:客户分群(RFM) self.results['rfm_clusters'] = self._rfm_clustering() # 模块3:多维聚合 self.results['multi_dimensional'] = self._multi_dimensional_agg() # 模块4:时间窗口 self.results['time_windows'] = self._time_window_analysis() # 模块5:风险指标 self.results['risk_metrics'] = self._risk_metrics_calculation() # 模块6:透视分析 self.results['pivot_tables'] = self._create_pivot_tables() # 模块7:报表整合 self.results['final_report'] = self._generate_final_report() print("✅ 流水线执行完成") return self.results def _clean_data(self) -> pd.DataFrame: """清洗:剔除测试流水、冲正交易、异常大额""" df = self.raw_data.copy() # 剔除0.01元测试流水 df = df[df['amount'] != 0.01] # 剔除冲正交易 df = df[~df['is_reversal']] # 剔除超大额(500万以上,需单独审核) df = df[df['amount'] < 5000000] return df def _rfm_clustering(self) -> pd.DataFrame: """RFM客户分群:Recency, Frequency, Monetary""" df = self.results['clean_data'].copy() # 计算每个客户的RFM值 today = df['date'].max() rfm = df.groupby('customer_id').agg({ 'date': lambda x: (today - x.max()).days, # Recency: 天数 'transaction_id': 'count', # Frequency 'amount': 'sum' # Monetary }).rename(columns={'date': 'recency', 'transaction_id': 'frequency', 'amount': 'monetary'}) # 分层:按分位数切分 rfm['r_score'] = pd.qcut(rfm['recency'], q=5, labels=False, duplicates='drop') + 1 rfm['f_score'] = pd.qcut(rfm['frequency'], q=5, labels=False, duplicates='drop') + 1 rfm['m_score'] = pd.qcut(rfm['monetary'], q=5, labels=False, duplicates='drop') + 1 # 综合得分 rfm['rfm_score'] = rfm['r_score'] * 100 + rfm['f_score'] * 10 + rfm['m_score'] return rfm def _multi_dimensional_agg(self) -> pd.DataFrame: """多维聚合:region × category × customer_tier""" df = self.results['clean_data'].copy() agg_config = { 'amount': [ ('avg_amount', 'mean'), ('amount_std', 'std'), ('high_value_ratio', lambda x: (x > 5000).sum() / len(x) if len(x) > 0 else 0), ('transaction_count', 'count') ], 'merchant_id': [('unique_merchants', lambda x: x.nunique())] } result = df.groupby(['region', 'category', 'customer_tier']).agg(agg_config) # 扁平化列名 result.columns = ['_'.join(col).strip() for col in result.columns.values] return result def _time_window_analysis(self) -> pd.DataFrame: """时间窗口分析:7日/30日滚动均值""" df = self.results['clean_data'].copy() # 按客户排序 df_sorted = df.sort_values(['customer_id', 'date']) # 计算滚动均值 df_sorted['7d_avg'] = df_sorted.groupby('customer_id')['amount'].rolling( window='7D', min_periods=3 ).mean().reset_index(level=0, drop=True) df_sorted['30d_avg'] = df_sorted.groupby('customer_id')['amount'].rolling( window='30D', min_periods=10 ).mean().reset_index(level=0, drop=True) return df_sorted[['customer_id', 'date', 'amount', '7d_avg', '30d_avg']] def _risk_metrics_calculation(self) -> pd.DataFrame: """风险指标:交易范围、波动率、高风险占比""" df = self.results['clean_data'].copy() def risk_exposure(series): clean_series = series[(series > 0.
http://www.jsqmd.com/news/1027736/

相关文章:

  • 2026年不锈钢C型钢费用解析,浙江联航收费合理 - myqiye
  • 2026年玻璃钢电缆沟盖板源头厂家推荐甄选:工艺、案例与性价比综合评测 - 优质品牌商家
  • HTML优先架构实战:一个配置改动让用户量翻倍!
  • 腾讯混元API生产级接入:稳定性、成本与低延迟实战指南
  • 郴州漏水检测维修权威推荐:卫生间-厨房-阳台-屋顶天花板漏水维修:靠谱防水补漏公司团队TOP5推荐(2026最新深度调研实测榜单) - 即刻修防水
  • 数据科学四条职业路径:分析、工程、建模与产品型
  • 3分钟,让B站评论区变得不再“陌生”的秘密
  • 机器学习股票方向预测实战:从数据清洗到可解释建模
  • 微信群投票小程序怎么弄,云帆投票+西瓜评选+腾讯投票,投票平台深度对比测评 - 投票小程序
  • 2026年自贡花岗石厂家选购指南:砂岩与花岗石行业趋势与厂商深度评测 - 优质品牌商家
  • 2026靠谱卫生间防水材料供应商,品牌对比与价格分析 - myqiye
  • 从CTF实战到工程防御:XSS跨站脚本攻击原理与防护全解析
  • GPT-4o真实能力图谱:文本图像已上线,语音视频仍待交付
  • 鄂州漏水检测维修权威推荐:卫生间-厨房-阳台-屋顶天花板漏水维修:靠谱防水补漏公司团队TOP5推荐(2026最新深度调研实测榜单) - 即刻修防水
  • Java毕业设计-基于 SpringBoot 的宠物之家综合管理系统的设计与实现 面向宠物服务场景的宠物之家管理平台设计与实现(源码+LW+部署文档+全bao+远程调试+代码讲解等)
  • Python弱引用与内存泄漏防治
  • Vue 3跨平台UI框架架构设计:uView-Plus企业级组件库解决方案
  • 干货:做耐老化低温后补式玻璃门公司价格解析 - myqiye
  • 如何在Python中实现Black-Litterman资产配置?终极实战指南
  • 深度学习中的线性代数:矩阵乘法、基变换与SVD实战指南
  • 2026年长沙彩金回收怎么选?官方甄选几家正规机构推荐指南 - 优质品牌商家
  • 医疗费用预测实战:临床逻辑驱动的可解释机器学习建模
  • 从零搭建个人AI助手:轻量化LLM部署与联网搜索实战
  • 用计算机视觉做体感游戏:实时姿态估计与Unity集成实战
  • 怪物猎人世界终极插件指南:HunterPie三步快速配置教程
  • NXP QorIQ USDPAA开发实战:用户空间数据平面加速核心原理与性能调优
  • 济南商河有CNAS认可实验室的标气供应商推荐 - myqiye
  • 2026年评价高的长沙罗汉松/小叶造型罗汉松/浏阳造型罗汉松庭院推荐 - 行业平台推荐
  • Sqribble:基于模板规则的轻量级文档操作系统解析
  • ColdFire V5核心架构解析:双发射超流水线如何实现嵌入式SoC性能跃迁