Data Warehouse Notes, Part 2: The PSA Layer (Persistent Staging Area) in Detail
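Abstract
PSA (Persistent Staging Area) is the first persistent storage layer of a data warehouse. It receives data extracted directly from the business systems and keeps every historical version long-term.
As the first layer of the warehouse, it goes by many names, such as stage or OBS, and the terminology is not well standardized. Throughout this series I consistently call it PSA.
These notes use SQL Server, and all example scripts target it; implementations on other databases will differ slightly.
In this part, the PSA layer is updated with snapshots: every load appends a full copy of each source table. This approach suits small data volumes. In practice, SCD2 is the most common choice, and a later part will describe implementing the PSA with SCD2.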
┌──────────────────────────────────────────────────────────────────┐
│ Data warehouse architecture │
│ │
│ ┌──────────────┐ │
│ │ business_db │ ← OLTP business system (demo source system) │
│ │ orders │ │
│ │ customers │ │
│ │ products │ │
│ └──────────────┘ │
│ ↓ Extract (direct extraction) │
│ ┌──────────────┐ │
│ │ psa_db │ ← PSA layer: persistent staging, keeps all history │
│ │ orders │ │
│ │ customers │ │
│ │ products │ │
│ └──────────────┘ │
│ ↓ Transform │
│ ┌──────────────┐ │
│ │ star_db │ ← Dimensional model layer (next part) │
│ └──────────────┘ │
└──────────────────────────────────────────────────────────────────┘
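Why do we need a PSA layer?
| Reason | Explanation |
|---|---|
| Historical traceability | The data can be traced back to its state at any past load |
| ETL reruns | When a downstream step fails, it can be reloaded from the PSA without re-extracting from the source system |
| Audit and compliance | Meets data-audit requirements by providing evidence that the data has not been altered |
| Source data loss | Business systems may keep only recent data; the PSA serves as a long-term backup |
PSA vs. traditional staging
| Feature | Traditional staging (temporary) | PSA (persistent) |
|---|---|---|
| Data retention | Current batch only; deleted after processing | All history kept long-term |
| Write mode | Overwrite (TRUNCATE + INSERT) | Append (INSERT) |
| Historical data | Not kept | Fully kept |
| Main purpose | ETL transit area | Data backup + reruns |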
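The difference in write mode can be seen in a small, self-contained sketch. It uses Python's built-in sqlite3 module instead of SQL Server so that it runs anywhere. The table names `stg_orders` and `psa_orders` and the helper `load` are hypothetical. Two loads of the same three-row source leave three rows in the staging table (clear and reload) but six rows in the PSA (append).

```python
import sqlite3

# Hypothetical source snapshot: (order_id, status)
source = [("O001", "shipped"), ("O002", "confirmed"), ("O003", "pending")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_orders (order_id TEXT, status TEXT)")
conn.execute("CREATE TABLE psa_orders (psa_batch_id TEXT, order_id TEXT, status TEXT)")

def load(batch_id, rows):
    # Traditional staging: clear the table, then insert only the current batch
    # (SQLite has no TRUNCATE, so DELETE FROM plays that role here)
    conn.execute("DELETE FROM stg_orders")
    conn.executemany("INSERT INTO stg_orders VALUES (?, ?)", rows)
    # PSA: append every batch, tagged with its batch ID; nothing is deleted
    conn.executemany(
        "INSERT INTO psa_orders VALUES (?, ?, ?)",
        [(batch_id, order_id, status) for order_id, status in rows],
    )
    conn.commit()

load("BATCH_1", source)
load("BATCH_2", source)

stg_count = conn.execute("SELECT COUNT(*) FROM stg_orders").fetchone()[0]
psa_count = conn.execute("SELECT COUNT(*) FROM psa_orders").fetchone()[0]
print(stg_count, psa_count)  # 3 6
```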
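Simulating the business system (OLTP)
Before creating the PSA layer, we first set up an OLTP database to serve as its source.
Create the business database tables
Create three core business tables in business_db: orders, customers, and products.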
-- ============================================================
-- Create the OLTP business tables in business_db
-- ============================================================
USE business_db;
GO
-- Orders table
IF OBJECT_ID('dbo.orders', 'U') IS NOT NULL DROP TABLE dbo.orders;
GO
CREATE TABLE dbo.orders (
    order_id VARCHAR(50) NOT NULL PRIMARY KEY,
    customer_id VARCHAR(50) NOT NULL,
    product_id VARCHAR(50) NOT NULL,
    order_date DATE NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    total_amount DECIMAL(12,2) NOT NULL,
    status VARCHAR(20) NOT NULL, -- pending/confirmed/shipped/cancelled
    created_at DATETIME DEFAULT GETDATE(),
    updated_at DATETIME DEFAULT GETDATE()
);
GO
-- Customers table
IF OBJECT_ID('dbo.customers', 'U') IS NOT NULL DROP TABLE dbo.customers;
GO
CREATE TABLE dbo.customers (
    customer_id VARCHAR(50) NOT NULL PRIMARY KEY,
    customer_name NVARCHAR(100) NOT NULL,
    email VARCHAR(100) NULL,
    phone VARCHAR(20) NULL,
    address NVARCHAR(200) NULL,
    city NVARCHAR(50) NULL,
    region NVARCHAR(50) NULL,
    register_date DATE NOT NULL,
    customer_type VARCHAR(20) NOT NULL, -- individual/enterprise
    is_active BIT DEFAULT 1,
    created_at DATETIME DEFAULT GETDATE(),
    updated_at DATETIME DEFAULT GETDATE()
);
GO
-- Products table
IF OBJECT_ID('dbo.products', 'U') IS NOT NULL DROP TABLE dbo.products;
GO
CREATE TABLE dbo.products (
    product_id VARCHAR(50) NOT NULL PRIMARY KEY,
    product_name NVARCHAR(200) NOT NULL,
    category NVARCHAR(50) NOT NULL,
    sub_category NVARCHAR(50) NULL,
    brand NVARCHAR(50) NULL,
    unit_cost DECIMAL(10,2) NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    supplier_id VARCHAR(50) NULL,
    is_active BIT DEFAULT 1,
    created_at DATETIME DEFAULT GETDATE(),
    updated_at DATETIME DEFAULT GETDATE()
);
GO
-- Indexes (to speed up business queries)
CREATE NONCLUSTERED INDEX idx_orders_customer ON dbo.orders(customer_id);
CREATE NONCLUSTERED INDEX idx_orders_product ON dbo.orders(product_id);
CREATE NONCLUSTERED INDEX idx_orders_date ON dbo.orders(order_date);
CREATE NONCLUSTERED INDEX idx_customers_type ON dbo.customers(customer_type);
CREATE NONCLUSTERED INDEX idx_products_category ON dbo.products(category);
GO
Insert simulated business data
-- ============================================================
-- Insert simulated business data
-- ============================================================
USE business_db;
GO
-- Customer data
INSERT INTO dbo.customers (customer_id, customer_name, email, phone, address, city, region, register_date, customer_type, is_active)
VALUES
(N'C001', N'张三', 'zhangsan@email.com', '13800138001', N'北京市朝阳区建国路88号', N'北京', N'华北', '2023-01-15', 'individual', 1),
(N'C002', N'李四科技有限公司', 'lisi@company.com', '13800138002', N'上海市浦东新区张江路100号', N'上海', N'华东', '2023-03-20', 'enterprise', 1),
(N'C003', N'王五', 'wangwu@email.com', '13800138003', N'广州市天河区体育西路103号', N'广州', N'华南', '2023-06-10', 'individual', 1),
(N'C004', N'赵六集团', 'zhaoliu@group.com', '13800138004', N'深圳市南山区科技园南路10号', N'深圳', N'华南', '2023-08-05', 'enterprise', 1),
(N'C005', N'钱七', 'qianqi@email.com', '13800138005', N'杭州市西湖区文三路478号', N'杭州', N'华东', '2024-01-20', 'individual', 0);
GO
-- Product data
INSERT INTO dbo.products (product_id, product_name, category, sub_category, brand, unit_cost, unit_price, supplier_id, is_active)
VALUES
('P001', N'iPhone 15 Pro', N'电子产品', N'手机', N'Apple', 6500.00, 8999.00, 'S001', 1),
('P002', N'MacBook Pro 16', N'电子产品', N'笔记本', N'Apple', 15000.00, 19999.00, 'S001', 1),
('P003', N'AirPods Pro 2', N'电子产品', N'耳机', N'Apple', 1200.00, 1899.00, 'S001', 1),
('P004', N'华为Mate 60 Pro', N'电子产品', N'手机', N'华为', 4500.00, 6999.00, 'S002', 1),
('P005', N'戴森吸尘器V15', N'家用电器', N'吸尘器', N'Dyson', 2800.00, 4999.00, 'S003', 1),
('P006', N'索尼WH-1000XM5', N'电子产品', N'耳机', N'Sony', 1800.00, 2999.00, 'S004', 1),
('P007', N'小米空气净化器', N'家用电器', N'空气净化器', N'小米', 600.00, 1299.00, 'S005', 1),
('P008', N'iPad Air 5', N'电子产品', N'平板', N'Apple', 3200.00, 4799.00, 'S001', 1);
GO
-- Order data
INSERT INTO dbo.orders (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O001', 'C001', 'P001', '2024-01-15', 1, 8999.00, 8999.00, 'shipped'),
('O002', 'C002', 'P002', '2024-01-16', 5, 19999.00, 99995.00, 'confirmed'),
('O003', 'C001', 'P003', '2024-01-17', 2, 1899.00, 3798.00, 'pending'),
('O004', 'C003', 'P004', '2024-01-18', 1, 6999.00, 6999.00, 'shipped'),
('O005', 'C004', 'P005', '2024-01-19', 3, 4999.00, 14997.00, 'confirmed'),
('O006', 'C001', 'P002', '2024-01-20', 1, 19999.00, 19999.00, 'pending'),
('O007', 'C005', 'P006', '2024-01-21', 1, 2999.00, 2999.00, 'cancelled'),
('O008', 'C002', 'P001', '2024-01-22', 10, 8999.00, 89990.00, 'confirmed'),
('O009', 'C003', 'P007', '2024-01-23', 2, 1299.00, 2598.00, 'shipped'),
('O010', 'C004', 'P008', '2024-01-24', 4, 4799.00, 19196.00, 'pending');
GO
-- Verify the data
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM dbo.orders
UNION ALL
SELECT 'customers', COUNT(*) FROM dbo.customers
UNION ALL
SELECT 'products', COUNT(*) FROM dbo.products;
GO
PSA layer table design
PSA table structure
On top of the business columns, each PSA table adds the following columns:
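| Column | Type | Description |
|---|---|---|
| psa_record_seq | BIGINT IDENTITY(1,1) | PSA record sequence number (auto-increment primary key) |
| psa_load_time | DATETIME | Time the row was loaded |
| psa_batch_id | VARCHAR(50) | ETL batch ID |
| psa_source_table | VARCHAR(100) | Source table name (for lineage) |
| psa_checksum | VARCHAR(32) | Row checksum (for detecting changes) |
| psa_operation | CHAR(1) | Operation type: I = insert, U = update, D = delete |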
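The table defines `psa_operation` as I/U/D, while the full-snapshot loads in this part always write `'I'`. As a minimal sketch of how checksums could drive change detection, the hypothetical function below compares the previous and current snapshots (business key → checksum) and classifies each key. Treating a key that disappears from the source as `D` is an assumption about how deletes would be flagged; the SQL Server procedures in this part do not do this.

```python
def classify_changes(previous, current):
    """Compare two snapshots given as {business_key: checksum} dicts.

    Returns {business_key: 'I' | 'U' | 'D'}; unchanged keys are omitted.
    """
    changes = {}
    for key, checksum in current.items():
        if key not in previous:
            changes[key] = "I"  # new key: insert
        elif previous[key] != checksum:
            changes[key] = "U"  # same key, different checksum: update
    for key in previous:
        if key not in current:
            changes[key] = "D"  # key no longer in the source: delete (assumed rule)
    return changes

# Hypothetical, shortened checksums for three orders on two days
day1 = {"O003": "A1", "O006": "B2", "O007": "C3"}
day2 = {"O003": "A9", "O006": "B2", "O011": "D4"}
print(classify_changes(day1, day2))  # {'O003': 'U', 'O011': 'I', 'O007': 'D'}
```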
Create the PSA layer tables
-- ============================================================
-- Create the PSA layer tables in psa_db
-- Every table keeps all historical versions
-- ============================================================
USE psa_db;
GO
-- PSA orders table
IF OBJECT_ID('dbo.orders', 'U') IS NOT NULL DROP TABLE dbo.orders;
GO
CREATE TABLE dbo.orders (
    -- PSA system columns (placed first)
    psa_record_seq BIGINT IDENTITY(1,1) PRIMARY KEY,
    psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id VARCHAR(50) NOT NULL,
    psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.orders',
    psa_checksum VARCHAR(32) NULL,
    psa_operation CHAR(1) NOT NULL DEFAULT 'I', -- I/U/D
    -- Business columns (same as the source table)
    order_id VARCHAR(50) NOT NULL,
    customer_id VARCHAR(50) NOT NULL,
    product_id VARCHAR(50) NOT NULL,
    order_date DATE NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    total_amount DECIMAL(12,2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    created_at DATETIME NULL,
    updated_at DATETIME NULL
);
GO
-- Indexes
CREATE NONCLUSTERED INDEX idx_psa_orders_batch ON dbo.orders(psa_batch_id);
CREATE NONCLUSTERED INDEX idx_psa_orders_load_time ON dbo.orders(psa_load_time);
CREATE NONCLUSTERED INDEX idx_psa_orders_business_key ON dbo.orders(order_id);
GO
-- PSA customers table
IF OBJECT_ID('dbo.customers', 'U') IS NOT NULL DROP TABLE dbo.customers;
GO
CREATE TABLE dbo.customers (
    psa_record_seq BIGINT IDENTITY(1,1) PRIMARY KEY,
    psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id VARCHAR(50) NOT NULL,
    psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.customers',
    psa_checksum VARCHAR(32) NULL,
    psa_operation CHAR(1) NOT NULL DEFAULT 'I',
    customer_id VARCHAR(50) NOT NULL,
    customer_name NVARCHAR(100) NOT NULL,
    email VARCHAR(100) NULL,
    phone VARCHAR(20) NULL,
    address NVARCHAR(200) NULL,
    city NVARCHAR(50) NULL,
    region NVARCHAR(50) NULL,
    register_date DATE NOT NULL,
    customer_type VARCHAR(20) NOT NULL,
    is_active BIT NULL,
    created_at DATETIME NULL,
    updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_psa_customers_batch ON dbo.customers(psa_batch_id);
CREATE NONCLUSTERED INDEX idx_psa_customers_load_time ON dbo.customers(psa_load_time);
CREATE NONCLUSTERED INDEX idx_psa_customers_business_key ON dbo.customers(customer_id);
GO
-- PSA products table
IF OBJECT_ID('dbo.products', 'U') IS NOT NULL DROP TABLE dbo.products;
GO
CREATE TABLE dbo.products (
    psa_record_seq BIGINT IDENTITY(1,1) PRIMARY KEY,
    psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id VARCHAR(50) NOT NULL,
    psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.products',
    psa_checksum VARCHAR(32) NULL,
    psa_operation CHAR(1) NOT NULL DEFAULT 'I',
    product_id VARCHAR(50) NOT NULL,
    product_name NVARCHAR(200) NOT NULL,
    category NVARCHAR(50) NOT NULL,
    sub_category NVARCHAR(50) NULL,
    brand NVARCHAR(50) NULL,
    unit_cost DECIMAL(10,2) NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    supplier_id VARCHAR(50) NULL,
    is_active BIT NULL,
    created_at DATETIME NULL,
    updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_psa_products_batch ON dbo.products(psa_batch_id);
CREATE NONCLUSTERED INDEX idx_psa_products_load_time ON dbo.products(psa_load_time);
CREATE NONCLUSTERED INDEX idx_psa_products_business_key ON dbo.products(product_id);
GO
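ETL flow: from the business database to the PSA
Full-load strategy
The PSA layer is append-only: each ETL run writes the full contents of every business table into the PSA and tags the rows with a batch ID.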
-- ============================================================
-- Full-load stored procedure: orders
-- Logs to etl_db.dbo.etl_log, which these scripts do not create; it must already exist
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_orders_full', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_orders_full;
GO
CREATE PROCEDURE dbo.sp_load_orders_full
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @start_time DATETIME = GETDATE();
    DECLARE @rows_inserted BIGINT = 0;
    DECLARE @error_msg NVARCHAR(MAX);
    BEGIN TRY
        -- Log the start of the ETL step
        INSERT INTO etl_db.dbo.etl_log (batch_id, layer_name, db_name, table_name, start_time, status)
        VALUES (@batch_id, 'psa', 'psa_db', 'orders', @start_time, 'RUNNING');
        -- Full insert with a per-row MD5 checksum
        INSERT INTO dbo.orders (
            psa_batch_id, psa_source_table, psa_checksum, psa_operation,
            order_id, customer_id, product_id, order_date,
            quantity, unit_price, total_amount, status,
            created_at, updated_at)
        SELECT
            @batch_id,
            'business_db.dbo.orders',
            -- MD5 over the business columns: every value is converted to NVARCHAR and NULLs
            -- become empty strings, so there are no NULLs, truncation, or type errors
            CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(
                ISNULL(CAST(order_id AS NVARCHAR(MAX)), N''), N'|',
                ISNULL(CAST(customer_id AS NVARCHAR(MAX)), N''), N'|',
                ISNULL(CAST(product_id AS NVARCHAR(MAX)), N''), N'|',
                ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''), N'|',
                ISNULL(CAST(quantity AS NVARCHAR(20)), N''), N'|',
                ISNULL(CAST(unit_price AS NVARCHAR(20)), N''), N'|',
                ISNULL(CAST(total_amount AS NVARCHAR(20)), N''), N'|',
                ISNULL(status, N''))), 2) AS psa_checksum,
            'I',
            order_id, customer_id, product_id, order_date,
            quantity, unit_price, total_amount, status,
            created_at, updated_at
        FROM business_db.dbo.orders;
        SET @rows_inserted = @@ROWCOUNT;
        -- Mark the log entry as succeeded
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(),
            rows_inserted = @rows_inserted,
            status = 'SUCCESS'
        WHERE batch_id = @batch_id
          AND table_name = 'orders';
        PRINT N'✅ Orders loaded: ' + CAST(@rows_inserted AS VARCHAR(20)) + N' rows inserted';
    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();
        -- Record the failure
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(),
            status = 'FAILED',
            error_message = @error_msg
        WHERE batch_id = @batch_id
          AND table_name = 'orders';
        THROW;
    END CATCH;
END;
GO
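To spot-check `psa_checksum` values outside SQL Server, the orders hash can be reproduced in Python. This sketch rests on two assumptions that are worth verifying against your own data. First, because `CONCAT` receives NVARCHAR arguments, `HASHBYTES` hashes the UTF-16LE bytes of the string. Second, `CONVERT(..., 2)` renders the digest as 32 uppercase hex characters without a `0x` prefix. The function name `orders_checksum` is hypothetical.

```python
import hashlib
from datetime import date
from decimal import Decimal

def orders_checksum(order_id, customer_id, product_id, order_date,
                    quantity, unit_price, total_amount, status):
    """Mirror the orders psa_checksum expression (assumed semantics, see above)."""
    parts = [order_id, customer_id, product_id, order_date,
             quantity, unit_price, total_amount, status]
    # ISNULL(..., N'') -> empty string; DATE as yyyy-mm-dd (style 120);
    # DECIMAL(10,2) keeps two decimals as text, which str(Decimal) also does
    text = "|".join("" if p is None else
                    p.isoformat() if isinstance(p, date) else str(p)
                    for p in parts)
    # NVARCHAR input: hash the UTF-16LE bytes, uppercase hex like CONVERT style 2
    return hashlib.md5(text.encode("utf-16-le")).hexdigest().upper()

before = orders_checksum("O003", "C001", "P003", date(2024, 1, 17),
                         2, Decimal("1899.00"), Decimal("3798.00"), "pending")
after = orders_checksum("O003", "C001", "P003", date(2024, 1, 17),
                        2, Decimal("1899.00"), Decimal("3798.00"), "shipped")
print(len(before), before != after)  # 32 True
```

The status change from `pending` to `shipped` is what makes the two checksums differ. `updated_at` is not part of the hash, so touching only that column would not change the checksum.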
Stored procedure for loading customers
-- ============================================================
-- Full-load stored procedure: customers
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_customers_full', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_customers_full;
GO
CREATE PROCEDURE dbo.sp_load_customers_full
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @start_time DATETIME = GETDATE();
    DECLARE @rows_inserted BIGINT = 0;
    DECLARE @error_msg NVARCHAR(MAX);
    BEGIN TRY
        -- Log the start of the ETL step
        INSERT INTO etl_db.dbo.etl_log (batch_id, layer_name, db_name, table_name, start_time, status)
        VALUES (@batch_id, 'psa', 'psa_db', 'customers', @start_time, 'RUNNING');
        -- Insert the full customer snapshot
        INSERT INTO dbo.customers (
            psa_batch_id, psa_source_table, psa_checksum, psa_operation,
            customer_id, customer_name, email, phone,
            address, city, region, register_date,
            customer_type, is_active, created_at, updated_at)
        SELECT
            @batch_id,
            'business_db.dbo.customers',
            -- MD5 checksum: every column converted to text, NULLs replaced, BIT mapped to '1'/'0'
            CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(
                ISNULL(CAST(customer_id AS NVARCHAR(MAX)), N''), N'|',
                ISNULL(customer_name, N''), N'|',
                ISNULL(email, N''), N'|',
                ISNULL(phone, N''), N'|',
                ISNULL(address, N''), N'|',
                ISNULL(city, N''), N'|',
                ISNULL(region, N''), N'|',
                ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''), N'|',
                ISNULL(customer_type, N''), N'|',
                CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END)), 2) AS psa_checksum,
            'I',
            customer_id, customer_name, email, phone,
            address, city, region, register_date,
            customer_type, is_active, created_at, updated_at
        FROM business_db.dbo.customers;
        -- Capture the row count
        SET @rows_inserted = @@ROWCOUNT;
        -- Mark the log entry as succeeded
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(),
            rows_inserted = @rows_inserted,
            status = 'SUCCESS'
        WHERE batch_id = @batch_id
          AND table_name = 'customers';
        PRINT N'✅ Customers loaded: ' + CAST(@rows_inserted AS VARCHAR(20)) + N' rows inserted';
    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();
        -- Mark the log entry as failed
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(),
            status = 'FAILED',
            error_message = @error_msg
        WHERE batch_id = @batch_id
          AND table_name = 'customers';
        -- Re-raise the original error
        THROW;
    END CATCH;
END;
GO
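One consequence of the ISNULL/CASE normalization in the customers checksum is that a NULL and an empty string hash identically. The same is true of a NULL and a 0 `is_active`. A change between those values therefore does not produce a different checksum. The minimal sketch below mimics that normalization for two columns with a hypothetical helper, `normalized_checksum`. Whether this matters depends on whether such changes are meaningful in your source.

```python
import hashlib

def normalized_checksum(email, is_active):
    # Mimic ISNULL(email, N'') and
    # CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
    email_text = "" if email is None else email
    active_text = "1" if (0 if is_active is None else is_active) == 1 else "0"
    text = email_text + "|" + active_text
    # Hash the UTF-16LE bytes, as for NVARCHAR input in SQL Server
    return hashlib.md5(text.encode("utf-16-le")).hexdigest().upper()

print(normalized_checksum(None, None) == normalized_checksum("", 0))           # True
print(normalized_checksum("a@b.com", 1) == normalized_checksum("a@b.com", 0))  # False
```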
Stored procedure for loading products
-- ============================================================
-- Full-load stored procedure: products
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_products_full', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_products_full;
GO
CREATE PROCEDURE dbo.sp_load_products_full
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @start_time DATETIME = GETDATE();
    DECLARE @rows_inserted BIGINT = 0;
    DECLARE @error_msg NVARCHAR(MAX);
    BEGIN TRY
        -- Log the start of the ETL step
        INSERT INTO etl_db.dbo.etl_log (batch_id, layer_name, db_name, table_name, start_time, status)
        VALUES (@batch_id, 'psa', 'psa_db', 'products', @start_time, 'RUNNING');
        -- Insert the full product snapshot with an MD5 checksum
        INSERT INTO dbo.products (
            psa_batch_id, psa_source_table, psa_checksum, psa_operation,
            product_id, product_name, category, sub_category,
            brand, unit_cost, unit_price, supplier_id,
            is_active, created_at, updated_at)
        SELECT
            @batch_id,
            'business_db.dbo.products',
            -- Same MD5 pattern: every column wrapped in ISNULL, BIT mapped to '1'/'0'
            CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(
                ISNULL(CAST(product_id AS NVARCHAR(MAX)), N''), N'|',
                ISNULL(product_name, N''), N'|',
                ISNULL(category, N''), N'|',
                ISNULL(sub_category, N''), N'|',
                ISNULL(brand, N''), N'|',
                ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''), N'|',
                ISNULL(CAST(unit_price AS NVARCHAR(20)), N''), N'|',
                ISNULL(CAST(supplier_id AS NVARCHAR(MAX)), N''), N'|',
                CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END)), 2) AS psa_checksum,
            'I',
            product_id, product_name, category, sub_category,
            brand, unit_cost, unit_price, supplier_id,
            is_active, created_at, updated_at
        FROM business_db.dbo.products;
        SET @rows_inserted = @@ROWCOUNT;
        -- Mark the log entry as succeeded
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(),
            rows_inserted = @rows_inserted,
            status = 'SUCCESS'
        WHERE batch_id = @batch_id
          AND table_name = 'products';
        PRINT N'✅ Products loaded: ' + CAST(@rows_inserted AS VARCHAR(20)) + N' rows inserted';
    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();
        -- Mark the log entry as failed
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(),
            status = 'FAILED',
            error_message = @error_msg
        WHERE batch_id = @batch_id
          AND table_name = 'products';
        THROW;
    END CATCH;
END;
GO
Run the ETL and verify
Generate a batch ID and run
-- ============================================================
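-- Generate a batch ID and run the full load
-- ============================================================
DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_' + CONVERT(VARCHAR(8), GETDATE(), 112) + '_' + REPLACE(CONVERT(VARCHAR(8), GETDATE(), 108), ':', '');
-- e.g. BATCH_20240424_153045
PRINT N'Batch ID: ' + @batch_id;
GO
-- Load all three tables
-- Local variables do not survive GO, so @batch_id is declared again here;
-- this walkthrough uses a fixed ID so the verification queries below can reference it
DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_20240424_153045';
EXEC psa_db.dbo.sp_load_orders_full @batch_id;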
EXEC psa_db.dbo.sp_load_customers_full @batch_id;
EXEC psa_db.dbo.sp_load_products_full @batch_id;
GO
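The batch ID layout `BATCH_yyyymmdd_hhmmss` is fixed-width and ordered from year down to second. Sorting the IDs as strings therefore also sorts them chronologically, and the "latest batch" query later in this note relies on that through `MAX(psa_batch_id)`. Here is a small Python sketch of the same format; the function name `make_batch_id` is hypothetical.

```python
from datetime import datetime

def make_batch_id(ts: datetime) -> str:
    # Same layout as 'BATCH_' + CONVERT(..., 112) + '_' + hhmmss taken from style 108
    return ts.strftime("BATCH_%Y%m%d_%H%M%S")

first = make_batch_id(datetime(2024, 4, 24, 15, 30, 45))
second = make_batch_id(datetime(2024, 4, 25, 9, 5, 0))
print(first)                           # BATCH_20240424_153045
print(max([first, second]) == second)  # True: string order matches time order
```

The resolution is one second, so two runs started within the same second would share a batch ID.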
Verify the load results
-- ============================================================
-- Verify the PSA layer data
-- ============================================================
USE psa_db;
GO
-- Check the ETL log
SELECT batch_id, layer_name, table_name, start_time, end_time,
       DATEDIFF(second, start_time, end_time) AS duration_seconds,
       rows_inserted, status
FROM etl_db.dbo.etl_log
WHERE batch_id = 'BATCH_20240424_153045'
ORDER BY table_name;
GO
-- Row counts in the PSA tables
SELECT 'orders' AS table_name, COUNT(*) AS psa_records FROM dbo.orders
UNION ALL
SELECT 'customers', COUNT(*) FROM dbo.customers
UNION ALL
SELECT 'products', COUNT(*) FROM dbo.products;
GO
-- Order data, including the PSA metadata columns
SELECT TOP 10 psa_record_seq, psa_batch_id, psa_load_time, psa_checksum,
       order_id, customer_id, product_id, order_date, total_amount, status
FROM dbo.orders
ORDER BY psa_record_seq;
GO
Managing historical versions in the PSA layer
Simulate data changes
-- ============================================================
-- Simulate business data changes (ETL on the following day)
-- ============================================================
USE business_db;
GO
-- Update order status
UPDATE dbo.orders
SET status = 'shipped',updated_at = GETDATE()
WHERE order_id IN ('O003', 'O006');
-- New order
INSERT INTO dbo.orders (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O011', 'C001', 'P004', '2024-01-25', 2, 6999.00, 13998.00, 'pending');
-- Update customer information
UPDATE dbo.customers
SET address = N'北京市朝阳区望京SOHO',updated_at = GETDATE()
WHERE customer_id = 'C001';
GO
-- Verify the changes
SELECT order_id, status, updated_at FROM dbo.orders WHERE order_id IN ('O003', 'O006', 'O011');
SELECT customer_id, address FROM dbo.customers WHERE customer_id = 'C001';
GO
Second ETL load
-- ============================================================
-- Second full load (including the changed data)
-- ============================================================
-- New batch ID
DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_' + CONVERT(VARCHAR(8), GETDATE(), 112) + '_' + REPLACE(CONVERT(VARCHAR(8), GETDATE(), 108), ':', '');
PRINT N'New batch ID: ' + @batch_id;
-- Run the loads
EXEC psa_db.dbo.sp_load_orders_full @batch_id;
EXEC psa_db.dbo.sp_load_customers_full @batch_id;
EXEC psa_db.dbo.sp_load_products_full @batch_id;
GO
Verify historical versions
-- ============================================================
-- Verify that the PSA layer keeps historical versions
-- ============================================================
USE psa_db;
GO
-- Orders: multiple records for the same order
SELECT psa_batch_id,psa_load_time,order_id,status,updated_at
FROM dbo.orders
WHERE order_id = 'O003'
ORDER BY psa_load_time;
GO
-- Customers: address change history
SELECT psa_batch_id,psa_load_time,customer_id,address
FROM dbo.customers
WHERE customer_id = 'C001'
ORDER BY psa_load_time;
GO
-- Record count per batch
SELECT psa_batch_id,COUNT(*) AS record_count
FROM dbo.orders
GROUP BY psa_batch_id
ORDER BY psa_batch_id;
GO
-- All PSA ETL log entries
SELECT batch_id,table_name,start_time,end_time,rows_inserted,status
FROM etl_db.dbo.etl_log
WHERE layer_name = 'psa'
ORDER BY start_time;
GO
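Every snapshot copies every row, so each order appears once per batch whether or not it changed. The per-batch counts above therefore grow even for untouched rows. One way to isolate the keys that actually changed is to count distinct checksums per business key. The sketch below runs that query in SQLite with hypothetical checksum values; the same GROUP BY/HAVING pattern is standard SQL and should also work against `psa_db.dbo.orders`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (psa_batch_id TEXT, order_id TEXT, psa_checksum TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
    ("BATCH_1", "O001", "AAA"), ("BATCH_2", "O001", "AAA"),  # unchanged
    ("BATCH_1", "O003", "BBB"), ("BATCH_2", "O003", "CCC"),  # status changed
    ("BATCH_2", "O011", "DDD"),                              # new in batch 2
])

# Keys with more than one distinct checksum have real historical versions
changed = conn.execute("""
    SELECT order_id, COUNT(DISTINCT psa_checksum) AS versions
    FROM orders
    GROUP BY order_id
    HAVING COUNT(DISTINCT psa_checksum) > 1
    ORDER BY order_id""").fetchall()
print(changed)  # [('O003', 2)]
```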
PSA query examples
Query the latest data
-- ============================================================
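-- Query the latest data (the current state of the source)
-- ============================================================
USE psa_db;
GO
-- Method 1: take the latest batch
-- (MAX works because BATCH_yyyymmdd_hhmmss IDs sort chronologically as strings)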
WITH latest_batch AS (SELECT MAX(psa_batch_id) AS batch_id FROM dbo.orders
)
SELECT o.order_id,o.customer_id,o.order_date,o.total_amount,o.status,o.psa_load_time
FROM dbo.orders o
CROSS JOIN latest_batch lb
WHERE o.psa_batch_id = lb.batch_id
ORDER BY o.order_date;
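GO
-- Method 2: use ROW_NUMBER to take the latest record for each business key
-- (unlike method 1, this also returns keys that have since been deleted from the source)
WITH latest_orders AS (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY psa_load_time DESC) AS rn
    FROM dbo.orders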
)
SELECT order_id,customer_id,order_date,total_amount,status,psa_load_time AS latest_load_time
FROM latest_orders
WHERE rn = 1
ORDER BY order_date;
GO
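The two methods agree only as long as no row is deleted from the source. Method 1 returns exactly the rows present in the newest snapshot. Method 2 returns the last version ever seen for each key, so a key deleted from the source keeps appearing. The sketch below reproduces both queries in SQLite on a hypothetical two-batch PSA in which O007 is missing from the second batch. Window functions require SQLite 3.25 or later.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE orders (
    psa_batch_id TEXT, psa_load_time TEXT, order_id TEXT, status TEXT)""")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", [
    ("BATCH_20240424_153045", "2024-04-24T15:30:45", "O003", "pending"),
    ("BATCH_20240424_153045", "2024-04-24T15:30:45", "O007", "cancelled"),
    ("BATCH_20240425_090000", "2024-04-25T09:00:00", "O003", "shipped"),
])

# Method 1: only rows from the newest batch
method1 = conn.execute("""
    WITH latest_batch AS (SELECT MAX(psa_batch_id) AS batch_id FROM orders)
    SELECT o.order_id, o.status
    FROM orders o CROSS JOIN latest_batch lb
    WHERE o.psa_batch_id = lb.batch_id
    ORDER BY o.order_id""").fetchall()

# Method 2: newest version of every key ever loaded
method2 = conn.execute("""
    WITH latest_orders AS (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY order_id ORDER BY psa_load_time DESC) AS rn
        FROM orders)
    SELECT order_id, status FROM latest_orders
    WHERE rn = 1 ORDER BY order_id""").fetchall()

print(method1)  # [('O003', 'shipped')]
print(method2)  # [('O003', 'shipped'), ('O007', 'cancelled')]
```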
Query data as of a point in time
-- ============================================================
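-- Query a snapshot of the data as of a historical point in time
-- ============================================================
USE psa_db;
GO
-- Order status as of (at or before) 2024-04-24 16:00
-- (the ISO 8601 form with 'T' is read the same way under any language/DATEFORMAT setting)
DECLARE @as_of_time DATETIME = '2024-04-24T16:00:00';
WITH orders_as_of AS (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY psa_load_time DESC) AS rn
    FROM dbo.orders
    WHERE psa_load_time <= @as_of_time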
)
SELECT order_id,customer_id,order_date,total_amount,status,psa_load_time
FROM orders_as_of
WHERE rn = 1
ORDER BY order_id;
GO
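The as-of query can be exercised the same way. This sketch uses SQLite rather than SQL Server and stores hypothetical load times as ISO 8601 text, so string comparison matches time order. A cut-off between the two loads returns the first-day state of O003, a cut-off after both loads returns the second-day state, and a cut-off before any load returns nothing.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (psa_load_time TEXT, order_id TEXT, status TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
    ("2024-04-24T15:30:45", "O003", "pending"),
    ("2024-04-25T09:00:00", "O003", "shipped"),
])

AS_OF_SQL = """
    WITH orders_as_of AS (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY order_id ORDER BY psa_load_time DESC) AS rn
        FROM orders
        WHERE psa_load_time <= ?)
    SELECT order_id, status FROM orders_as_of WHERE rn = 1 ORDER BY order_id"""

def as_of(ts):
    # Latest version of each key loaded at or before ts
    return conn.execute(AS_OF_SQL, (ts,)).fetchall()

print(as_of("2024-04-24T16:00:00"))  # [('O003', 'pending')]
print(as_of("2024-04-25T12:00:00"))  # [('O003', 'shipped')]
print(as_of("2024-04-01T00:00:00"))  # []
```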
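Summary
| Concept | Key points |
|---|---|
| Role of the PSA layer | First persistent storage layer of the warehouse; receives data directly from the business systems |
| Write mode | Append (INSERT); history is never deleted and every version is kept |
| Core columns | psa_record_seq, psa_batch_id, psa_load_time, psa_checksum |
| Hash checksum | Used for later incremental detection and change identification |
| Historical queries | Filtering by batch or load time reconstructs the data as of any past load |
| ETL strategy | Full extraction on every run, append-only writes; simple and reliable |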
---------------------------------------------------------------
From aspnetx (宋卫东) on cnblogs (博客园)
