数据仓库笔记 第五篇:Data Mart 层(数据集市)
数据仓库笔记 第五篇:Data Mart 层(数据集市)
摘要
什么是 Data Mart?Data Mart(数据集市)是数据仓库的面向主题的子集,专门为特定业务部门或分析场景服务。
我个人建议,前端BI工具,无论是Power BI或者是Tableau,都从这一层取数据,这样一来可以提高前端报表性能,同时也可以降低前端工具的建模难度。
此笔记使用的数据库为SQLServer,相应的示例脚本都围绕于此,其它数据库的相应实现会略有不同。
┌───────────────────────────────────────────────────────────────┐
│ Enterprise Data Warehouse │
│ (企业级数据仓库) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Star Schema / Data Vault 统一数据模型 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ↓ ↓ ↓ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ 销售数据集市 │ │ 财务数据集市 │ │ 客户数据集市 │ │
│ │ Sales Mart │ │ Finance Mart │ │ Customer Mart │ │
│ │ │ │ │ │ │ │
│ │ 销售报表 │ │ 财务报表 │ │ 客户分析 │ │
│ │ 业绩分析 │ │ 预算分析 │ │ 流失分析 │ │
│ │ 区域分析 │ │ 成本分析 │ │ 价值分析 │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
└───────────────────────────────────────────────────────────────┘
Data Mart vs Data Warehouse
| 维度 | Data Warehouse(全量) | Data Mart(子集) |
|---|---|---|
| 范围 | 全企业 | 单个部门/主题 |
| 数据量 | TB~PB 级 | GB~TB 级 |
| 用户 | 全企业用户 | 特定业务用户 |
| 建设周期 | 长(6个月~数年) | 短(数周~数月) |
| 成本 | 高 | 较低 |
| 性能 | 查询较慢 | 查询极快 |
| 依赖关系 | 独立或从 DW 取数 | 通常从 DW 取数 |
Data Mart 的三种架构模式
模式一:独立数据集市(Standalone) 模式二:依赖数据集市(Dependent)┌────────┐ ┌────────┐ ┌────────┐ ┌────────────────┐
│ 源系统 │ │ 源系统 │ │ 源系统 │ │ 数据仓库 │
└───┬────┘ └───┬────┘ └───┬────┘ └───────┬────────┘│ │ │ │└─────────────┼─────────────┘ │↓ ↓┌──────────────────┐ ┌──────────────────┐│ 销售数据集市 │ │ 销售数据集市 │└──────────────────┘ └──────────────────┘↑ 各自从源系统直接取数 从数据仓库取数(推荐)模式三:联邦数据集市(Federated)┌──────────────┐│ 数据仓库 │└──────┬───────┘│┌──────────┼──────────┐↓ ↓ ↓
┌────────┐ ┌────────┐ ┌────────┐
│ 销售 │ │ 财务 │ │ 客户 │
│ 数据 │ │ 数据 │ │ 数据 │
│ 集市 │ │ 集市 │ │ 集市 │
└────────┘ └────────┘ └────────┘
SQL 实战:创建 Data Mart 层
销售数据集市
-- ============================================================
-- 在 mart_db 中创建销售数据集市
-- ============================================================USE mart_db;
GO-- 月度销售汇总事实表
IF OBJECT_ID('dbo.fact_monthly_sales', 'U') IS NOT NULL DROP TABLE dbo.fact_monthly_sales;
GOCREATE TABLE dbo.fact_monthly_sales (month_id BIGINT IDENTITY(1,1) PRIMARY KEY,month_key INT NOT NULL, -- YYYYMMyear SMALLINT NOT NULL,month SMALLINT NOT NULL,category NVARCHAR(50) NULL,brand NVARCHAR(50) NULL,customer_type VARCHAR(20) NULL,city NVARCHAR(50) NULL,region NVARCHAR(50) NULL,-- 销售指标order_count INT NOT NULL DEFAULT 0,order_quantity INT NOT NULL DEFAULT 0,gross_sales DECIMAL(16,2) NOT NULL DEFAULT 0,net_sales DECIMAL(16,2) NOT NULL DEFAULT 0,discount_amount DECIMAL(12,2) DEFAULT 0,total_profit DECIMAL(16,2) DEFAULT 0,profit_margin DECIMAL(6,3) DEFAULT 0,-- 对比指标prev_month_sales DECIMAL(16,2) DEFAULT 0,prev_year_sales DECIMAL(16,2) DEFAULT 0,mom_growth DECIMAL(8,3) NULL, -- 环比增长%yoy_growth DECIMAL(8,3) NULL, -- 同比增长%-- 元数据etl_batch_id VARCHAR(50) NULL,load_time DATETIME DEFAULT GETDATE()
);
GOCREATE NONCLUSTERED INDEX idx_mms_month ON dbo.fact_monthly_sales(year, month);
CREATE NONCLUSTERED INDEX idx_mms_category ON dbo.fact_monthly_sales(category);
CREATE NONCLUSTERED INDEX idx_mms_brand ON dbo.fact_monthly_sales(brand);
GO-- 日趋势事实表
IF OBJECT_ID('dbo.fact_daily_sales_trend', 'U') IS NOT NULL DROP TABLE dbo.fact_daily_sales_trend;
GOCREATE TABLE dbo.fact_daily_sales_trend (date_key INT NOT NULL,category NVARCHAR(50) NULL,brand NVARCHAR(50) NULL,order_count INT NOT NULL DEFAULT 0,order_quantity INT NOT NULL DEFAULT 0,net_sales DECIMAL(16,2) NOT NULL DEFAULT 0,total_profit DECIMAL(16,2) DEFAULT 0
);
GOCREATE NONCLUSTERED INDEX idx_dst_date ON dbo.fact_daily_sales_trend(date_key);
CREATE NONCLUSTERED INDEX idx_dst_category ON dbo.fact_daily_sales_trend(category);
GO-- 销售目标追踪表
IF OBJECT_ID('dbo.fact_sales_target', 'U') IS NOT NULL DROP TABLE dbo.fact_sales_target;
GOCREATE TABLE dbo.fact_sales_target (target_id BIGINT IDENTITY(1,1) PRIMARY KEY,year SMALLINT NOT NULL,month SMALLINT NOT NULL,category NVARCHAR(50) NULL,region NVARCHAR(50) NULL,target_amount DECIMAL(16,2) NOT NULL,actual_amount DECIMAL(16,2) DEFAULT 0,completion_rate DECIMAL(6,3) DEFAULT 0,gap_amount DECIMAL(16,2) DEFAULT 0,etl_batch_id VARCHAR(50) NULL,load_time DATETIME DEFAULT GETDATE()
);
GOCREATE NONCLUSTERED INDEX idx_st_year ON dbo.fact_sales_target(year);
CREATE NONCLUSTERED INDEX idx_st_month ON dbo.fact_sales_target(year, month);
GO
客户数据集市
USE mart_db;
GO-- 客户月度汇总事实表
IF OBJECT_ID('dbo.fact_customer_monthly', 'U') IS NOT NULL DROP TABLE dbo.fact_customer_monthly;
GOCREATE TABLE dbo.fact_customer_monthly (monthly_id BIGINT IDENTITY(1,1) PRIMARY KEY,month_key INT NOT NULL,customer_type VARCHAR(20) NULL,city NVARCHAR(50) NULL,region NVARCHAR(50) NULL,-- 客户数量指标total_customers INT NOT NULL DEFAULT 0,new_customers INT NOT NULL DEFAULT 0,active_customers INT NOT NULL DEFAULT 0,churned_customers INT NOT NULL DEFAULT 0,inactive_customers INT NOT NULL DEFAULT 0,-- 销售贡献total_sales DECIMAL(16,2) DEFAULT 0,avg_order_value DECIMAL(10,2) DEFAULT 0,-- 客户价值total_profit DECIMAL(16,2) DEFAULT 0,avg_lifetime_value DECIMAL(12,2) DEFAULT 0,etl_batch_id VARCHAR(50) NULL,load_time DATETIME DEFAULT GETDATE()
);
GOCREATE NONCLUSTERED INDEX idx_fcm_month ON dbo.fact_customer_monthly(month_key);
GO-- 客户 RFM 分析表
IF OBJECT_ID('dbo.fact_customer_rfm', 'U') IS NOT NULL DROP TABLE dbo.fact_customer_rfm;
GOCREATE TABLE dbo.fact_customer_rfm (customer_key BIGINT NOT NULL PRIMARY KEY,customer_id VARCHAR(50) NOT NULL,customer_name NVARCHAR(100) NULL,city NVARCHAR(50) NULL,-- RFM 指标r_recency INT NULL, -- 距上次购买天数r_score VARCHAR(1) NULL, -- R评分 1-5f_frequency INT NULL, -- 购买次数f_score VARCHAR(1) NULL,m_monetary DECIMAL(12,2) NULL, -- 总消费金额m_score VARCHAR(1) NULL,rfm_score VARCHAR(3) NULL, -- 组合评分(如"543")rfm_segment NVARCHAR(50) NULL, -- 客户分群rfm_segment_desc NVARCHAR(200) NULL, -- 分群说明last_order_date DATE NULL,load_time DATETIME DEFAULT GETDATE()
);
GOCREATE NONCLUSTERED INDEX idx_rfm_score ON dbo.fact_customer_rfm(rfm_score);
CREATE NONCLUSTERED INDEX idx_rfm_seg ON dbo.fact_customer_rfm(rfm_segment);
CREATE NONCLUSTERED INDEX idx_rfm_city ON dbo.fact_customer_rfm(city);
GO
财务数据集市
USE mart_db;
GO-- 月度财务汇总事实表
IF OBJECT_ID('dbo.fact_monthly_finance', 'U') IS NOT NULL DROP TABLE dbo.fact_monthly_finance;
GOCREATE TABLE dbo.fact_monthly_finance (finance_id BIGINT IDENTITY(1,1) PRIMARY KEY,month_key INT NOT NULL,year SMALLINT NOT NULL,month SMALLINT NOT NULL,category NVARCHAR(50) NULL,customer_type VARCHAR(20) NULL,-- 收入gross_revenue DECIMAL(16,2) DEFAULT 0,net_revenue DECIMAL(16,2) DEFAULT 0,discount_given DECIMAL(12,2) DEFAULT 0,-- 成本total_cost DECIMAL(16,2) DEFAULT 0,cost_of_goods DECIMAL(16,2) DEFAULT 0,-- 利润gross_profit DECIMAL(16,2) DEFAULT 0,net_profit DECIMAL(16,2) DEFAULT 0,-- 利润率gross_margin DECIMAL(6,3) DEFAULT 0,net_margin DECIMAL(6,3) DEFAULT 0,-- 同比/环比mom_growth DECIMAL(8,3) NULL,yoy_growth DECIMAL(8,3) NULL,etl_batch_id VARCHAR(50) NULL,load_time DATETIME DEFAULT GETDATE()
);
GOCREATE NONCLUSTERED INDEX idx_mf_month ON dbo.fact_monthly_finance(year, month);
CREATE NONCLUSTERED INDEX idx_mf_cat ON dbo.fact_monthly_finance(category);
GO-- 产品利润分析表
IF OBJECT_ID('dbo.fact_product_profitability', 'U') IS NOT NULL DROP TABLE dbo.fact_product_profitability;
GOCREATE TABLE dbo.fact_product_profitability (product_key BIGINT NOT NULL PRIMARY KEY,product_id VARCHAR(50) NOT NULL,product_name NVARCHAR(200) NOT NULL,category NVARCHAR(50) NULL,brand NVARCHAR(50) NULL,-- 成本与价格unit_cost DECIMAL(10,2) NULL,unit_price DECIMAL(10,2) NULL,-- 销量与收入total_quantity INT DEFAULT 0,total_revenue DECIMAL(16,2) DEFAULT 0,total_cost DECIMAL(16,2) DEFAULT 0,total_profit DECIMAL(16,2) DEFAULT 0,-- 利润率指标unit_profit DECIMAL(10,2) NULL,profit_margin DECIMAL(6,3) NULL,load_time DATETIME DEFAULT GETDATE()
);
GOCREATE NONCLUSTERED INDEX idx_pp_profit ON dbo.fact_product_profitability(total_profit DESC);
CREATE NONCLUSTERED INDEX idx_pp_margin ON dbo.fact_product_profitability(profit_margin DESC);
GO
Data Mart ETL 流程
销售数据集市 ETL
USE mart_db;
GOIF OBJECT_ID('dbo.sp_load_monthly_sales', 'P') IS NOT NULLDROP PROCEDURE dbo.sp_load_monthly_sales;
GOCREATE PROCEDURE dbo.sp_load_monthly_sales@batch_id VARCHAR(50)
AS
BEGINSET NOCOUNT ON;DECLARE @start_time DATETIME = GETDATE();DECLARE @rows_inserted BIGINT = 0;DECLARE @error_msg NVARCHAR(MAX);BEGIN TRYINSERT INTO etl_db.dbo.etl_log (batch_id, layer_name, db_name, table_name, start_time, status) VALUES (@batch_id, 'mart', 'mart_db', 'fact_monthly_sales', @start_time, 'RUNNING');-- 先清空旧数据(全量重算)TRUNCATE TABLE dbo.fact_monthly_sales;-- 按月、品类、品牌、客户类型、城市维度聚合INSERT INTO dbo.fact_monthly_sales (month_key, year, month, category, brand, customer_type, city, region,order_count, order_quantity,gross_sales, net_sales, discount_amount,total_profit, profit_margin,etl_batch_id, load_time)SELECTd.year * 100 + d.month,d.year, d.month,p.category, p.brand, c.customer_type, c.city, c.region,COUNT(DISTINCT f.order_id),SUM(f.quantity),SUM(f.total_amount + ISNULL(f.discount_amount, 0)),SUM(f.total_amount),SUM(ISNULL(f.discount_amount, 0)),SUM(ISNULL(f.gross_profit, 0)),CASE WHEN SUM(f.total_amount) > 0THEN CAST(ROUND(SUM(ISNULL(f.gross_profit, 0)) / SUM(f.total_amount) * 100, 3) AS DECIMAL(6,3))ELSE 0 END,@batch_id,GETDATE()FROM star_db.dbo.fact_sales fINNER JOIN star_db.dbo.dim_date d ON f.date_key = d.date_keyINNER JOIN star_db.dbo.dim_product p ON f.product_key = p.product_keyINNER JOIN star_db.dbo.dim_customer c ON f.customer_key = c.customer_keyWHERE p.is_current = 1 AND c.is_current = 1GROUP BY d.year, d.month, p.category, p.brand, c.customer_type, c.city, c.region;SET @rows_inserted = @@ROWCOUNT;-- 更新环比/同比指标(用 CTE 计算窗口函数);WITH monthly_agg AS (SELECTmonth_id, month_key, year, month,category, brand, customer_type, city, region,net_sales,LAG(net_sales) OVER (PARTITION BY category, brand, customer_type, city, regionORDER BY year, month) AS prev_m_sales,LAG(net_sales, 12) OVER (PARTITION BY category, brand, customer_type, city, regionORDER BY year, month) AS prev_y_salesFROM dbo.fact_monthly_sales)UPDATE mSET m.prev_month_sales = ISNULL(a.prev_m_sales, 0),m.prev_year_sales = ISNULL(a.prev_y_sales, 0),m.mom_growth = CASE WHEN a.prev_m_sales > 0THEN CAST(ROUND((a.net_sales - a.prev_m_sales) / a.prev_m_sales * 100, 3) AS DECIMAL(8,3))ELSE NULL END,m.yoy_growth = CASE WHEN a.prev_y_sales > 0THEN CAST(ROUND((a.net_sales - a.prev_y_sales) / a.prev_y_sales * 100, 3) AS DECIMAL(8,3))ELSE NULL ENDFROM dbo.fact_monthly_sales mINNER JOIN monthly_agg a ON m.month_id = a.month_id;UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), rows_inserted = @rows_inserted, status = 'SUCCESS'WHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_monthly_sales';PRINT N'fact_monthly_sales 加载完成: ' + CAST(@rows_inserted AS VARCHAR) + N' 行';END TRYBEGIN CATCHSET @error_msg = ERROR_MESSAGE();UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), status = 'FAILED', error_message = @error_msgWHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_monthly_sales';THROW;END CATCH;
END;
GO-- ============================================================
-- 日趋势加载
-- ============================================================IF OBJECT_ID('dbo.sp_load_daily_trend', 'P') IS NOT NULLDROP PROCEDURE dbo.sp_load_daily_trend;
GOCREATE PROCEDURE dbo.sp_load_daily_trend@batch_id VARCHAR(50)
AS
BEGINSET NOCOUNT ON;DECLARE @start_time DATETIME = GETDATE();DECLARE @rows_inserted BIGINT = 0;DECLARE @error_msg NVARCHAR(MAX);BEGIN TRYINSERT INTO etl_db.dbo.etl_log (batch_id, layer_name, db_name, table_name, start_time, status) VALUES (@batch_id, 'mart', 'mart_db', 'fact_daily_sales_trend', @start_time, 'RUNNING');TRUNCATE TABLE dbo.fact_daily_sales_trend;INSERT INTO dbo.fact_daily_sales_trend (date_key, category, brand,order_count, order_quantity, net_sales, total_profit)SELECTf.date_key,p.category,p.brand,COUNT(DISTINCT f.order_id),SUM(f.quantity),SUM(f.total_amount),SUM(ISNULL(f.gross_profit, 0))FROM star_db.dbo.fact_sales fINNER JOIN star_db.dbo.dim_product p ON f.product_key = p.product_keyWHERE p.is_current = 1GROUP BY f.date_key, p.category, p.brand;SET @rows_inserted = @@ROWCOUNT;UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), rows_inserted = @rows_inserted, status = 'SUCCESS'WHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_daily_sales_trend';PRINT N'fact_daily_sales_trend 加载完成: ' + CAST(@rows_inserted AS VARCHAR) + N' 行';END TRYBEGIN CATCHSET @error_msg = ERROR_MESSAGE();UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), status = 'FAILED', error_message = @error_msgWHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_daily_sales_trend';THROW;END CATCH;
END;
GO
客户数据集市 ETL(RFM 分析)
USE mart_db;
GOIF OBJECT_ID('dbo.sp_load_rfm_analysis', 'P') IS NOT NULLDROP PROCEDURE dbo.sp_load_rfm_analysis;
GOCREATE PROCEDURE dbo.sp_load_rfm_analysis@batch_id VARCHAR(50)
AS
BEGINSET NOCOUNT ON;DECLARE @start_time DATETIME = GETDATE();DECLARE @rows_inserted BIGINT = 0;DECLARE @ref_date DATE;DECLARE @error_msg NVARCHAR(MAX);BEGIN TRYINSERT INTO etl_db.dbo.etl_log (batch_id, layer_name, db_name, table_name, start_time, status) VALUES (@batch_id, 'mart', 'mart_db', 'fact_customer_rfm', @start_time, 'RUNNING');-- 参考日期:订单最大日期 + 1SELECT @ref_date = DATEADD(day, 1, MAX(d.date_value))FROM star_db.dbo.fact_sales fINNER JOIN star_db.dbo.dim_date d ON f.date_key = d.date_key;-- 清除旧数据(全量重算)TRUNCATE TABLE dbo.fact_customer_rfm;-- 用 CTE 计算基础 RFM 指标;WITH base AS (SELECTc.customer_key,c.customer_id,c.customer_name,c.city,-- R: 最近一次购买距今天数DATEDIFF(day, MAX(d.date_value), @ref_date) AS r_recency,-- F: 购买次数COUNT(DISTINCT f.order_id) AS f_frequency,-- M: 总消费金额SUM(f.total_amount) AS m_monetary,-- 最后购买日期MAX(d.date_value) AS last_order_dateFROM star_db.dbo.dim_customer cLEFT JOIN star_db.dbo.fact_sales f ON c.customer_key = f.customer_keyLEFT JOIN star_db.dbo.dim_date d ON f.date_key = d.date_keyWHERE c.is_current = 1GROUP BY c.customer_key, c.customer_id, c.customer_name, c.city)INSERT INTO dbo.fact_customer_rfm (customer_key, customer_id, customer_name, city,r_recency, r_score,f_frequency, f_score,m_monetary, m_score,rfm_score, rfm_segment, rfm_segment_desc,last_order_date, load_time)SELECTb.customer_key,b.customer_id,b.customer_name,b.city,b.r_recency,-- R 评分:天数越少评分越高CASEWHEN b.r_recency <= 30 THEN '5'WHEN b.r_recency <= 60 THEN '4'WHEN b.r_recency <= 90 THEN '3'WHEN b.r_recency <= 180 THEN '2'ELSE '1'END,b.f_frequency,-- F 评分:次数越多评分越高CASEWHEN b.f_frequency >= 10 THEN '5'WHEN b.f_frequency >= 5 THEN '4'WHEN b.f_frequency >= 3 THEN '3'WHEN b.f_frequency >= 2 THEN '2'ELSE '1'END,ROUND(b.m_monetary, 2),-- M 评分:金额越高评分越高CASEWHEN b.m_monetary >= 100000 THEN '5'WHEN b.m_monetary >= 50000 THEN '4'WHEN b.m_monetary >= 20000 THEN '3'WHEN b.m_monetary >= 5000 THEN '2'ELSE '1'END,-- RFM 综合评分CASEWHEN b.last_order_date IS NULL THEN '000'ELSECASE WHEN b.r_recency <= 30 THEN '5' WHEN b.r_recency <= 60 THEN '4'WHEN b.r_recency <= 90 THEN '3' WHEN b.r_recency <= 180 THEN '2' ELSE '1' END +CASE WHEN b.f_frequency >= 10 THEN '5' WHEN b.f_frequency >= 5 THEN '4'WHEN b.f_frequency >= 3 THEN '3' WHEN b.f_frequency >= 2 THEN '2' ELSE '1' END +CASE WHEN b.m_monetary >= 100000 THEN '5' WHEN b.m_monetary >= 50000 THEN '4'WHEN b.m_monetary >= 20000 THEN '3' WHEN b.m_monetary >= 5000 THEN '2' ELSE '1' ENDEND,-- RFM 分群CASEWHEN b.last_order_date IS NULL THEN N'流失客户'WHEN b.r_recency <= 30 AND b.f_frequency >= 5 AND b.m_monetary >= 50000 THEN N'高价值客户'WHEN b.r_recency <= 30 AND b.f_frequency >= 3 THEN N'活跃客户'WHEN b.r_recency <= 90 AND b.f_frequency >= 1 THEN N'潜力客户'WHEN b.r_recency <= 180 AND b.f_frequency >= 1 THEN N'沉默客户'WHEN b.r_recency > 180 THEN N'流失风险客户'ELSE N'新客户'END,-- 分群描述CASEWHEN b.last_order_date IS NULL THEN N'超过180天无购买,建议流失预警'WHEN b.r_recency <= 30 AND b.f_frequency >= 5 AND b.m_monetary >= 50000THEN N'核心高价值客户,需重点维护 VIP 服务'WHEN b.r_recency <= 30 AND b.f_frequency >= 3THEN N'近期活跃客户,可推送新品和促销'WHEN b.r_recency <= 90 AND b.f_frequency >= 1THEN N'有购买历史但近期减少,需激活营销'WHEN b.r_recency <= 180 AND b.f_frequency >= 1THEN N'长期未购买,存在流失风险,需挽回策略'WHEN b.r_recency > 180THEN N'已判定流失客户,可尝试最后一次唤醒'ELSE N'新注册客户,需引导首次购买'END,b.last_order_date,GETDATE()FROM base b;SET @rows_inserted = @@ROWCOUNT;UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), rows_inserted = @rows_inserted, status = 'SUCCESS'WHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_customer_rfm';PRINT N'fact_customer_rfm 加载完成,参考日期: ' + CAST(@ref_date AS VARCHAR);END TRYBEGIN CATCHSET @error_msg = ERROR_MESSAGE();UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), status = 'FAILED', error_message = @error_msgWHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_customer_rfm';THROW;END CATCH;
END;
GO
财务数据集市 ETL
USE mart_db;
GOIF OBJECT_ID('dbo.sp_load_monthly_finance', 'P') IS NOT NULLDROP PROCEDURE dbo.sp_load_monthly_finance;
GOCREATE PROCEDURE dbo.sp_load_monthly_finance@batch_id VARCHAR(50)
AS
BEGINSET NOCOUNT ON;DECLARE @start_time DATETIME = GETDATE();DECLARE @rows_inserted BIGINT = 0;DECLARE @error_msg NVARCHAR(MAX);BEGIN TRYINSERT INTO etl_db.dbo.etl_log (batch_id, layer_name, db_name, table_name, start_time, status) VALUES (@batch_id, 'mart', 'mart_db', 'fact_monthly_finance', @start_time, 'RUNNING');TRUNCATE TABLE dbo.fact_monthly_finance;INSERT INTO dbo.fact_monthly_finance (month_key, year, month, category, customer_type,gross_revenue, net_revenue, discount_given,total_cost, cost_of_goods,gross_profit, net_profit,gross_margin, net_margin,etl_batch_id, load_time)SELECTd.year * 100 + d.month,d.year, d.month,p.category,c.customer_type,-- 收入SUM(f.total_amount + ISNULL(f.discount_amount, 0)),SUM(f.total_amount),SUM(ISNULL(f.discount_amount, 0)),-- 成本SUM(ISNULL(f.total_cost, 0)),SUM(ISNULL(f.total_cost, 0)),-- 利润SUM(ISNULL(f.gross_profit, 0)),SUM(ISNULL(f.gross_profit, 0)),-- 利润率CASE WHEN SUM(f.total_amount) > 0THEN CAST(ROUND(SUM(ISNULL(f.gross_profit, 0)) / SUM(f.total_amount) * 100, 3) AS DECIMAL(6,3))ELSE 0 END,CASE WHEN SUM(f.total_amount) > 0THEN CAST(ROUND(SUM(ISNULL(f.gross_profit, 0)) / SUM(f.total_amount) * 100, 3) AS DECIMAL(6,3))ELSE 0 END,@batch_id,GETDATE()FROM star_db.dbo.fact_sales fINNER JOIN star_db.dbo.dim_date d ON f.date_key = d.date_keyINNER JOIN star_db.dbo.dim_product p ON f.product_key = p.product_keyINNER JOIN star_db.dbo.dim_customer c ON f.customer_key = c.customer_keyWHERE p.is_current = 1 AND c.is_current = 1GROUP BY d.year, d.month, p.category, c.customer_type;SET @rows_inserted = @@ROWCOUNT;-- 更新环比/同比;WITH finance_agg AS (SELECTfinance_id, year, month, category, customer_type,net_revenue,LAG(net_revenue) OVER (PARTITION BY category, customer_type ORDER BY year, month) AS prev_m_rev,LAG(net_revenue, 12) OVER (PARTITION BY category, customer_type ORDER BY year, month) AS prev_y_revFROM dbo.fact_monthly_finance)UPDATE fSET f.mom_growth = CASE WHEN a.prev_m_rev > 0THEN CAST(ROUND((a.net_revenue - a.prev_m_rev) / a.prev_m_rev * 100, 3) AS DECIMAL(8,3))ELSE NULL END,f.yoy_growth = CASE WHEN a.prev_y_rev > 0THEN CAST(ROUND((a.net_revenue - a.prev_y_rev) / a.prev_y_rev * 100, 3) AS DECIMAL(8,3))ELSE NULL ENDFROM dbo.fact_monthly_finance fINNER JOIN finance_agg a ON f.finance_id = a.finance_id;UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), rows_inserted = @rows_inserted, status = 'SUCCESS'WHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_monthly_finance';PRINT N'fact_monthly_finance 加载完成: ' + CAST(@rows_inserted AS VARCHAR) + N' 行';END TRYBEGIN CATCHSET @error_msg = ERROR_MESSAGE();UPDATE etl_db.dbo.etl_logSET end_time = GETDATE(), status = 'FAILED', error_message = @error_msgWHERE batch_id = @batch_id AND db_name = 'mart_db' AND table_name = 'fact_monthly_finance';THROW;END CATCH;
END;
GO-- ============================================================
-- 产品利润分析加载
-- ============================================================IF OBJECT_ID('dbo.sp_load_product_profitability', 'P') IS NOT NULLDROP PROCEDURE dbo.sp_load_product_profitability;
GOCREATE PROCEDURE dbo.sp_load_product_profitability@batch_id VARCHAR(50)
AS
BEGINSET NOCOUNT ON;DECLARE @start_time DATETIME = GETDATE();DECLARE @rows_inserted BIGINT = 0;DECLARE @error_msg NVARCHAR(MAX);BEGIN TRYTRUNCATE TABLE dbo.fact_product_profitability;INSERT INTO dbo.fact_product_profitability (product_key, product_id, product_name, category, brand,unit_cost, unit_price,total_quantity, total_revenue, total_cost, total_profit,unit_profit, profit_margin)SELECTp.product_key,p.product_id,p.product_name,p.category,p.brand,p.unit_cost,p.unit_price,SUM(f.quantity),SUM(f.total_amount),SUM(ISNULL(f.total_cost, 0)),SUM(ISNULL(f.gross_profit, 0)),p.unit_price - ISNULL(p.unit_cost, 0),CASE WHEN p.unit_price > 0THEN CAST(ROUND((p.unit_price - ISNULL(p.unit_cost, 0)) / p.unit_price * 100, 3) AS DECIMAL(6,3))ELSE 0 ENDFROM star_db.dbo.fact_sales fINNER JOIN star_db.dbo.dim_product p ON f.product_key = p.product_keyWHERE p.is_current = 1GROUP BY p.product_key, p.product_id, p.product_name, p.category, p.brand,p.unit_cost, p.unit_price;SET @rows_inserted = @@ROWCOUNT;PRINT N'fact_product_profitability 加载完成: ' + CAST(@rows_inserted AS VARCHAR) + N' 行';END TRYBEGIN CATCHSET @error_msg = ERROR_MESSAGE();THROW;END CATCH;
END;
GO
Data Mart 主 ETL 调度
USE mart_db;
GOIF OBJECT_ID('dbo.sp_run_mart_etl', 'P') IS NOT NULLDROP PROCEDURE dbo.sp_run_mart_etl;
GOCREATE PROCEDURE dbo.sp_run_mart_etl@batch_id VARCHAR(50)
AS
BEGINSET NOCOUNT ON;PRINT N'=== Data Mart ETL 开始 ===';PRINT N'批次号: ' + @batch_id;-- 销售数据集市EXEC dbo.sp_load_monthly_sales @batch_id;EXEC dbo.sp_load_daily_trend @batch_id;-- 客户数据集市EXEC dbo.sp_load_rfm_analysis @batch_id;-- 财务数据集市EXEC dbo.sp_load_monthly_finance @batch_id;EXEC dbo.sp_load_product_profitability @batch_id;PRINT N'=== Data Mart ETL 完成 ===';
END;
GO
执行 ETL 并验证
-- ============================================================
-- 执行 Data Mart ETL
-- ============================================================DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_MART_' + CONVERT(VARCHAR(8), GETDATE(), 112) + '_' + REPLACE(CONVERT(VARCHAR(8), GETDATE(), 108), ':', '');EXEC mart_db.dbo.sp_run_mart_etl @batch_id;
GO-- ============================================================
-- 验证结果
-- ============================================================USE mart_db;
GO-- 查看 ETL 日志
SELECT batch_id, table_name, start_time, end_time,DATEDIFF(second, start_time, end_time) AS duration_sec,rows_inserted, status
FROM etl_db.dbo.etl_log
WHERE layer_name = 'mart'
ORDER BY start_time;
GO-- 查看各表记录数
SELECT 'fact_monthly_sales' AS tbl, COUNT(*) AS cnt FROM dbo.fact_monthly_sales
UNION ALL SELECT 'fact_daily_sales_trend', COUNT(*) FROM dbo.fact_daily_sales_trend
UNION ALL SELECT 'fact_sales_target', COUNT(*) FROM dbo.fact_sales_target
UNION ALL SELECT 'fact_customer_monthly', COUNT(*) FROM dbo.fact_customer_monthly
UNION ALL SELECT 'fact_customer_rfm', COUNT(*) FROM dbo.fact_customer_rfm
UNION ALL SELECT 'fact_monthly_finance', COUNT(*) FROM dbo.fact_monthly_finance
UNION ALL SELECT 'fact_product_profitability', COUNT(*) FROM dbo.fact_product_profitability;
GO
Data Mart 分析示例
销售数据集市分析
USE mart_db;
GO-- 月度销售报表
SELECTyear, month, category,SUM(order_count) AS order_count,SUM(order_quantity) AS quantity,SUM(net_sales) AS sales,SUM(total_profit) AS profit,ROUND(AVG(profit_margin), 2) AS avg_margin,ROUND(AVG(mom_growth), 2) AS avg_mom_growth,ROUND(AVG(yoy_growth), 2) AS avg_yoy_growth
FROM dbo.fact_monthly_sales
WHERE category IS NOT NULL
GROUP BY year, month, category
ORDER BY year DESC, month DESC, sales DESC;
GO-- 品牌销售排行
SELECTbrand,SUM(order_count) AS order_count,SUM(net_sales) AS sales,SUM(total_profit) AS profit,CAST(ROUND(SUM(total_profit) / NULLIF(SUM(net_sales), 0) * 100, 2) AS DECIMAL(6,2)) AS margin_rate
FROM dbo.fact_monthly_sales
WHERE brand IS NOT NULL
GROUP BY brand
ORDER BY sales DESC;
GO-- 区域销售分析
SELECTregion,SUM(order_count) AS order_count,SUM(net_sales) AS sales,SUM(total_profit) AS profit
FROM dbo.fact_monthly_sales
WHERE region IS NOT NULL
GROUP BY region
ORDER BY sales DESC;
GO
客户数据集市分析
USE mart_db;
GO-- RFM 分群统计
SELECTrfm_segment,COUNT(*) AS customer_count,ROUND(AVG(m_monetary), 2) AS avg_spend,ROUND(AVG(CAST(r_recency AS FLOAT)), 1) AS avg_recency,ROUND(AVG(CAST(f_frequency AS FLOAT)), 1) AS avg_frequency,SUM(m_monetary) AS total_revenue
FROM dbo.fact_customer_rfm
GROUP BY rfm_segment
ORDER BY customer_count DESC;
GO-- 高价值客户名单
SELECT TOP 20customer_id,customer_name,city,rfm_segment,m_monetary AS total_spend,r_recency AS days_since_last_order,f_frequency AS order_count,rfm_segment_desc AS action_suggestion
FROM dbo.fact_customer_rfm
WHERE rfm_segment IN (N'高价值客户', N'活跃客户')
ORDER BY m_monetary DESC;
GO
财务数据集市分析
USE mart_db;
GO-- 月度利润报表
SELECTyear, month,SUM(gross_revenue) AS gross_revenue,SUM(net_revenue) AS net_revenue,SUM(total_cost) AS total_cost,SUM(gross_profit) AS gross_profit,ROUND(AVG(gross_margin), 2) AS avg_margin_pct,ROUND(AVG(yoy_growth), 2) AS yoy_growth_pct
FROM dbo.fact_monthly_finance
GROUP BY year, month
ORDER BY year DESC, month DESC;
GO-- 品类利润贡献
SELECTcategory,SUM(gross_revenue) AS revenue,SUM(gross_profit) AS profit,CAST(ROUND(SUM(gross_profit) / NULLIF(SUM(gross_revenue), 0) * 100, 2) AS DECIMAL(6,2)) AS margin_pct
FROM dbo.fact_monthly_finance
WHERE category IS NOT NULL
GROUP BY category
ORDER BY profit DESC;
GO
完整数据仓库 ETL 调度
-- ============================================================
-- 完整的数据仓库 ETL 调度(跨数据库调用)
-- ============================================================USE etl_db;
GOIF OBJECT_ID('dbo.sp_run_full_etl', 'P') IS NOT NULLDROP PROCEDURE dbo.sp_run_full_etl;
GOCREATE PROCEDURE dbo.sp_run_full_etl
AS
BEGINSET NOCOUNT ON;DECLARE @batch_id VARCHAR(50);SET @batch_id = 'BATCH_FULL_' + CONVERT(VARCHAR(8), GETDATE(), 112) + '_' + REPLACE(CONVERT(VARCHAR(8), GETDATE(), 108), ':', '');PRINT N'========================================';PRINT N'开始完整 ETL 批次: ' + @batch_id;PRINT N'========================================';-- Step 1: PSA 层加载(从 business_db 抽数)PRINT N'--- Step 1: PSA 层加载 ---';EXEC psa_db.dbo.sp_load_customers_full @batch_id;EXEC psa_db.dbo.sp_load_products_full @batch_id;EXEC psa_db.dbo.sp_load_orders_full @batch_id;-- Step 2: Star Schema 层加载PRINT N'--- Step 3: Star Schema 层加载 ---';EXEC star_db.dbo.sp_run_star_etl @batch_id;-- Step 3: Data Mart 层加载PRINT N'--- Step 4: Data Mart 层加载 ---';EXEC mart_db.dbo.sp_run_mart_etl @batch_id;PRINT N'========================================';PRINT N'ETL 批次 ' + @batch_id + N' 全部完成!';PRINT N'========================================';
END;
GO-- 执行完整 ETL
EXEC etl_db.dbo.sp_run_full_etl;
GO
架构总结
┌─────────────────────────────────────────────────────────────────────┐
│ 企业级数据仓库五层架构 │
│ (SQL Server 分库架构) │
│ │
│ ┌─────────────┐ 业务系统 OLTP,数据源头 │
│ │ business_db │ 每次抽取原始数据,不做转换 │
│ └──────┬──────┘ │
│ ↓ │
│ ┌─────────────┐ 持久化历史快照,保留所有历史版本 │
│ │ psa_db │ 可重跑 ETL,满足审计合规,hash 校验 │
│ └──────┬──────┘ │
│ ↓ │
│ ┌─────────────┐ 面向分析的高性能维度模型 │
│ │ star_db │ 事实表(度量)+ 维度表(描述) │
│ │ │ 代理键,SCD Type 2,索引优化 │
│ └──────┬──────┘ │
│ ↓ │
│ ┌─────────────┐ 面向业务部门的预聚合数据 │
│ │ mart_db │ 销售/客户/财务等多个数据集市 │
│ │ │ 聚合表、RFM 分析、即席查询 │
│ └─────────────┘ │
│ │
│ 辅助:etl_db(ETL 日志、工具函数、全局调度) │
│ │
│ 流向: business_db → psa_db → star_db → mart_db │
└─────────────────────────────────────────────────────────────────────┘
---------------------------------------------------------------
来自博客园的aspnetx宋卫东
