34 mins read

Catatan gudang data Bagian 6: Metode pemrosesan lapisan PSA SCD2 – Copenhagen Husky (aspnetx)

**SCD2 (Perlahan-lahan Mengubah Dimensi Tipe 2)** adalah standar industri dan solusi yang paling umum digunakan untuk menangani perubahan historis dalam pemodelan dimensi. Dibandingkan dengan metode snapshot, SCD2 hanya menulis baris data yang benar-benar berubah, secara akurat mencatat siklus hidup setiap data melalui bidang “masa validitas”, dan mencapai ketertelusuran historis lengkap dengan biaya penyimpanan yang sangat rendah. Artikel ini menjelaskan alur kerja dengan kode mendetail. Basis data yang digunakan dalam catatan ini adalah SQL Server, dan contoh skrip terkait didasarkan pada basis data ini. Implementasinya terkait dengan database lain akan sedikit berbeda.

ringkasan

SCD2 (Perlahan-lahan Mengubah Dimensi Tipe 2) Ini adalah standar industri dan solusi yang paling umum digunakan untuk menangani perubahan historis dalam pemodelan dimensi. Dibandingkan dengan metode snapshot, SCD2 hanya menulis baris data yang benar-benar berubah. 有效期 Fields secara akurat mencatat siklus hidup setiap bagian data, mencapai ketertelusuran historis penuh dengan biaya penyimpanan yang sangat rendah. Artikel ini menjelaskan alur kerja dengan kode mendetail.

Basis data yang digunakan dalam catatan ini adalah SQL Server, dan contoh skrip terkait didasarkan pada basis data ini. Implementasinya terkait dengan database lain akan sedikit berbeda.

Dalam artikel “Penjelasan Detail Lapisan PSA (Persistence Temporary Area)”, lapisan PSA menggunakanCuplikan mode penuhMemuat data – ETL menulis data lengkap dari tabel sumber ke PSA setiap saat, dan semua catatan ditandai secara seragam psa_operation='I'. Keuntungan metode ini adalah mudah diterapkan dan cocok untuk skenario di mana jumlah datanya sedikit dan perubahannya jarang terjadi.

Namun dalam lingkungan produksi nyata, keterbatasan metode snapshot jelas:

pertanyaan ilustrasi
Tidak dapat melacak riwayat perubahan Tidak mungkin mengetahui apakah nilai suatu data tertentu berubah dari nilai apa ke nilai apa pada hari itu.
Penyimpanan limbah berat Penulisan penuh setiap saat, volume data berkembang pesat

Artikel ini akan mengambil tabel pelanggan, tabel pesanan, dan tabel produk dari lapisan PSA sebagai contoh untuk mendemonstrasikan sepenuhnya cara membangun kembali lapisan PSA menggunakan SCD2.


konsep inti SCD2

Tiga Aturan Inti Tipe 2

SCD2 menambahkan tiga bidang utama ke setiap catatan sejarah:

bidang jenis ilustrasi
start_date TANGGAL WAKTU Waktu mulai efektif untuk data versi ini
end_date TANGGAL WAKTU Waktu habis masa berlaku versi data ini (9999-12-31 berarti valid saat ini)
is_current karakter(1) Y = versi saat ini, N = versi historis

Tiga aturan perubahan:

┌─────────────────────────────────────────────────────────────┐
│                    SCD2 变更判定逻辑                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  新增(源表有,PSA无)                                      │
│  → 插入新行,start_date=当前时间,end_date=9999-12-31,     │
│    is_current="Y"                                           │
│                                                             │
│  属性变化(源表有,PSA有,但Checksum不同)                   │
│  → 旧行标记过期:end_date=当前时间-1秒,is_current="N"      │
│  → 插入新行:start_date=当前时间,end_date=9999-12-31,     │
│    is_current="Y"                                           │
│                                                             │
│  删除(源表无,PSA有)                                      │
│  → 旧行标记过期:end_date=当前时间,is_current="N"          │
│  → 不插入新行                                               │
│                                                             │
│  无变化(源表有,PSA有,Checksum相同)                       │
│  → 不做任何操作                                             │
└─────────────────────────────────────────────────────────────┘

Cara mendeteksi perubahan

Agar gudang data dapat melacak riwayat perubahan data, penting untuk mengidentifikasi perubahan pada data sistem sumber. Namun tabel sistem sumber biasanya memiliki banyak kolom. Bagaimana cara kerja deteksi perubahan? Apakah Anda membandingkan kolom demi kolom?

Metode yang paling umum digunakan adalah verifikasi data Checksum, yang merupakan inti dari deteksi perubahan SCD2. Setiap kali ETL dilakukan, nilai MD5 dari seluruh bidang bisnis di setiap baris tabel sumber dihitung:

-- 示例:张三的Checksum计算
CONVERT(VARCHAR(32), HASHBYTES('MD5',
    CONCAT(
        '张三',          -- customer_name
        '|', 'zhangsan@email.com',   -- email
        '|', '13800138001',          -- phone
        '|', '北京市朝阳区建国路88号', -- address
        ...
    )
), 2)
-- 结果:0xE8B2A5C3D9F1...  ← 32位MD5值

Artinya, field yang akan dideteksi akan dimasukkan ke dalam field tersebut dan kemudian diubah menjadi nilai MD5. Dengan cara ini, selama terjadi perubahan bidang usaha maka nilai MD5 akan berbeda-beda, sehingga dapat menyebabkan terjadinya proses perubahan SCD2, sehingga dapat mengidentifikasi perubahan pada data sistem sumber dengan lebih efisien.


Desain struktur tabel PSA-SCD2

Seperti ketiga tabel PSA yang dibuat pada artikel kedua, disini kita membuat tabel dengan akhiran scd2 untuk membedakannya dengan tabel pada artikel kedua.

Tabel pelanggan dari struktur SCD2

-- ============================================================
-- PSA-SCD2 客户表
-- 相比快照方式:增加 start_date / end_date / is_current
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.customers_scd2', 'U') IS NOT NULL
    DROP TABLE dbo.customers_scd2;
GO

CREATE TABLE dbo.customers_scd2 (
    -- PSA 系统字段
    psa_record_seq     BIGINT IDENTITY(1,1) NOT NULL,
    psa_load_time      DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id       VARCHAR(50) NOT NULL,
    psa_source_table   VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.customers',
    psa_checksum       VARCHAR(32) NULL,

    -- SCD2 有效期字段
    start_date         DATETIME NOT NULL,
    end_date           DATETIME NOT NULL,
    is_current         CHAR(1) NOT NULL DEFAULT 'Y',

    -- 业务字段
    customer_id        VARCHAR(50) NOT NULL,
    customer_name      NVARCHAR(100) NOT NULL,
    email              VARCHAR(100) NULL,
    phone              VARCHAR(20) NULL,
    address            NVARCHAR(200) NULL,
    city               NVARCHAR(50) NULL,
    region             NVARCHAR(50) NULL,
    register_date      DATE NOT NULL,
    customer_type      VARCHAR(20) NOT NULL,
    is_active          BIT NULL,
    created_at         DATETIME NULL,
    updated_at         DATETIME NULL
);
GO

CREATE NONCLUSTERED INDEX idx_customers_scd2_bk
    ON dbo.customers_scd2(customer_id, is_current);

CREATE NONCLUSTERED INDEX idx_customers_scd2_period
    ON dbo.customers_scd2(start_date, end_date);

CREATE NONCLUSTERED INDEX idx_customers_scd2_batch
    ON dbo.customers_scd2(psa_batch_id);
GO

Deskripsi desain: Tabel SCD2 tidak memiliki batasan kunci utama (karena akan ada beberapa versi historis dari kunci bisnis yang sama). menggunakan (customer_id, is_current="Y") Cari versi saat ini.

Tabel pemesanan struktur SCD2

-- ============================================================
-- PSA-SCD2 订单表
-- 订单的主要变更是状态流转(pending→confirmed→shipped→cancelled)
-- SCD2能够完整记录订单的全生命周期
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.orders_scd2', 'U') IS NOT NULL
    DROP TABLE dbo.orders_scd2;
GO

CREATE TABLE dbo.orders_scd2 (
    -- PSA 系统字段
    psa_record_seq     BIGINT IDENTITY(1,1) NOT NULL,
    psa_load_time      DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id       VARCHAR(50) NOT NULL,
    psa_source_table   VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.orders',
    psa_checksum       VARCHAR(32) NULL,

    -- SCD2 有效期字段
    start_date         DATETIME NOT NULL,
    end_date           DATETIME NOT NULL,
    is_current         CHAR(1) NOT NULL DEFAULT 'Y',

    -- 业务字段
    order_id           VARCHAR(50) NOT NULL,
    customer_id        VARCHAR(50) NOT NULL,
    product_id         VARCHAR(50) NOT NULL,
    order_date         DATE NOT NULL,
    quantity           INT NOT NULL,
    unit_price         DECIMAL(10,2) NOT NULL,
    total_amount       DECIMAL(12,2) NOT NULL,
    status             VARCHAR(20) NOT NULL,  -- pending/confirmed/shipped/cancelled
    created_at         DATETIME NULL,
    updated_at         DATETIME NULL
);
GO

CREATE NONCLUSTERED INDEX idx_orders_scd2_bk
    ON dbo.orders_scd2(order_id, is_current);

CREATE NONCLUSTERED INDEX idx_orders_scd2_period
    ON dbo.orders_scd2(start_date, end_date);

CREATE NONCLUSTERED INDEX idx_orders_scd2_batch
    ON dbo.orders_scd2(psa_batch_id);

CREATE NONCLUSTERED INDEX idx_orders_scd2_customer
    ON dbo.orders_scd2(customer_id, is_current);
GO

Tabel produk struktural SCD2

-- ============================================================
-- PSA-SCD2 商品表
-- 商品表变更多为价格调整、上下架、品类调整
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.products_scd2', 'U') IS NOT NULL
    DROP TABLE dbo.products_scd2;
GO

CREATE TABLE dbo.products_scd2 (
    -- PSA 系统字段
    psa_record_seq     BIGINT IDENTITY(1,1) NOT NULL,
    psa_load_time      DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id       VARCHAR(50) NOT NULL,
    psa_source_table   VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.products',
    psa_checksum       VARCHAR(32) NULL,

    -- SCD2 有效期字段
    start_date         DATETIME NOT NULL,
    end_date           DATETIME NOT NULL,
    is_current         CHAR(1) NOT NULL DEFAULT 'Y',

    -- 业务字段
    product_id         VARCHAR(50) NOT NULL,
    product_name       NVARCHAR(200) NOT NULL,
    category           NVARCHAR(50) NOT NULL,
    sub_category       NVARCHAR(50) NULL,
    brand              NVARCHAR(50) NULL,
    unit_cost          DECIMAL(10,2) NULL,
    unit_price         DECIMAL(10,2) NOT NULL,
    supplier_id        VARCHAR(50) NULL,
    is_active          BIT NULL,
    created_at         DATETIME NULL,
    updated_at         DATETIME NULL
);
GO

CREATE NONCLUSTERED INDEX idx_products_scd2_bk
    ON dbo.products_scd2(product_id, is_current);

CREATE NONCLUSTERED INDEX idx_products_scd2_period
    ON dbo.products_scd2(start_date, end_date);

CREATE NONCLUSTERED INDEX idx_products_scd2_batch
    ON dbo.products_scd2(psa_batch_id);
GO

Prosedur Tersimpan ETL SCD2: Tabel Pelanggan

Dari sini kita masuk ke bagian utama ETL. Kodenya akan agak panjang, tetapi logika pemrosesan setiap kodenya sama.

Implementasi penuh

-- ============================================================
-- SCD2 加载存储过程:客户表
-- 使用 MERGE 语句一次完成 新增 / 更新 / 删除 的判定
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.sp_load_customers_scd2', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_customers_scd2;
GO

CREATE PROCEDURE dbo.sp_load_customers_scd2
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @start_time DATETIME = GETDATE();
    DECLARE @current_date DATETIME = GETDATE();
    DECLARE @far_future DATETIME = '9999-12-31 23:59:59';

    DECLARE @rows_inserted INT = 0;
    DECLARE @rows_closed   INT = 0;
    DECLARE @error_msg NVARCHAR(MAX);

    BEGIN TRY
        -- 记录 ETL 开始
        INSERT INTO etl_db.dbo.etl_log (
            batch_id, layer_name, db_name, table_name,
            start_time, status
        )
        VALUES (
            @batch_id, 'psa_scd2', 'psa_db', 'customers_scd2',
            @start_time, 'RUNNING'
        );

        -- ================================================
        -- SCD2 MERGE 语句
        -- 匹配逻辑:ON target(customer_id) = source(customer_id)
        --           AND target.is_current="Y"(只匹配当前版本)
        -- ================================================
        ;MERGE dbo.customers_scd2 AS target
        USING (
            SELECT
                customer_id,
                customer_name,
                email,
                phone,
                address,
                city,
                region,
                register_date,
                customer_type,
                is_active,
                created_at,
                updated_at,
                -- 计算业务字段的MD5校验和
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(customer_name, N''),
                        N'|', ISNULL(email, N''),
                        N'|', ISNULL(phone, N''),
                        N'|', ISNULL(address, N''),
                        N'|', ISNULL(city, N''),
                        N'|', ISNULL(region, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
                        N'|', ISNULL(customer_type, N''),
                        N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.customers
        ) AS source
        ON (
            target.customer_id = source.customer_id
            AND target.is_current="Y"
        )

        -- ② 属性变化 → 关闭旧版本
        WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
        THEN UPDATE SET
            target.end_date   = DATEADD(SECOND, -1, @current_date),
            target.is_current="N"

        -- ③ 新增数据 → 插入新行
        WHEN NOT MATCHED BY TARGET
        THEN INSERT (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            customer_id, customer_name, email, phone,
            address, city, region, register_date,
            customer_type, is_active, created_at, updated_at
        ) VALUES (
            @batch_id,
            'business_db.dbo.customers',
            source.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            source.customer_id,
            source.customer_name,
            source.email,
            source.phone,
            source.address,
            source.city,
            source.region,
            source.register_date,
            source.customer_type,
            source.is_active,
            source.created_at,
            source.updated_at
        )

        -- ④ 软删除(源表已删除)→ 关闭旧版本
        WHEN NOT MATCHED BY SOURCE
             AND target.is_current="Y"
        THEN UPDATE SET
            target.end_date   = @current_date,
            target.is_current="N";

        -- ================================================
        -- 插入【被关闭的旧记录对应的新版本数据】
        -- ================================================
        INSERT INTO dbo.customers_scd2 (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            customer_id, customer_name, email, phone,
            address, city, region, register_date,
            customer_type, is_active, created_at, updated_at
        )
        SELECT
            @batch_id,
            'business_db.dbo.customers',
            s.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            s.customer_id,
            s.customer_name,
            s.email,
            s.phone,
            s.address,
            s.city,
            s.region,
            s.register_date,
            s.customer_type,
            s.is_active,
            s.created_at,
            s.updated_at
        FROM (
            SELECT
                customer_id,
                customer_name,
                email,
                phone,
                address,
                city,
                region,
                register_date,
                customer_type,
                is_active,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(customer_name, N''),
                        N'|', ISNULL(email, N''),
                        N'|', ISNULL(phone, N''),
                        N'|', ISNULL(address, N''),
                        N'|', ISNULL(city, N''),
                        N'|', ISNULL(region, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
                        N'|', ISNULL(customer_type, N''),
                        N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.customers
        ) s
        -- 只插入:刚刚被 MERGE 关闭的旧记录(即发生更新的客户)
        INNER JOIN dbo.customers_scd2 t
            ON s.customer_id = t.customer_id
            AND t.is_current="N"
            AND t.end_date = DATEADD(SECOND, -1, @current_date)
        -- 避免重复插入
        WHERE NOT EXISTS (
            SELECT 1
            FROM dbo.customers_scd2
            WHERE customer_id = s.customer_id
            AND is_current="Y"
        );


        -- ================================================
        -- 汇总统计
        -- ================================================
        SELECT @rows_inserted = COUNT(*)
        FROM dbo.customers_scd2
        WHERE psa_batch_id = @batch_id;

        SELECT @rows_closed = COUNT(*)
        FROM dbo.customers_scd2
        WHERE end_date >= @current_date
          AND end_date < @far_future
          AND psa_batch_id <> @batch_id
          AND is_current="N";

        -- 更新 ETL 日志
        UPDATE etl_db.dbo.etl_log
        SET
            end_time = GETDATE(),
            rows_inserted = ISNULL(@rows_inserted, 0),
            rows_updated = ISNULL(@rows_closed, 0),
            status="SUCCESS"
        WHERE batch_id = @batch_id
          AND table_name="customers_scd2";

        PRINT N'✅ 客户表 SCD2 加载完成';
        PRINT N'   本次新增/变更: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');
        PRINT N'   历史版本关闭: ' + ISNULL(CAST(@rows_closed AS VARCHAR), '0');

    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();

        UPDATE etl_db.dbo.etl_log
        SET
            end_time = GETDATE(),
            status="FAILED",
            error_message = @error_msg
        WHERE batch_id = @batch_id
          AND table_name="customers_scd2";

        THROW;
    END CATCH;
END;
GO

Prosedur Tersimpan SCD2 ETL: Tabel Pesanan

-- ============================================================
-- SCD2 加载存储过程:订单表
-- 订单的核心变化是状态流转(pending→confirmed→shipped/cancelled)
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.sp_load_orders_scd2', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_orders_scd2;
GO

CREATE PROCEDURE dbo.sp_load_orders_scd2
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @current_date DATETIME = GETDATE();
    DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
    DECLARE @rows_inserted INT = 0;
    DECLARE @rows_closed INT = 0;
    DECLARE @error_msg NVARCHAR(MAX);

    BEGIN TRY
        INSERT INTO etl_db.dbo.etl_log (
            batch_id, layer_name, db_name, table_name,
            start_time, status
        )
        VALUES (
            @batch_id, 'psa_scd2', 'psa_db', 'orders_scd2',
            GETDATE(), 'RUNNING'
        );

        ;MERGE dbo.orders_scd2 AS target
        USING (
            SELECT
                order_id,
                customer_id,
                product_id,
                order_date,
                quantity,
                unit_price,
                total_amount,
                status,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(order_id, N''),
                        N'|', ISNULL(customer_id, N''),
                        N'|', ISNULL(product_id, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
                        N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
                        N'|', ISNULL(status, N'')
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.orders
        ) AS source
        ON (
            target.order_id = source.order_id
            AND target.is_current="Y"
        )

        -- 属性变化(订单状态变更)
        WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
        THEN UPDATE SET
            target.end_date   = DATEADD(SECOND, -1, @current_date),
            target.is_current="N"

        -- 新增订单
        WHEN NOT MATCHED BY TARGET
        THEN INSERT (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            order_id, customer_id, product_id, order_date,
            quantity, unit_price, total_amount, status,
            created_at, updated_at
        ) VALUES (
            @batch_id,
            'business_db.dbo.orders',
            source.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            source.order_id,
            source.customer_id,
            source.product_id,
            source.order_date,
            source.quantity,
            source.unit_price,
            source.total_amount,
            source.status,
            source.created_at,
            source.updated_at
        )

        -- 订单取消/删除
        WHEN NOT MATCHED BY SOURCE
             AND target.is_current="Y"
        THEN UPDATE SET
            target.end_date   = @current_date,
            target.is_current="N";

        -- =============================================
        -- 补插【数据更新后的新版本记录】
        -- =============================================
        INSERT INTO dbo.orders_scd2 (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            order_id, customer_id, product_id, order_date,
            quantity, unit_price, total_amount, status,
            created_at, updated_at
        )
        SELECT
            @batch_id,
            'business_db.dbo.orders',
            s.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            s.order_id,
            s.customer_id,
            s.product_id,
            s.order_date,
            s.quantity,
            s.unit_price,
            s.total_amount,
            s.status,
            s.created_at,
            s.updated_at
        FROM (
            SELECT
                order_id,
                customer_id,
                product_id,
                order_date,
                quantity,
                unit_price,
                total_amount,
                status,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(order_id, N''),
                        N'|', ISNULL(customer_id, N''),
                        N'|', ISNULL(product_id, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
                        N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
                        N'|', ISNULL(status, N'')
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.orders
        ) s
        INNER JOIN dbo.orders_scd2 t
            ON s.order_id = t.order_id
            AND t.is_current="N"
            AND t.end_date = DATEADD(SECOND, -1, @current_date)
        WHERE NOT EXISTS (
            SELECT 1 FROM dbo.orders_scd2
            WHERE order_id = s.order_id AND is_current="Y"
        );


        SELECT @rows_inserted = COUNT(*)
        FROM dbo.orders_scd2
        WHERE psa_batch_id = @batch_id;

        SELECT @rows_closed = COUNT(*)
        FROM dbo.orders_scd2
        WHERE end_date >= @current_date
          AND end_date < @far_future
          AND psa_batch_id <> @batch_id
          AND is_current="N";

        UPDATE etl_db.dbo.etl_log
        SET
            end_time = GETDATE(),
            rows_inserted = ISNULL(@rows_inserted, 0),
            rows_updated = ISNULL(@rows_closed, 0),
            status="SUCCESS"
        WHERE batch_id = @batch_id
          AND table_name="orders_scd2";

        PRINT N'✅ 订单表 SCD2 加载完成';
        PRINT N'   新增/变更: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');

    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(), status="FAILED", error_message = @error_msg
        WHERE batch_id = @batch_id AND table_name="orders_scd2";
        THROW;
    END CATCH;
END;
GO

Prosedur tersimpan SCD2 ETL: tabel produk

-- ============================================================
-- SCD2 加载存储过程:商品表
-- 商品表的变化包括:价格调整、品类调整、上下架
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.sp_load_products_scd2', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_products_scd2;
GO

CREATE PROCEDURE dbo.sp_load_products_scd2
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @current_date DATETIME = GETDATE();
    DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
    DECLARE @rows_inserted INT = 0;
    DECLARE @rows_closed INT = 0;
    DECLARE @error_msg NVARCHAR(MAX);

    BEGIN TRY
        INSERT INTO etl_db.dbo.etl_log (
            batch_id, layer_name, db_name, table_name,
            start_time, status
        )
        VALUES (
            @batch_id, 'psa_scd2', 'psa_db', 'products_scd2',
            GETDATE(), 'RUNNING'
        );

        ;MERGE dbo.products_scd2 AS target
        USING (
            SELECT
                product_id,
                product_name,
                category,
                sub_category,
                brand,
                unit_cost,
                unit_price,
                supplier_id,
                is_active,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(product_name, N''),
                        N'|', ISNULL(category, N''),
                        N'|', ISNULL(sub_category, N''),
                        N'|', ISNULL(brand, N''),
                        N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
                        N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.products
        ) AS source
        ON (
            target.product_id = source.product_id
            AND target.is_current="Y"
        )

        WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
        THEN UPDATE SET
            target.end_date   = DATEADD(SECOND, -1, @current_date),
            target.is_current="N"

        WHEN NOT MATCHED BY TARGET
        THEN INSERT (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            product_id, product_name, category, sub_category,
            brand, unit_cost, unit_price, supplier_id,
            is_active, created_at, updated_at
        ) VALUES (
            @batch_id,
            'business_db.dbo.products',
            source.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            source.product_id,
            source.product_name,
            source.category,
            source.sub_category,
            source.brand,
            source.unit_cost,
            source.unit_price,
            source.supplier_id,
            source.is_active,
            source.created_at,
            source.updated_at
        )

        WHEN NOT MATCHED BY SOURCE
             AND target.is_current="Y"
        THEN UPDATE SET
            target.end_date   = @current_date,
            target.is_current="N";

        -- ================================================
		-- 插入【被关闭的旧记录对应的新版本数据】
		-- ================================================
		INSERT INTO dbo.products_scd2 (
			psa_batch_id, psa_source_table, psa_checksum,
			start_date, end_date, is_current,
			product_id, product_name, category, sub_category,
			brand, unit_cost, unit_price, supplier_id,
			is_active, created_at, updated_at
		)
		SELECT
			@batch_id,
			'business_db.dbo.products',
			s.psa_checksum,
			@current_date,
			@far_future,
			'Y',
			s.product_id,
			s.product_name,
			s.category,
			s.sub_category,
			s.brand,
			s.unit_cost,
			s.unit_price,
			s.supplier_id,
			s.is_active,
			s.created_at,
			s.updated_at
		FROM (
			SELECT
				product_id, product_name, category, sub_category,
				brand, unit_cost, unit_price, supplier_id,
				is_active, created_at, updated_at,
				CONVERT(VARCHAR(32), HASHBYTES('MD5',
					CONCAT(
						ISNULL(product_name, N''),
						N'|', ISNULL(category, N''),
						N'|', ISNULL(sub_category, N''),
						N'|', ISNULL(brand, N''),
						N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
						N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
						N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
						N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
					)
				), 2) AS psa_checksum
			FROM business_db.dbo.products
		) s
		INNER JOIN dbo.products_scd2 t
			ON s.product_id = t.product_id
			AND t.is_current="N"
			AND t.end_date = DATEADD(SECOND, -1, @current_date)
		WHERE NOT EXISTS (
			SELECT 1
			FROM dbo.products_scd2
			WHERE product_id = s.product_id AND is_current="Y"
		);



        SELECT @rows_inserted = COUNT(*)
        FROM dbo.products_scd2
        WHERE psa_batch_id = @batch_id;

        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(), rows_inserted = @rows_inserted, status="SUCCESS"
        WHERE batch_id = @batch_id AND table_name="products_scd2";

        PRINT N'✅ 商品表 SCD2 加载完成,插入 ' + CAST(@rows_inserted AS VARCHAR);

    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(), status="FAILED", error_message = @error_msg
        WHERE batch_id = @batch_id AND table_name="products_scd2";
        THROW;
    END CATCH;
END;
GO

Demonstrasi skenario: menyelesaikan siklus hidup ETL

Langkah 0: Persiapan Lingkungan

Pastikan itu telah dibuat business_db(Perpustakaan Bisnis) dan psa_db(Perpustakaan PSA), baca penjelasan rinci tentang lapisan PSA (tempat penyimpanan sementara persistensi) untuk struktur Tabel tertentu. Jika Anda belum menjalankan skrip untuk menambahkan data pengujian, Anda harus menjalankannya terlebih dahulu. Jika sudah berjalan sesuai artikel, TRUNCATE dulu data yang ada di tabel bisnis, lalu buat ulang data pengujiannya. Script persiapannya adalah sebagai berikut:

-- 清理已有测试数据
TRUNCATE TABLE [business_db].[dbo].[customers]
TRUNCATE TABLE [business_db].[dbo].[orders]
TRUNCATE TABLE [business_db].[dbo].[products]

TRUNCATE TABLE [psa_db].[dbo].[customers_scd2]
TRUNCATE TABLE [psa_db].[dbo].[orders_scd2]
TRUNCATE TABLE [psa_db].[dbo].[products_scd2]


-- 插入模拟业务数据

USE business_db;
GO

-- 客户数据
INSERT INTO dbo.customers (customer_id, customer_name, email, phone, address, city, region, register_date, customer_type, is_active)
VALUES
(N'C001', N'张三', 'zhangsan@email.com', '13800138001', N'北京市朝阳区建国路88号', N'北京', N'华北', '2023-01-15', 'individual', 1),
(N'C002', N'李四科技有限公司', 'lisi@company.com', '13800138002', N'上海市浦东新区张江路100号', N'上海', N'华东', '2023-03-20', 'enterprise', 1),
(N'C003', N'王五', 'wangwu@email.com', '13800138003', N'广州市天河区体育西路103号', N'广州', N'华南', '2023-06-10', 'individual', 1),
(N'C004', N'赵六集团', 'zhaoliu@group.com', '13800138004', N'深圳市南山区科技园南路10号', N'深圳', N'华南', '2023-08-05', 'enterprise', 1),
(N'C005', N'钱七', 'qianqi@email.com', '13800138005', N'杭州市西湖区文三路478号', N'杭州', N'华东', '2024-01-20', 'individual', 1);
GO

-- 商品数据
INSERT INTO dbo.products (product_id, product_name, category, sub_category, brand, unit_cost, unit_price, supplier_id, is_active)
VALUES
('P001', N'iPhone 15 Pro', N'电子产品', N'手机', N'Apple', 6500.00, 8999.00, 'S001', 1),
('P002', N'MacBook Pro 16', N'电子产品', N'笔记本', N'Apple', 15000.00, 19999.00, 'S001', 1),
('P003', N'AirPods Pro 2', N'电子产品', N'耳机', N'Apple', 1200.00, 1899.00, 'S001', 1),
('P004', N'华为Mate 60 Pro', N'电子产品', N'手机', N'华为', 4500.00, 6999.00, 'S002', 1),
('P005', N'戴森吸尘器V15', N'家用电器', N'吸尘器', N'Dyson', 2800.00, 4999.00, 'S003', 1),
('P006', N'索尼WH-1000XM5', N'电子产品', N'耳机', N'Sony', 1800.00, 2999.00, 'S004', 1),
('P007', N'小米空气净化器', N'家用电器', N'空气净化器', N'小米', 600.00, 1299.00, 'S005', 1),
('P008', N'iPad Air 5', N'电子产品', N'平板', N'Apple', 3200.00, 4799.00, 'S001', 1);
GO

-- 订单数据
INSERT INTO dbo.orders (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O001', 'C001', 'P001', '2024-01-15', 1, 8999.00, 8999.00, 'shipped'),
('O002', 'C002', 'P002', '2024-01-16', 5, 19999.00, 99995.00, 'confirmed'),
('O003', 'C001', 'P003', '2024-01-17', 2, 1899.00, 3798.00, 'pending'),
('O004', 'C003', 'P004', '2024-01-18', 1, 6999.00, 6999.00, 'shipped'),
('O005', 'C004', 'P005', '2024-01-19', 3, 4999.00, 14997.00, 'confirmed'),
('O006', 'C001', 'P002', '2024-01-20', 1, 19999.00, 19999.00, 'pending'),
('O007', 'C005', 'P006', '2024-01-21', 1, 2999.00, 2999.00, 'cancelled'),
('O008', 'C002', 'P001', '2024-01-22', 10, 8999.00, 89990.00, 'confirmed'),
('O009', 'C003', 'P007', '2024-01-23', 2, 1299.00, 2598.00, 'shipped'),
('O010', 'C004', 'P008', '2024-01-24', 4, 4799.00, 19196.00, 'pending');
GO

-- 验证数据
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM dbo.orders
UNION ALL
SELECT 'customers', COUNT(*) FROM dbo.customers
UNION ALL
SELECT 'products', COUNT(*) FROM dbo.products;
GO

Langkah 1: Pemuatan Awal (Hari 1)

-- ============================================================
-- 第一天:初始全量加载
-- ============================================================

DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240115_001';

PRINT N'========== 第一天初始加载 ==========';
PRINT N'批次号: ' + @batch_id;

EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;

Status tabel pelanggan setelah dimuat pada hari pertama:

id_pelanggan nama_pelanggan alamat tanggal_mulai tanggal_akhir adalah_saat ini
C001 Zhang San No.88, Jalan Jianguo, Distrik Chaoyang, Beijing 15-01-2024 9999-12-31 Y
C002 Li Si Technology Co., Ltd. No.100, Jalan Zhangjiang, Area Baru Pudong, Shanghai 15-01-2024 9999-12-31 Y
C003 Wang Wu No.103, Jalan Tiyu Barat, Distrik Tianhe, Guangzhou 15-01-2024 9999-12-31 Y
C004 Grup Zhaoliu No.10, Science Park South Road, Distrik Nanshan, Shenzhen 15-01-2024 9999-12-31 Y
C005 Qian Qi No.478, Jalan Wensan, Distrik Xihu, Hangzhou 15-01-2024 9999-12-31 Y

Langkah 2: Simulasikan perubahan bisnis (hari kedua)

-- ============================================================
-- 第二天:模拟业务系统发生变更
-- ============================================================

USE business_db;
GO

-- 变更①:客户属性变化 — 张三搬家
UPDATE dbo.customers
SET address = N'北京市朝阳区望京SOHO',
    updated_at = GETDATE()
WHERE customer_id = 'C001';

-- 变更②:新增客户 — 孙八
INSERT INTO dbo.customers
    (customer_id, customer_name, email, phone, address, city, region, register_date, customer_type)
VALUES
    (N'C006', N'孙八', 'sunba@email.com', '13800138006',
     N'成都市高新区天府大道888号', N'成都', N'西南', '2024-01-25', 'individual');

-- 变更③:客户注销(软删除)
UPDATE dbo.customers
SET is_active = 0,
    updated_at = GETDATE()
WHERE customer_id = 'C005';

-- 变更④:订单状态流转
UPDATE dbo.orders
SET status="shipped",
    updated_at = GETDATE()
WHERE order_id IN ('O003', 'O006');

-- 变更⑤:商品价格调整
UPDATE dbo.products
SET unit_price = 6499.00,
    updated_at = GETDATE()
WHERE product_id = 'P004';  -- 华为Mate 60 Pro 降价

-- 变更⑥:新订单
INSERT INTO dbo.orders
    (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
    ('O011', 'C001', 'P004', '2024-01-25', 2, 6499.00, 12998.00, 'pending');

Ringkasan perubahan bisnis:

Ubah jenis kelamin kunci bisnis Ubah kontennya
perubahan properti Babak 001 Zhang San Alamat: Jalan No.88 Jianguo → Wangjing SOHO
Tambahkan pelanggan baru Bab 006 Sunba Akun pelanggan baru
hapus dengan lembut Bab 005 Qian Qi is_aktif:1 → 0
Status pesanan O003/O006 tertunda → terkirim
penyesuaian harga produk P004 harga_satuan: 6999 → 6499
pesanan baru O011 pesanan baru

Langkah 3: ETL dimuat pada hari berikutnya

-- ============================================================
-- 第二天:SCD2增量加载
-- ============================================================

DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240125_002';

PRINT N'========== 第二天增量加载 ==========';
PRINT N'批次号: ' + @batch_id;

EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;

Rantai versi lengkap setelah pengunggahan hari berikutnya:

【客户 C001 — 张三】(属性变化)
├── V1: 建国路88号 | 2024-01-15 → 2024-01-24 23:59:59 | N(已过期)
└── V2: 望京SOHO  | 2024-01-25 → 9999-12-31          | Y(当前)

【客户 C005 — 钱七】(软删除)
└── V1: 西湖区文三路478号 | 2024-01-15 → 2024-01-25 | N(已删除,无新版本)

【客户 C006 — 孙八】(新增)
└── V1: 天府大道888号 | 2024-01-25 → 9999-12-31 | Y(当前)

【订单 O003 — 状态流转】
├── V1: status=pending | 2024-01-15 → 2024-01-24 23:59:59 | N(已过期)
└── V2: status=shipped | 2024-01-25 → 9999-12-31          | Y(当前)

【商品 P004 — 华为Mate 60 Pro】(价格变更)
├── V1: unit_price=6999 | 2024-01-15 → 2024-01-24 23:59:59 | N(已过期)
└── V2: unit_price=6499 | 2024-01-25 → 9999-12-31          | Y(当前)

【其余客户/订单/商品】→ 无变化,不产生新记录 ✅

Metode pertanyaan

Pertanyaan 1: Data valid sekarang

-- 查询当前所有有效客户(去历史版本)
SELECT
    customer_id,
    customer_name,
    address,
    is_active,
    start_date AS effective_from
FROM psa_db.dbo.customers_scd2
WHERE is_current="Y"
ORDER BY customer_id;

Pertanyaan 2: Jepretan kapan saja

-- 查询2024-01-20各客户的地址(张三变更前的快照)
DECLARE @as_of_date DATETIME = '2024-01-20 23:59:59';

SELECT
    customer_id,
    customer_name,
    address,
    city
FROM psa_db.dbo.customers_scd2
WHERE start_date <= @as_of_date
  AND end_date   >  @as_of_date;

Pertanyaan 3: Riwayat perubahan pelanggan (rantai versi lengkap)

-- 查询C001的完整变更记录
SELECT
    customer_id,
    customer_name,
    address,
    customer_type,
    is_active,
    start_date,
    end_date,
    CASE
        WHEN end_date="9999-12-31 23:59:59" THEN '🔵 当前版本'
        WHEN is_active = 0 THEN '🔴 已删除'
        ELSE '⚪ 历史版本'
    END AS version_status
FROM psa_db.dbo.customers_scd2
WHERE customer_id = 'C001'
ORDER BY start_date;

Pertanyaan 4: Lengkapi urutan siklus hidup

-- 查询O003订单的完整状态流转
SELECT
    order_id,
    customer_id,
    product_id,
    quantity,
    total_amount,
    status,
    start_date AS 状态生效时间,
    end_date   AS 状态失效时间,
    CASE
        WHEN end_date="9999-12-31 23:59:59" THEN '🔵 当前状态'
        ELSE '⚪ 已流转'
    END AS version_status
FROM psa_db.dbo.orders_scd2
WHERE order_id = 'O003'
ORDER BY start_date;

Pertanyaan 5: Mempertanyakan sejarah harga produk

-- 查询P004的历史价格变动
SELECT
    product_id,
    product_name,
    unit_price,
    brand,
    start_date AS 生效日期,
    CASE
        WHEN end_date="9999-12-31 23:59:59" THEN '当前价格'
        ELSE CONVERT(VARCHAR(10), DATEADD(DAY, -1, end_date), 120)
    END AS 失效日期,
    CASE
        WHEN end_date="9999-12-31 23:59:59" THEN '🔵'
        ELSE '⚪'
    END AS flag
FROM psa_db.dbo.products_scd2
WHERE product_id = 'P004'
ORDER BY start_date;

Pertanyaan 6: Ubah laporan statistik

-- 统计各批次的新增/变更/删除情况
SELECT
    batch_id,
    table_name,
    rows_inserted AS 新增或变更行数,
    rows_updated  AS 关闭的历史行数,
    status,
    CONVERT(VARCHAR(19), start_time, 120) AS 开始时间,
    CONVERT(VARCHAR(19), end_time, 120)   AS 结束时间,
    DATEDIFF(SECOND, start_time, end_time) AS 耗时_秒
FROM etl_db.dbo.etl_log
WHERE layer_name="psa_scd2"
ORDER BY start_time DESC;

Metode snapshot vs metode SCD2: perbandingan komprehensif

ukuran Metode snapshot (teks asli) Pendekatan SCD2 (artikel ini)
menulis strategi MASUKKAN N baris sepenuhnya setiap kali Hanya baris tulis yang diubah
model penyimpanan Urutan horizontal (N batch = N × jumlah baris) Rantai versi vertikal (hanya perubahan yang disimpan)
Ubah granularitas pelacakan Level batch (hanya mengetahui “ada perubahan dalam batch tertentu”) Tingkat baris (mengetahui secara pasti “bagian data yang berubah”)
Ubah kontennya Tidak dikenal Perbandingan lengkap versi lama + versi baru
Pertanyaan sejarah Filter berdasarkan batch, tidak akurat start_date/end_date Interval presisi
Tingkatkan pengakuan psa_operation='I' is_current="Y" dan diinstal untuk pertama kalinya
Ubah pengakuan Tidak dikenal Checksum memicu SCD2 dengan cara yang berbeda
Hapus pengakuan psa_operation='D'(membutuhkan perawatan tambahan) Pengenalan alami (Tidak kompatibel dengan SUMBER)
biaya penyimpanan O(n × jumlah_batch) O(n × rata-rata_perubahan_hitungan)
Kompleksitas implementasi RENDAH (PILIH MASUKKAN) Sedang (MERGE + logika masa berlaku)
Skenario yang berlaku Tabel kecil, perubahan frekuensi rendah, pembuatan prototipe cepat Tabel besar, perubahan sering, kepatuhan audit
Konsumsi hilir Sederhana (ambil batch terbaru) Perlu memfilter is_current="Y"
Ketertelusuran data Terbatas (ukuran batch) Penuh (dimensi ganda garis + waktu)

meringkaskan

Melalui artikel ini, Anda dapat melihat keseluruhan proses SCD2 dan implementasi spesifiknya di SQL Server.
SCD2 adalah solusi dasar untuk melacak riwayat data warehouse. Setelah menguasai artikel ini, data warehouse juga tersedia dengan cara yang lebih efisien.“Mesin Waktu”——Anda dapat kembali kapan saja dan kapan saja untuk melihat status data yang benar.

OMONG-OMONG:
Untuk meringkas tujuan artikel ini, pertama-tama saya akan membuat ringkasan pengalaman kerja saya sebelumnya. Kedua, jika saya harus melakukan hal yang sama di masa mendatang, menggunakan alat AI untuk merujuk ke URL artikel ini akan membantu AI menghasilkan kode yang diinginkan dengan lebih efisien dan akurat.


tautan terkait

PakarPBN

A Private Blog Network (PBN) is a collection of websites that are controlled by a single individual or organization and used primarily to build backlinks to a “money site” in order to influence its ranking in search engines such as Google. The core idea behind a PBN is based on the importance of backlinks in Google’s ranking algorithm. Since Google views backlinks as signals of authority and trust, some website owners attempt to artificially create these signals through a controlled network of sites.

In a typical PBN setup, the owner acquires expired or aged domains that already have existing authority, backlinks, and history. These domains are rebuilt with new content and hosted separately, often using different IP addresses, hosting providers, themes, and ownership details to make them appear unrelated. Within the content published on these sites, links are strategically placed that point to the main website the owner wants to rank higher. By doing this, the owner attempts to pass link equity (also known as “link juice”) from the PBN sites to the target website.

The purpose of a PBN is to give the impression that the target website is naturally earning links from multiple independent sources. If done effectively, this can temporarily improve keyword rankings, increase organic visibility, and drive more traffic from search results.

Jasa Backlink

Download Anime Batch