Data Warehouse Notes, Part 6: SCD2 Processing for the PSA Layer – Copenhagen Husky (aspnetx)
Summary
SCD2 (Slowly Changing Dimension Type 2) is the industry standard and the most widely used way to handle historical changes in dimensional modeling. Unlike the snapshot approach, SCD2 writes only the rows that actually changed. Its validity-period fields record the lifecycle of every record precisely, so it gives full historical traceability at a very low storage cost. This article walks through the workflow with detailed code.
The database used in these notes is SQL Server, and all example scripts target it. Implementations on other databases will differ slightly.
In the article "A Detailed Look at the PSA (Persistent Staging Area) Layer", the PSA layer loaded data as **full snapshots**. On every run, ETL writes the complete contents of the source table into the PSA and marks every record with psa_operation='I'. This approach is easy to implement and suits scenarios with small data volumes and infrequent changes.
In a real production environment, however, the limitations of the snapshot approach are obvious:
| Problem | Description |
|---|---|
| Change history cannot be tracked | For a given day, there is no way to tell which value a record changed from and which value it changed to. |
| Heavy storage waste | Every run writes the full data set, so the data volume grows rapidly |
This article uses the customer, order, and product tables of the PSA layer as examples and shows, end to end, how to rebuild the PSA layer with SCD2.
Core concepts of SCD2
The core rules of Type 2
SCD2 adds three key fields to every historical record:
| Field | Type | Description |
|---|---|---|
| start_date | DATETIME | Time from which this version of the record is effective |
| end_date | DATETIME | Time at which this version of the record expires (9999-12-31 means currently valid) |
| is_current | CHAR(1) | Y = current version, N = historical version |
The change rules cover four cases:
┌─────────────────────────────────────────────────────────────┐
│ SCD2 change-detection logic                                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ New (in source, not in PSA)                                 │
│   → insert new row: start_date = now, end_date = 9999-12-31,│
│     is_current = 'Y'                                        │
│                                                             │
│ Attribute change (in source and PSA, checksums differ)      │
│   → expire old row: end_date = now - 1s, is_current = 'N'   │
│   → insert new row: start_date = now, end_date = 9999-12-31,│
│     is_current = 'Y'                                        │
│                                                             │
│ Deleted (not in source, in PSA)                             │
│   → expire old row: end_date = now, is_current = 'N'        │
│   → no new row is inserted                                  │
│                                                             │
│ Unchanged (in source and PSA, same checksum)                │
│   → no action                                               │
└─────────────────────────────────────────────────────────────┘
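Here is a minimal Python sketch of the same four-case decision table. It is an illustration only and not part of the SQL Server implementation. `classify_changes` is a hypothetical helper. Each argument is assumed to map a business key to its row checksum; on the PSA side, that is the checksum of the current version. The keys and checksum strings in the usage line are made up.

```python
from typing import Dict, List


def classify_changes(source: Dict[str, str], psa_current: Dict[str, str]) -> Dict[str, List[str]]:
    """Sort business keys into the four SCD2 cases.

    source:      business key -> checksum of the row in the source table
    psa_current: business key -> checksum of its current (is_current = 'Y') PSA version
    """
    result: Dict[str, List[str]] = {"new": [], "changed": [], "deleted": [], "unchanged": []}
    for key, checksum in source.items():
        if key not in psa_current:
            result["new"].append(key)          # in source, not in PSA: insert
        elif psa_current[key] != checksum:
            result["changed"].append(key)      # in both, checksums differ: close + insert
        else:
            result["unchanged"].append(key)    # in both, same checksum: no action
    for key in psa_current:
        if key not in source:
            result["deleted"].append(key)      # in PSA only: close, no new row
    return {case: sorted(keys) for case, keys in result.items()}


print(classify_changes({"C001": "b", "C002": "x", "C006": "n"},
                       {"C001": "a", "C002": "x", "C004": "d"}))
# {'new': ['C006'], 'changed': ['C001'], 'deleted': ['C004'], 'unchanged': ['C002']}
```

In the stored procedures, the MERGE statement makes this decision in a single pass over the source rows and the current PSA versions.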
How changes are detected
For the data warehouse to track the change history of data, it first has to identify changes in the source system's data. Source tables usually have many columns, though. How does change detection actually work? Do you compare the columns one by one?
The most common method is checksum verification, which is the core of SCD2 change detection. On every ETL run, an MD5 hash is computed over all business fields of each source row:
-- Example: checksum for customer 张三 (Zhang San)
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
'张三', -- customer_name
'|', 'zhangsan@email.com', -- email
'|', '13800138001', -- phone
'|', '北京市朝阳区建国路88号', -- address
...
)
), 2)
-- Result: E8B2A5C3D9F1... ← 32 hex characters (128-bit MD5); style 2 omits the 0x prefix
In other words, the fields to be monitored are concatenated into one string, and that string is hashed to an MD5 value. Whenever any business field changes, the MD5 value changes too, and that triggers SCD2 change processing. A single checksum comparison replaces a column-by-column comparison, which identifies changes in the source data far more efficiently.
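The checksum can be reproduced outside the database, for example to check a value while debugging. The Python sketch below rests on three assumptions:

- The concatenation is NVARCHAR, as in the stored procedures, which use N'' literals. SQL Server therefore hashes its UTF-16LE bytes.
- CONVERT style 2 yields uppercase hex without a 0x prefix.
- Each field is already rendered exactly as SQL Server renders it (dates via style 120, decimals as CAST to NVARCHAR, and so on).

`row_checksum` is a hypothetical helper. The four-field list in the usage lines is shortened, so its values will not match the full customer checksum.

```python
import hashlib
from typing import Optional, Sequence


def row_checksum(fields: Sequence[Optional[str]]) -> str:
    """Mimic CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(ISNULL(f1, N''), N'|', ...)), 2)."""
    # ISNULL(x, N'') turns NULL into an empty string before concatenation
    text = "|".join("" if f is None else f for f in fields)
    # NVARCHAR data is UTF-16LE, so those are the bytes HASHBYTES actually hashes
    digest = hashlib.md5(text.encode("utf-16-le")).hexdigest()
    # CONVERT style 2 renders uppercase hex digits without the 0x prefix
    return digest.upper()


before = row_checksum(["张三", "zhangsan@email.com", "13800138001", "北京市朝阳区建国路88号"])
after = row_checksum(["张三", "zhangsan@email.com", "13800138001", "北京市朝阳区望京SOHO"])
print(before != after)  # True: the address change alters the checksum
```

One design consequence: ISNULL(x, N'') maps NULL to an empty string, so `row_checksum(["a", None])` equals `row_checksum(["a", ""])`. A change from NULL to '' (or back) is therefore not detected.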
Designing the PSA-SCD2 table structures
Corresponding to the three PSA tables created in the second article, we create tables here with the suffix _scd2 to distinguish them from the tables in that article.
SCD2 customer table
-- ============================================================
-- PSA-SCD2 customer table
-- Compared with the snapshot approach: adds start_date / end_date / is_current
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.customers_scd2', 'U') IS NOT NULL
DROP TABLE dbo.customers_scd2;
GO
CREATE TABLE dbo.customers_scd2 (
-- PSA system fields
psa_record_seq BIGINT IDENTITY(1,1) NOT NULL,
psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
psa_batch_id VARCHAR(50) NOT NULL,
psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.customers',
psa_checksum VARCHAR(32) NULL,
-- SCD2 validity-period fields
start_date DATETIME NOT NULL,
end_date DATETIME NOT NULL,
is_current CHAR(1) NOT NULL DEFAULT 'Y',
-- Business fields
customer_id VARCHAR(50) NOT NULL,
customer_name NVARCHAR(100) NOT NULL,
email VARCHAR(100) NULL,
phone VARCHAR(20) NULL,
address NVARCHAR(200) NULL,
city NVARCHAR(50) NULL,
region NVARCHAR(50) NULL,
register_date DATE NOT NULL,
customer_type VARCHAR(20) NOT NULL,
is_active BIT NULL,
created_at DATETIME NULL,
updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_customers_scd2_bk
ON dbo.customers_scd2(customer_id, is_current);
CREATE NONCLUSTERED INDEX idx_customers_scd2_period
ON dbo.customers_scd2(start_date, end_date);
CREATE NONCLUSTERED INDEX idx_customers_scd2_batch
ON dbo.customers_scd2(psa_batch_id);
GO
Design note: the SCD2 table has no primary key constraint, because the same business key will have several historical versions. The current version of a key is looked up with (customer_id, is_current = 'Y').
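Because there is no primary key, nothing in the table itself stops a faulty load from leaving two current versions of one key. A quick integrity check can be sketched in Python. It assumes the rows have been exported as (key, start_date, end_date, is_current) tuples. `Version` and `check_scd2_integrity` are hypothetical names. In SQL Server, the first rule could also be enforced with a filtered unique index on customer_id WHERE is_current = 'Y'.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, NamedTuple


class Version(NamedTuple):
    key: str
    start: datetime
    end: datetime
    is_current: str  # 'Y' or 'N'


def check_scd2_integrity(rows: List[Version]) -> List[str]:
    """Return a list of problems; an empty list means the version chains look sound."""
    problems: List[str] = []
    by_key: Dict[str, List[Version]] = defaultdict(list)
    for r in rows:
        by_key[r.key].append(r)
    for key, versions in by_key.items():
        # Rule 1: at most one current version per business key
        current = [v for v in versions if v.is_current == "Y"]
        if len(current) > 1:
            problems.append(f"{key}: {len(current)} current versions")
        # Rule 2: validity periods of consecutive versions must not overlap
        ordered = sorted(versions, key=lambda v: v.start)
        for prev, nxt in zip(ordered, ordered[1:]):
            if prev.end >= nxt.start:
                problems.append(f"{key}: {prev.start} overlaps {nxt.start}")
    return problems
```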
SCD2 order table
-- ============================================================
-- PSA-SCD2 order table
-- The main change to an order is its status transition (pending→confirmed→shipped→cancelled)
-- SCD2 records the full lifecycle of each order
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.orders_scd2', 'U') IS NOT NULL
DROP TABLE dbo.orders_scd2;
GO
CREATE TABLE dbo.orders_scd2 (
-- PSA system fields
psa_record_seq BIGINT IDENTITY(1,1) NOT NULL,
psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
psa_batch_id VARCHAR(50) NOT NULL,
psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.orders',
psa_checksum VARCHAR(32) NULL,
-- SCD2 validity-period fields
start_date DATETIME NOT NULL,
end_date DATETIME NOT NULL,
is_current CHAR(1) NOT NULL DEFAULT 'Y',
-- Business fields
order_id VARCHAR(50) NOT NULL,
customer_id VARCHAR(50) NOT NULL,
product_id VARCHAR(50) NOT NULL,
order_date DATE NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
total_amount DECIMAL(12,2) NOT NULL,
status VARCHAR(20) NOT NULL, -- pending/confirmed/shipped/cancelled
created_at DATETIME NULL,
updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_orders_scd2_bk
ON dbo.orders_scd2(order_id, is_current);
CREATE NONCLUSTERED INDEX idx_orders_scd2_period
ON dbo.orders_scd2(start_date, end_date);
CREATE NONCLUSTERED INDEX idx_orders_scd2_batch
ON dbo.orders_scd2(psa_batch_id);
CREATE NONCLUSTERED INDEX idx_orders_scd2_customer
ON dbo.orders_scd2(customer_id, is_current);
GO
SCD2 product table
-- ============================================================
-- PSA-SCD2 product table
-- Product changes are mostly price adjustments, listing/delisting, and category changes
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.products_scd2', 'U') IS NOT NULL
DROP TABLE dbo.products_scd2;
GO
CREATE TABLE dbo.products_scd2 (
-- PSA system fields
psa_record_seq BIGINT IDENTITY(1,1) NOT NULL,
psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
psa_batch_id VARCHAR(50) NOT NULL,
psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.products',
psa_checksum VARCHAR(32) NULL,
-- SCD2 validity-period fields
start_date DATETIME NOT NULL,
end_date DATETIME NOT NULL,
is_current CHAR(1) NOT NULL DEFAULT 'Y',
-- Business fields
product_id VARCHAR(50) NOT NULL,
product_name NVARCHAR(200) NOT NULL,
category NVARCHAR(50) NOT NULL,
sub_category NVARCHAR(50) NULL,
brand NVARCHAR(50) NULL,
unit_cost DECIMAL(10,2) NULL,
unit_price DECIMAL(10,2) NOT NULL,
supplier_id VARCHAR(50) NULL,
is_active BIT NULL,
created_at DATETIME NULL,
updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_products_scd2_bk
ON dbo.products_scd2(product_id, is_current);
CREATE NONCLUSTERED INDEX idx_products_scd2_period
ON dbo.products_scd2(start_date, end_date);
CREATE NONCLUSTERED INDEX idx_products_scd2_batch
ON dbo.products_scd2(psa_batch_id);
GO
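SCD2 ETL stored procedure: customer table
From here we move into the core of the ETL. The code is fairly long, but every procedure follows the same logic. A MERGE inserts new keys and closes changed or deleted versions. A follow-up INSERT then adds the new version of each changed key.
Full implementation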
-- ============================================================
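-- SCD2 load stored procedure: customer table
-- MERGE classifies new / changed / deleted rows in one pass; a follow-up INSERT adds the new version of each changed row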
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_customers_scd2', 'P') IS NOT NULL
DROP PROCEDURE dbo.sp_load_customers_scd2;
GO
CREATE PROCEDURE dbo.sp_load_customers_scd2
@batch_id VARCHAR(50)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @start_time DATETIME = GETDATE();
DECLARE @current_date DATETIME = GETDATE();
DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
DECLARE @rows_inserted INT = 0;
DECLARE @rows_closed INT = 0;
DECLARE @error_msg NVARCHAR(MAX);
BEGIN TRY
-- Log the start of the ETL run
INSERT INTO etl_db.dbo.etl_log (
batch_id, layer_name, db_name, table_name,
start_time, status
)
VALUES (
@batch_id, 'psa_scd2', 'psa_db', 'customers_scd2',
@start_time, 'RUNNING'
);
-- ================================================
-- SCD2 MERGE statement
-- Match condition: ON target(customer_id) = source(customer_id)
-- AND target.is_current = 'Y' (match only the current version)
-- ================================================
;MERGE dbo.customers_scd2 AS target
USING (
SELECT
customer_id,
customer_name,
email,
phone,
address,
city,
region,
register_date,
customer_type,
is_active,
created_at,
updated_at,
-- MD5 checksum over the business fields
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(customer_name, N''),
N'|', ISNULL(email, N''),
N'|', ISNULL(phone, N''),
N'|', ISNULL(address, N''),
N'|', ISNULL(city, N''),
N'|', ISNULL(region, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
N'|', ISNULL(customer_type, N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.customers
) AS source
ON (
target.customer_id = source.customer_id
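AND target.is_current = 'Y'
)
-- ① Unchanged (same checksum): no WHEN clause fires, so nothing happens
-- ② Attribute changed → close the old version
WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
THEN UPDATE SET
target.end_date = DATEADD(SECOND, -1, @current_date),
target.is_current = 'N'
-- ③ New key → insert a new row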
WHEN NOT MATCHED BY TARGET
THEN INSERT (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
customer_id, customer_name, email, phone,
address, city, region, register_date,
customer_type, is_active, created_at, updated_at
) VALUES (
@batch_id,
'business_db.dbo.customers',
source.psa_checksum,
@current_date,
@far_future,
'Y',
source.customer_id,
source.customer_name,
source.email,
source.phone,
source.address,
source.city,
source.region,
source.register_date,
source.customer_type,
source.is_active,
source.created_at,
source.updated_at
)
-- ④ Soft delete (row removed from the source) → close the old version, no new row
WHEN NOT MATCHED BY SOURCE
AND target.is_current = 'Y'
THEN UPDATE SET
target.end_date = @current_date,
target.is_current = 'N';
-- ================================================
-- Insert the new version of each row that the MERGE just closed
-- ================================================
INSERT INTO dbo.customers_scd2 (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
customer_id, customer_name, email, phone,
address, city, region, register_date,
customer_type, is_active, created_at, updated_at
)
SELECT
@batch_id,
'business_db.dbo.customers',
s.psa_checksum,
@current_date,
@far_future,
'Y',
s.customer_id,
s.customer_name,
s.email,
s.phone,
s.address,
s.city,
s.region,
s.register_date,
s.customer_type,
s.is_active,
s.created_at,
s.updated_at
FROM (
SELECT
customer_id,
customer_name,
email,
phone,
address,
city,
region,
register_date,
customer_type,
is_active,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(customer_name, N''),
N'|', ISNULL(email, N''),
N'|', ISNULL(phone, N''),
N'|', ISNULL(address, N''),
N'|', ISNULL(city, N''),
N'|', ISNULL(region, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
N'|', ISNULL(customer_type, N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.customers
) s
-- Only keys whose old version was just closed by the MERGE (i.e. customers that changed)
INNER JOIN dbo.customers_scd2 t
ON s.customer_id = t.customer_id
AND t.is_current = 'N'
AND t.end_date = DATEADD(SECOND, -1, @current_date)
-- Guard against inserting a second current version
WHERE NOT EXISTS (
SELECT 1
FROM dbo.customers_scd2
WHERE customer_id = s.customer_id
AND is_current = 'Y'
);
-- ================================================
-- Summary statistics
-- ================================================
SELECT @rows_inserted = COUNT(*)
FROM dbo.customers_scd2
WHERE psa_batch_id = @batch_id;
-- Versions closed by this run: changed rows end at @current_date - 1 s,
-- rows deleted from the source end exactly at @current_date
SELECT @rows_closed = COUNT(*)
FROM dbo.customers_scd2
WHERE is_current = 'N'
AND end_date IN (DATEADD(SECOND, -1, @current_date), @current_date);
-- Update the ETL log
UPDATE etl_db.dbo.etl_log
SET
end_time = GETDATE(),
rows_inserted = ISNULL(@rows_inserted, 0),
rows_updated = ISNULL(@rows_closed, 0),
status = 'SUCCESS'
WHERE batch_id = @batch_id
AND table_name = 'customers_scd2';
PRINT N'✅ Customer table SCD2 load complete';
PRINT N' New/changed versions this run: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');
PRINT N' Historical versions closed: ' + ISNULL(CAST(@rows_closed AS VARCHAR), '0');
END TRY
BEGIN CATCH
SET @error_msg = ERROR_MESSAGE();
UPDATE etl_db.dbo.etl_log
SET
end_time = GETDATE(),
status = 'FAILED',
error_message = @error_msg
WHERE batch_id = @batch_id
AND table_name = 'customers_scd2';
THROW;
END CATCH;
END;
GO
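The procedure has a two-step structure: a MERGE, then an INSERT for the changed keys. That structure can be mirrored in a small in-memory Python sketch, which is handy for reasoning about edge cases without a database. The names `Row`, `fingerprint` and `load_scd2` are hypothetical. `fingerprint` is a stand-in for the MD5 checksum column, not a reproduction of it.

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List

FAR_FUTURE = datetime(9999, 12, 31, 23, 59, 59)


def fingerprint(attrs: dict) -> str:
    # Stand-in for psa_checksum: hash a canonical text form of the attributes
    return hashlib.md5(repr(sorted(attrs.items())).encode("utf-8")).hexdigest()


@dataclass
class Row:
    key: str
    checksum: str
    attrs: dict
    batch_id: str
    start_date: datetime
    end_date: datetime = FAR_FUTURE
    is_current: str = "Y"


def load_scd2(table: List[Row], source: Dict[str, dict], batch_id: str, now: datetime) -> Dict[str, int]:
    current = {r.key: r for r in table if r.is_current == "Y"}
    stats = {"inserted": 0, "closed": 0}
    changed: List[str] = []
    # Step 1 (the MERGE): compare every source key with its current version
    for key, attrs in source.items():
        checksum = fingerprint(attrs)
        old = current.get(key)
        if old is None:  # WHEN NOT MATCHED BY TARGET: brand-new key
            table.append(Row(key, checksum, attrs, batch_id, now))
            stats["inserted"] += 1
        elif old.checksum != checksum:  # WHEN MATCHED AND checksum differs: expire old version
            old.end_date = now - timedelta(seconds=1)
            old.is_current = "N"
            stats["closed"] += 1
            changed.append(key)
        # identical checksum: no action
    for key, old in current.items():
        if key not in source:  # WHEN NOT MATCHED BY SOURCE: expire, no new version
            old.end_date = now
            old.is_current = "N"
            stats["closed"] += 1
    # Step 2 (the follow-up INSERT): a new current version for every changed key
    for key in changed:
        table.append(Row(key, fingerprint(source[key]), source[key], batch_id, now))
        stats["inserted"] += 1
    return stats
```

Running the same source twice in a row inserts and closes nothing the second time. The SQL procedure has the same property, so it is safe to re-run against an unchanged source.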
SCD2 ETL stored procedure: order table
-- ============================================================
-- SCD2 load stored procedure: order table
-- The core change to an order is its status transition (pending→confirmed→shipped/cancelled)
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_orders_scd2', 'P') IS NOT NULL
DROP PROCEDURE dbo.sp_load_orders_scd2;
GO
CREATE PROCEDURE dbo.sp_load_orders_scd2
@batch_id VARCHAR(50)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @current_date DATETIME = GETDATE();
DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
DECLARE @rows_inserted INT = 0;
DECLARE @rows_closed INT = 0;
DECLARE @error_msg NVARCHAR(MAX);
BEGIN TRY
INSERT INTO etl_db.dbo.etl_log (
batch_id, layer_name, db_name, table_name,
start_time, status
)
VALUES (
@batch_id, 'psa_scd2', 'psa_db', 'orders_scd2',
GETDATE(), 'RUNNING'
);
;MERGE dbo.orders_scd2 AS target
USING (
SELECT
order_id,
customer_id,
product_id,
order_date,
quantity,
unit_price,
total_amount,
status,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(order_id, N''),
N'|', ISNULL(customer_id, N''),
N'|', ISNULL(product_id, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
N'|', ISNULL(status, N'')
)
), 2) AS psa_checksum
FROM business_db.dbo.orders
) AS source
ON (
target.order_id = source.order_id
AND target.is_current = 'Y'
)
-- Attribute changed (e.g. an order status transition) → close the old version
WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
THEN UPDATE SET
target.end_date = DATEADD(SECOND, -1, @current_date),
target.is_current = 'N'
-- New order → insert a new row
WHEN NOT MATCHED BY TARGET
THEN INSERT (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
order_id, customer_id, product_id, order_date,
quantity, unit_price, total_amount, status,
created_at, updated_at
) VALUES (
@batch_id,
'business_db.dbo.orders',
source.psa_checksum,
@current_date,
@far_future,
'Y',
source.order_id,
source.customer_id,
source.product_id,
source.order_date,
source.quantity,
source.unit_price,
source.total_amount,
source.status,
source.created_at,
source.updated_at
)
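-- Order physically deleted from the source → close the old version
-- (a cancellation is a status change and is handled by the MATCHED branch above)
WHEN NOT MATCHED BY SOURCE
AND target.is_current = 'Y'
THEN UPDATE SET
target.end_date = @current_date,
target.is_current = 'N';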
-- =============================================
-- Insert the new version of each order that the MERGE just closed
-- =============================================
INSERT INTO dbo.orders_scd2 (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
order_id, customer_id, product_id, order_date,
quantity, unit_price, total_amount, status,
created_at, updated_at
)
SELECT
@batch_id,
'business_db.dbo.orders',
s.psa_checksum,
@current_date,
@far_future,
'Y',
s.order_id,
s.customer_id,
s.product_id,
s.order_date,
s.quantity,
s.unit_price,
s.total_amount,
s.status,
s.created_at,
s.updated_at
FROM (
SELECT
order_id,
customer_id,
product_id,
order_date,
quantity,
unit_price,
total_amount,
status,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(order_id, N''),
N'|', ISNULL(customer_id, N''),
N'|', ISNULL(product_id, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
N'|', ISNULL(status, N'')
)
), 2) AS psa_checksum
FROM business_db.dbo.orders
) s
INNER JOIN dbo.orders_scd2 t
ON s.order_id = t.order_id
AND t.is_current = 'N'
AND t.end_date = DATEADD(SECOND, -1, @current_date)
WHERE NOT EXISTS (
SELECT 1 FROM dbo.orders_scd2
WHERE order_id = s.order_id AND is_current = 'Y'
);
SELECT @rows_inserted = COUNT(*)
FROM dbo.orders_scd2
WHERE psa_batch_id = @batch_id;
-- Versions closed by this run (changed: @current_date - 1 s; deleted: @current_date)
SELECT @rows_closed = COUNT(*)
FROM dbo.orders_scd2
WHERE is_current = 'N'
AND end_date IN (DATEADD(SECOND, -1, @current_date), @current_date);
UPDATE etl_db.dbo.etl_log
SET
end_time = GETDATE(),
rows_inserted = ISNULL(@rows_inserted, 0),
rows_updated = ISNULL(@rows_closed, 0),
status = 'SUCCESS'
WHERE batch_id = @batch_id
AND table_name = 'orders_scd2';
PRINT N'✅ Order table SCD2 load complete';
PRINT N' New/changed versions: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');
PRINT N' Historical versions closed: ' + ISNULL(CAST(@rows_closed AS VARCHAR), '0');
END TRY
BEGIN CATCH
SET @error_msg = ERROR_MESSAGE();
UPDATE etl_db.dbo.etl_log
SET end_time = GETDATE(), status = 'FAILED', error_message = @error_msg
WHERE batch_id = @batch_id AND table_name = 'orders_scd2';
THROW;
END CATCH;
END;
GO
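Each procedure ends by counting what the run did. The Python sketch below shows the counting rule under the conventions the procedures use. A changed version is closed at the load time minus one second, and a version deleted from the source is closed exactly at the load time. Both timestamps must therefore be matched when counting closed rows. `VersionRow` and `batch_stats` are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import Dict, List, NamedTuple


class VersionRow(NamedTuple):
    key: str
    batch_id: str
    end_date: datetime
    is_current: str  # 'Y' or 'N'


def batch_stats(rows: List[VersionRow], batch_id: str, run_time: datetime) -> Dict[str, int]:
    # rows_inserted: every version written by this batch (new keys + new versions of changed keys)
    inserted = sum(1 for r in rows if r.batch_id == batch_id)
    # rows_closed: versions expired by this run; changed rows end at run_time - 1 s,
    # rows deleted from the source end exactly at run_time
    closing_marks = {run_time - timedelta(seconds=1), run_time}
    closed = sum(1 for r in rows if r.is_current == "N" and r.end_date in closing_marks)
    return {"rows_inserted": inserted, "rows_closed": closed}
```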
SCD2 ETL stored procedure: product table
-- ============================================================
-- SCD2 load stored procedure: product table
-- Product changes include price adjustments, category changes, and listing/delisting
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_products_scd2', 'P') IS NOT NULL
DROP PROCEDURE dbo.sp_load_products_scd2;
GO
CREATE PROCEDURE dbo.sp_load_products_scd2
@batch_id VARCHAR(50)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @current_date DATETIME = GETDATE();
DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
DECLARE @rows_inserted INT = 0;
DECLARE @rows_closed INT = 0;
DECLARE @error_msg NVARCHAR(MAX);
BEGIN TRY
INSERT INTO etl_db.dbo.etl_log (
batch_id, layer_name, db_name, table_name,
start_time, status
)
VALUES (
@batch_id, 'psa_scd2', 'psa_db', 'products_scd2',
GETDATE(), 'RUNNING'
);
;MERGE dbo.products_scd2 AS target
USING (
SELECT
product_id,
product_name,
category,
sub_category,
brand,
unit_cost,
unit_price,
supplier_id,
is_active,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(product_name, N''),
N'|', ISNULL(category, N''),
N'|', ISNULL(sub_category, N''),
N'|', ISNULL(brand, N''),
N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.products
) AS source
ON (
target.product_id = source.product_id
AND target.is_current = 'Y'
)
WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
THEN UPDATE SET
target.end_date = DATEADD(SECOND, -1, @current_date),
target.is_current = 'N'
WHEN NOT MATCHED BY TARGET
THEN INSERT (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
product_id, product_name, category, sub_category,
brand, unit_cost, unit_price, supplier_id,
is_active, created_at, updated_at
) VALUES (
@batch_id,
'business_db.dbo.products',
source.psa_checksum,
@current_date,
@far_future,
'Y',
source.product_id,
source.product_name,
source.category,
source.sub_category,
source.brand,
source.unit_cost,
source.unit_price,
source.supplier_id,
source.is_active,
source.created_at,
source.updated_at
)
WHEN NOT MATCHED BY SOURCE
AND target.is_current = 'Y'
THEN UPDATE SET
target.end_date = @current_date,
target.is_current = 'N';
-- ================================================
-- Insert the new version of each row that the MERGE just closed
-- ================================================
INSERT INTO dbo.products_scd2 (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
product_id, product_name, category, sub_category,
brand, unit_cost, unit_price, supplier_id,
is_active, created_at, updated_at
)
SELECT
@batch_id,
'business_db.dbo.products',
s.psa_checksum,
@current_date,
@far_future,
'Y',
s.product_id,
s.product_name,
s.category,
s.sub_category,
s.brand,
s.unit_cost,
s.unit_price,
s.supplier_id,
s.is_active,
s.created_at,
s.updated_at
FROM (
SELECT
product_id, product_name, category, sub_category,
brand, unit_cost, unit_price, supplier_id,
is_active, created_at, updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(product_name, N''),
N'|', ISNULL(category, N''),
N'|', ISNULL(sub_category, N''),
N'|', ISNULL(brand, N''),
N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.products
) s
INNER JOIN dbo.products_scd2 t
ON s.product_id = t.product_id
AND t.is_current = 'N'
AND t.end_date = DATEADD(SECOND, -1, @current_date)
WHERE NOT EXISTS (
SELECT 1
FROM dbo.products_scd2
WHERE product_id = s.product_id AND is_current = 'Y'
);
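SELECT @rows_inserted = COUNT(*)
FROM dbo.products_scd2
WHERE psa_batch_id = @batch_id;
-- Versions closed by this run (changed: @current_date - 1 s; deleted: @current_date)
SELECT @rows_closed = COUNT(*)
FROM dbo.products_scd2
WHERE is_current = 'N'
AND end_date IN (DATEADD(SECOND, -1, @current_date), @current_date);
UPDATE etl_db.dbo.etl_log
SET end_time = GETDATE(), rows_inserted = @rows_inserted, rows_updated = @rows_closed, status = 'SUCCESS'
WHERE batch_id = @batch_id AND table_name = 'products_scd2';
PRINT N'✅ Product table SCD2 load complete, rows inserted: ' + CAST(@rows_inserted AS VARCHAR);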
END TRY
BEGIN CATCH
SET @error_msg = ERROR_MESSAGE();
UPDATE etl_db.dbo.etl_log
SET end_time = GETDATE(), status = 'FAILED', error_message = @error_msg
WHERE batch_id = @batch_id AND table_name = 'products_scd2';
THROW;
END CATCH;
END;
GO
Scenario walkthrough: a complete ETL lifecycle
Step 0: Prepare the environment
Make sure that business_db (the business database) and psa_db (the PSA database) have been created. See "A Detailed Look at the PSA (Persistent Staging Area) Layer" for the specific table structures. If you have not yet run the script that adds the test data, run it first. If you already ran it while following that article, TRUNCATE the existing data in the business tables and then recreate the test data. The preparation script is as follows:
-- Clear existing test data
TRUNCATE TABLE [business_db].[dbo].[customers]
TRUNCATE TABLE [business_db].[dbo].[orders]
TRUNCATE TABLE [business_db].[dbo].[products]
TRUNCATE TABLE [psa_db].[dbo].[customers_scd2]
TRUNCATE TABLE [psa_db].[dbo].[orders_scd2]
TRUNCATE TABLE [psa_db].[dbo].[products_scd2]
-- Insert simulated business data
USE business_db;
GO
-- Customer data
INSERT INTO dbo.customers (customer_id, customer_name, email, phone, address, city, region, register_date, customer_type, is_active)
VALUES
(N'C001', N'张三', 'zhangsan@email.com', '13800138001', N'北京市朝阳区建国路88号', N'北京', N'华北', '2023-01-15', 'individual', 1),
(N'C002', N'李四科技有限公司', 'lisi@company.com', '13800138002', N'上海市浦东新区张江路100号', N'上海', N'华东', '2023-03-20', 'enterprise', 1),
(N'C003', N'王五', 'wangwu@email.com', '13800138003', N'广州市天河区体育西路103号', N'广州', N'华南', '2023-06-10', 'individual', 1),
(N'C004', N'赵六集团', 'zhaoliu@group.com', '13800138004', N'深圳市南山区科技园南路10号', N'深圳', N'华南', '2023-08-05', 'enterprise', 1),
(N'C005', N'钱七', 'qianqi@email.com', '13800138005', N'杭州市西湖区文三路478号', N'杭州', N'华东', '2024-01-20', 'individual', 1);
GO
-- Product data
INSERT INTO dbo.products (product_id, product_name, category, sub_category, brand, unit_cost, unit_price, supplier_id, is_active)
VALUES
('P001', N'iPhone 15 Pro', N'电子产品', N'手机', N'Apple', 6500.00, 8999.00, 'S001', 1),
('P002', N'MacBook Pro 16', N'电子产品', N'笔记本', N'Apple', 15000.00, 19999.00, 'S001', 1),
('P003', N'AirPods Pro 2', N'电子产品', N'耳机', N'Apple', 1200.00, 1899.00, 'S001', 1),
('P004', N'华为Mate 60 Pro', N'电子产品', N'手机', N'华为', 4500.00, 6999.00, 'S002', 1),
('P005', N'戴森吸尘器V15', N'家用电器', N'吸尘器', N'Dyson', 2800.00, 4999.00, 'S003', 1),
('P006', N'索尼WH-1000XM5', N'电子产品', N'耳机', N'Sony', 1800.00, 2999.00, 'S004', 1),
('P007', N'小米空气净化器', N'家用电器', N'空气净化器', N'小米', 600.00, 1299.00, 'S005', 1),
('P008', N'iPad Air 5', N'电子产品', N'平板', N'Apple', 3200.00, 4799.00, 'S001', 1);
GO
-- Order data
INSERT INTO dbo.orders (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O001', 'C001', 'P001', '2024-01-15', 1, 8999.00, 8999.00, 'shipped'),
('O002', 'C002', 'P002', '2024-01-16', 5, 19999.00, 99995.00, 'confirmed'),
('O003', 'C001', 'P003', '2024-01-17', 2, 1899.00, 3798.00, 'pending'),
('O004', 'C003', 'P004', '2024-01-18', 1, 6999.00, 6999.00, 'shipped'),
('O005', 'C004', 'P005', '2024-01-19', 3, 4999.00, 14997.00, 'confirmed'),
('O006', 'C001', 'P002', '2024-01-20', 1, 19999.00, 19999.00, 'pending'),
('O007', 'C005', 'P006', '2024-01-21', 1, 2999.00, 2999.00, 'cancelled'),
('O008', 'C002', 'P001', '2024-01-22', 10, 8999.00, 89990.00, 'confirmed'),
('O009', 'C003', 'P007', '2024-01-23', 2, 1299.00, 2598.00, 'shipped'),
('O010', 'C004', 'P008', '2024-01-24', 4, 4799.00, 19196.00, 'pending');
GO
-- Verify the data
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM dbo.orders
UNION ALL
SELECT 'customers', COUNT(*) FROM dbo.customers
UNION ALL
SELECT 'products', COUNT(*) FROM dbo.products;
GO
Step 1: Initial load (day 1)
-- ============================================================
-- Day 1: initial full load
-- ============================================================
DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240115_001';
PRINT N'========== Day 1 initial load ==========';
PRINT N'Batch ID: ' + @batch_id;
EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;
State of the customer table after the day-1 load. The dates are illustrative: start_date comes from GETDATE(), so an actual run shows the time at which the procedure executed.
| customer_id | customer_name | address | start_date | end_date | is_current |
|---|---|---|---|---|---|
| C001 | Zhang San | No. 88 Jianguo Road, Chaoyang District, Beijing | 2024-01-15 | 9999-12-31 | Y |
| C002 | Li Si Technology Co., Ltd. | No. 100 Zhangjiang Road, Pudong New Area, Shanghai | 2024-01-15 | 9999-12-31 | Y |
| C003 | Wang Wu | No. 103 Tiyu West Road, Tianhe District, Guangzhou | 2024-01-15 | 9999-12-31 | Y |
| C004 | Zhao Liu Group | No. 10 Science Park South Road, Nanshan District, Shenzhen | 2024-01-15 | 9999-12-31 | Y |
| C005 | Qian Qi | No. 478 Wensan Road, Xihu District, Hangzhou | 2024-01-15 | 9999-12-31 | Y |
Step 2: Simulate business changes (day 2)
-- ============================================================
-- Day 2: simulate changes in the business system
-- ============================================================
USE business_db;
GO
-- Change ①: customer attribute change: 张三 (Zhang San) moves to a new address
UPDATE dbo.customers
SET address = N'北京市朝阳区望京SOHO',
updated_at = GETDATE()
WHERE customer_id = 'C001';
-- Change ②: new customer 孙八 (Sun Ba)
INSERT INTO dbo.customers
(customer_id, customer_name, email, phone, address, city, region, register_date, customer_type)
VALUES
(N'C006', N'孙八', 'sunba@email.com', '13800138006',
N'成都市高新区天府大道888号', N'成都', N'西南', '2024-01-25', 'individual');
-- Change ③: customer deactivated (logical delete via is_active = 0; the row stays in the source table)
UPDATE dbo.customers
SET is_active = 0,
updated_at = GETDATE()
WHERE customer_id = 'C005';
-- Change ④: order status transition
UPDATE dbo.orders
SET status = 'shipped',
updated_at = GETDATE()
WHERE order_id IN ('O003', 'O006');
-- Change ⑤: product price adjustment
UPDATE dbo.products
SET unit_price = 6499.00,
updated_at = GETDATE()
WHERE product_id = 'P004'; -- price cut for 华为Mate 60 Pro (Huawei Mate 60 Pro)
-- Change ⑥: new order
INSERT INTO dbo.orders
(order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O011', 'C001', 'P004', '2024-01-25', 2, 6499.00, 12998.00, 'pending');
Summary of the business changes:
| Change type | Business key | What changed |
|---|---|---|
| Attribute change | C001 Zhang San | address: No. 88 Jianguo Road → Wangjing SOHO |
| New customer | C006 Sun Ba | New customer account |
| Soft delete (deactivation) | C005 Qian Qi | is_active: 1 → 0 |
| Order status | O003/O006 | pending → shipped |
| Product price adjustment | P004 | unit_price: 6999 → 6499 |
| New order | O011 | New order |
Step 3: Day-2 ETL load
-- ============================================================
-- Day 2: SCD2 incremental load
-- ============================================================
DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240125_002';
PRINT N'========== Day 2 incremental load ==========';
PRINT N'Batch ID: ' + @batch_id;
EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;
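Complete version chains after the day-2 load:
[Customer C001: 张三 / Zhang San] (attribute change)
├── V1: No. 88 Jianguo Road | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: Wangjing SOHO | 2024-01-25 → 9999-12-31 | Y (current)
[Customer C005: 钱七 / Qian Qi] (deactivated: is_active 1 → 0 is detected as an attribute change)
├── V1: is_active=1 | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: is_active=0 | 2024-01-25 → 9999-12-31 | Y (current, deactivated)
[Customer C006: 孙八 / Sun Ba] (new)
└── V1: No. 888 Tianfu Avenue | 2024-01-25 → 9999-12-31 | Y (current)
[Order O003: status transition]
├── V1: status=pending | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: status=shipped | 2024-01-25 → 9999-12-31 | Y (current)
[Product P004: 华为Mate 60 Pro / Huawei Mate 60 Pro] (price change)
├── V1: unit_price=6999 | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: unit_price=6499 | 2024-01-25 → 9999-12-31 | Y (current)
[All other customers/orders/products] → unchanged, no new records ✅
Only a row physically deleted from business_db takes the NOT MATCHED BY SOURCE path: its current version is closed with end_date = load time, and no new version is inserted.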
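Both the old and the new version are kept, so the exact change can be recovered later by comparing adjacent versions of a key. Here is a minimal Python sketch. It assumes each version is available as a dict of column values; `diff_versions` is a hypothetical helper.

```python
from typing import Any, Dict, Tuple


def diff_versions(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Return {column: (old_value, new_value)} for every column whose value differs."""
    columns = sorted(set(old) | set(new))
    return {c: (old.get(c), new.get(c)) for c in columns if old.get(c) != new.get(c)}


v1 = {"customer_id": "C001", "address": "No. 88 Jianguo Road", "city": "Beijing"}
v2 = {"customer_id": "C001", "address": "Wangjing SOHO", "city": "Beijing"}
print(diff_versions(v1, v2))  # {'address': ('No. 88 Jianguo Road', 'Wangjing SOHO')}
```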
Query methods
Query 1: currently valid data
-- All currently valid customers (historical versions excluded)
SELECT
customer_id,
customer_name,
address,
is_active,
start_date AS effective_from
FROM psa_db.dbo.customers_scd2
WHERE is_current = 'Y'
ORDER BY customer_id;
Query 2: snapshot at any point in time
-- Each customer's address as of 2024-01-20 (the snapshot before Zhang San's change)
DECLARE @as_of_date DATETIME = '2024-01-20 23:59:59';
SELECT
customer_id,
customer_name,
address,
city
FROM psa_db.dbo.customers_scd2
WHERE start_date <= @as_of_date
AND end_date >= @as_of_date; -- end_date is inclusive: a changed version ends 1 s before its successor starts
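Here is the same point-in-time logic as a Python sketch. It treats end_date as inclusive: a version closed by an attribute change ends one second before its successor starts, so `start_date <= ts <= end_date` leaves no gap between versions. `CustomerVersion`, `as_of` and `current_view` are hypothetical names. The sample rows use the illustrative dates from the demo.

```python
from datetime import datetime
from typing import Dict, List, NamedTuple

FAR_FUTURE = datetime(9999, 12, 31, 23, 59, 59)


class CustomerVersion(NamedTuple):
    customer_id: str
    address: str
    start_date: datetime
    end_date: datetime
    is_current: str  # 'Y' or 'N'


def as_of(rows: List[CustomerVersion], ts: datetime) -> Dict[str, str]:
    """customer_id -> address of the version valid at ts (end_date treated as inclusive)."""
    return {r.customer_id: r.address for r in rows if r.start_date <= ts <= r.end_date}


def current_view(rows: List[CustomerVersion]) -> Dict[str, str]:
    """Equivalent of WHERE is_current = 'Y'."""
    return {r.customer_id: r.address for r in rows if r.is_current == "Y"}


rows = [
    CustomerVersion("C001", "No. 88 Jianguo Road", datetime(2024, 1, 15), datetime(2024, 1, 24, 23, 59, 59), "N"),
    CustomerVersion("C001", "Wangjing SOHO", datetime(2024, 1, 25), FAR_FUTURE, "Y"),
    CustomerVersion("C006", "No. 888 Tianfu Avenue", datetime(2024, 1, 25), FAR_FUTURE, "Y"),
]
print(as_of(rows, datetime(2024, 1, 20, 23, 59, 59)))  # {'C001': 'No. 88 Jianguo Road'}
```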
Query 3: a customer's change history (complete version chain)
-- Complete change history of C001
SELECT
customer_id,
customer_name,
address,
customer_type,
is_active,
start_date,
end_date,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN N'🔵 Current version'
WHEN is_active = 0 THEN N'🔴 Deactivated'
ELSE N'⚪ Historical version'
END AS version_status
FROM psa_db.dbo.customers_scd2
WHERE customer_id = 'C001'
ORDER BY start_date;
Query 4: complete lifecycle of an order
-- Complete status history of order O003
SELECT
order_id,
customer_id,
product_id,
quantity,
total_amount,
status,
start_date AS status_effective_from,
end_date AS status_effective_to,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN N'🔵 Current status'
ELSE N'⚪ Superseded'
END AS version_status
FROM psa_db.dbo.orders_scd2
WHERE order_id = 'O003'
ORDER BY start_date;
Query 5: product price history
-- Price history of P004
SELECT
product_id,
product_name,
unit_price,
brand,
start_date AS effective_from,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN 'current price'
-- end_date already holds the last second of validity, so no DATEADD is needed
ELSE CONVERT(VARCHAR(10), end_date, 120)
END AS effective_to,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN N'🔵'
ELSE N'⚪'
END AS flag
FROM psa_db.dbo.products_scd2
WHERE product_id = 'P004'
ORDER BY start_date;
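The expiry-date column in the price-history query subtracts one day from end_date to show the last day a price was valid. That is only correct when end_date holds the successor's start date (an exclusive bound). The Python sketch below mirrors the CASE expression to make that dependency visible; `expiry_label` is a hypothetical name. With an exclusive end_date of 2024-01-25 it reports 2024-01-24, while an inclusive end_date of 2024-01-24 23:59:59 would be reported one day early as 2024-01-23.

```python
from datetime import datetime, timedelta

OPEN_END = datetime(9999, 12, 31, 23, 59, 59)


def expiry_label(end_date: datetime) -> str:
    """Mirror of the expiry-date CASE: last valid day, or a label for the current row."""
    if end_date == OPEN_END:
        return "current price"
    # Equivalent of CONVERT(VARCHAR(10), DATEADD(DAY, -1, end_date), 120): 'yyyy-mm-dd'.
    return (end_date - timedelta(days=1)).strftime("%Y-%m-%d")


print(expiry_label(datetime(2024, 1, 25)))              # 2024-01-24
print(expiry_label(datetime(2024, 1, 24, 23, 59, 59)))  # 2024-01-23
```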
Query 6: Change statistics report
-- New/changed/deleted counts for each batch
SELECT
batch_id,
table_name,
rows_inserted AS rows_new_or_changed,
rows_updated AS rows_history_closed,
status,
CONVERT(VARCHAR(19), start_time, 120) AS started_at,
CONVERT(VARCHAR(19), end_time, 120) AS ended_at,
DATEDIFF(SECOND, start_time, end_time) AS duration_sec
FROM etl_db.dbo.etl_log
WHERE layer_name = 'psa_scd2'
ORDER BY start_time DESC;
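Going by the column descriptions in this report (rows_inserted counts new or changed rows, rows_updated counts closed history rows), each batch's log entry can be reconciled against the number of current rows. A changed key both closes one version and opens one, so it cancels out, and the current-row count should move by exactly new keys minus deleted keys. The sketch below encodes that identity in Python; the function names are hypothetical, and it assumes the two log counters mean exactly what is described above.

```python
def expected_current_rows(prev_current: int, rows_inserted: int, rows_updated: int) -> int:
    # rows_inserted = new + changed      (each opens one current version)
    # rows_updated  = changed + deleted  (each closes one current version)
    # => current rows after the batch = prev + new - deleted
    return prev_current + rows_inserted - rows_updated


def batch_reconciles(prev_current: int, rows_inserted: int, rows_updated: int,
                     actual_current: int) -> bool:
    """True when the post-batch count of is_current = 'Y' rows matches the log counters."""
    return expected_current_rows(prev_current, rows_inserted, rows_updated) == actual_current


# Hypothetical batch: 5 current rows before; 1 new key, 1 changed key, 1 deleted key.
# rows_inserted = 1 + 1 = 2 and rows_updated = 1 + 1 = 2, so 5 current rows remain.
print(batch_reconciles(5, 2, 2, 5))  # True
```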
Snapshot approach vs SCD2 approach: a full comparison
| Dimension | Snapshot approach (previous article) | SCD2 approach (this article) |
|---|---|---|
| Write strategy | Full INSERT of all N rows on every run | Writes only the rows that changed |
| Storage model | Horizontal stacking (N batches = N × row count) | Vertical version chain (only changes are stored) |
| Change-tracking granularity | Batch level (only knows that "something changed in some batch") | Row level (knows exactly which row changed) |
| Change content | Unknown | Full comparison of old version vs new version |
| Historical queries | Filter by batch, imprecise | Precise start_date/end_date intervals |
| Insert detection | psa_operation='I' | is_current='Y' on first load |
| Update detection | Not detected | A checksum difference triggers a new SCD2 version |
| Delete detection | psa_operation='D' (requires extra handling) | Detected natively (NOT MATCHED BY SOURCE) |
| Storage cost | O(n × batch_count) | O(n × avg_change_count) |
| Implementation complexity | Low (INSERT ... SELECT) | Medium (MERGE + validity-period logic) |
| Typical use cases | Small tables, infrequent changes, rapid prototyping | Large tables, frequent changes, audit compliance |
| Downstream consumption | Simple (take the latest batch) | Must filter on is_current='Y' |
| Data traceability | Limited (batch granularity) | Full (row + time dimensions) |
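To put the two storage-cost rows of the table into numbers, the Python sketch below counts stored rows under deliberately simple, hypothetical assumptions: a fixed population of n keys, one batch per day for a year, no inserts or deletes, and the same fraction of keys changing in every batch after the first. Real workloads will differ; the point is how the two formulas scale.

```python
def snapshot_rows(n: int, batches: int) -> int:
    # Snapshot PSA: every batch re-inserts all n rows.
    return n * batches


def scd2_rows(n: int, batches: int, change_rate: float) -> int:
    # SCD2 PSA: n initial versions, then one extra version per changed key in each later batch.
    changed_per_batch = round(n * change_rate)
    return n + changed_per_batch * (batches - 1)


n, batches = 1_000_000, 365
print(snapshot_rows(n, batches))                # 365000000
print(scd2_rows(n, batches, change_rate=0.01))  # 4640000
```

Under these assumptions each key ends up with about 4.64 stored versions (1 initial version plus 3.64 changes on average), versus 365 copies per key in the snapshot layout.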
Summary
This article walked through the complete SCD2 process and its concrete implementation in SQL Server.
SCD2 is the foundational technique for tracking history in a data warehouse. Once it is in place, your data warehouse effectively gains an efficient "time machine": you can go back to any point in time and see the correct state of the data.
BTW:
I wrote this article for two reasons. First, to summarize my previous work experience. Second, if I need to do the same thing in the future, pointing an AI tool at this article's URL should help it generate the desired code more efficiently and accurately.