為 CDC 設定自行管理的 SQL Server 資料庫

本頁面說明如何設定變更資料擷取 (CDC),將資料從自行管理的 SQL Server 資料庫串流至支援的目的地,例如 BigQuery 或 Cloud Storage。

  1. 為來源資料庫啟用 CDC。如要這麼做,請連線至資料庫,並在 SQL 提示或終端機中執行下列指令:

    USE [DATABASE_NAME]
    GO
    EXEC sys.sp_cdc_enable_db
    GO
    

    DATABASE_NAME 替換為來源資料庫的名稱。

  2. 為需要擷取變更的資料表啟用 CDC:

    USE [DATABASE_NAME]
    EXEC sys.sp_cdc_enable_table
    @source_schema = N'SCHEMA_NAME',
    @source_name = N'TABLE_NAME',
    @role_name = NULL
    GO
    

    更改下列內容:

    • DATABASE_NAME:來源資料庫的名稱
    • SCHEMA_NAME:資料表所屬的結構定義名稱
    • TABLE_NAME:要啟用 CDC 的資料表名稱
  3. 啟動 SQL Server 代理程式,並確認代理程式持續運作。如果 SQL Server Agent 長時間處於停機狀態,記錄可能會遭到截斷,導致 Datastream 未讀取的變更資料永久遺失。

    如要瞭解如何執行 SQL Server 代理程式,請參閱「啟動、停止或重新啟動 SQL Server 代理程式執行個體」。

  4. 啟用快照隔離功能。

    從 SQL Server 資料庫回填資料時,請務必確保快照一致。如果您未套用本節所述設定,在回填程序期間對資料庫所做的變更可能會導致重複或不正確的結果,尤其是沒有主鍵的資料表。

    啟用快照隔離後,系統會在回填程序開始時建立資料庫的暫時檢視畫面。即使其他使用者同時變更即時資料表,也能確保複製的資料保持一致。啟用快照隔離可能會稍微影響效能,但這是可靠擷取資料的必要條件。

    如要啟用快照隔離功能,請按照下列步驟操作:

    1. 透過 SQL Server 用戶端連結至資料庫。
    2. 執行下列指令:
    ALTER DATABASE DATABASE_NAME SET ALLOW_SNAPSHOT_ISOLATION ON;
    

    DATABASE_NAME 替換成資料庫名稱。

  5. 建立 Datastream 使用者:

    1. 連線至來源資料庫,然後輸入下列指令:

      USE DATABASE_NAME;
      
    2. 建立登入資料,在 Datastream 中設定連線設定檔時使用。

      CREATE LOGIN YOUR_LOGIN WITH PASSWORD = 'PASSWORD';
      
    3. 建立使用者:

      CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
      
    4. 為他們指派 db_datareader 角色:

      EXEC sp_addrolemember 'db_datareader', 'USER_NAME';
      
    5. 授予 VIEW DATABASE STATE 權限:

      GRANT VIEW DATABASE STATE TO USER_NAME;
      
    6. 將這位使用者新增至 master 資料庫:

      USE master;
      CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
      

交易記錄 CDC 方法的額外步驟

只有在設定來源 SQL Server 資料庫,以搭配使用交易記錄 CDC 方法時,才需要執行本節所述步驟。

  1. 連線至來源資料庫,並將 db_ownerdb_denydatawriter 角色指派給使用者:

    USE DATABASE_NAME;
    EXEC sp_addrolemember 'db_owner', 'USER_NAME';
    EXEC sp_addrolemember 'db_denydatawriter', 'USER_NAME';
    
  2. 授予 SELECT 函式的 sys.fn_dblog 權限。

    USE master;
    GRANT SELECT ON sys.fn_dblog TO USER_NAME;
    
  3. 將使用者新增至 msdb 資料庫,並指派下列權限:

    USE msdb;
    CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
    GRANT SELECT ON dbo.sysjobs TO USER_NAME;
    
  4. master 資料庫中,為使用者指派下列權限:

      USE master;
      GRANT VIEW SERVER STATE TO YOUR_LOGIN;
    
  5. 設定保留期限,指定變更內容在來源中可用的時間長度。

    USE [DATABASE_NAME]
    EXEC sys.sp_cdc_change_job @job_type = 'capture' , @pollinginterval = 86399
    EXEC sp_cdc_stop_job 'capture'
    EXEC sp_cdc_start_job 'capture'
    

    @pollinginterval 參數以秒為單位,建議將值設為 86399。也就是說,交易記錄會保留 86,399 秒 (一天) 的變更。執行 sp_cdc_start_job 'capture 程序會啟動設定。

  6. 如果資料庫正在執行任何清除或擷取工作,請停止這些工作。 詳情請參閱「管理及監控異動資料擷取」。

  7. 設定記錄檔截斷保護機制。

    為確保 CDC 讀取器有足夠時間讀取記錄檔,同時允許記錄檔截斷,以免用盡儲存空間,您可以設定記錄檔截斷保護措施:

    1. 透過 SQL Server 用戶端連線至資料庫。
    2. 建立會執行有效交易的預存程序,以防記錄檔遭到截斷:

      CREATE PROCEDURE dbo.DatastreamLogTruncationSafeguard @transaction_logs_retention_time INT
      AS
      BEGIN
      
      DECLARE @transactionLog TABLE (beginLSN BINARY(10), endLSN BINARY(10))
      INSERT @transactionLog EXEC sp_repltrans
      
      DECLARE @currentDateTime DATETIME = GETDATE()
      DECLARE @cutoffDateTime DATETIME = DATEADD(MINUTE, -@transaction_logs_retention_time, @currentDateTime)
      
      DECLARE @firstValidLSN BINARY(10) = NULL
      DECLARE @lastValidLSN BINARY(10) = NULL
      DECLARE @firstTxnTime DATETIME = NULL
      DECLARE @lastTxnTime DATETIME = NULL
      
      SELECT TOP 1
          @lastTxnTime = t.logStartTime,
          @lastValidLSN = t.beginLSN
      FROM (
        SELECT
          beginLSN AS beginLSN,
          (SELECT TOP 1 [begin time]
          FROM fn_dblog(stuff(stuff(CONVERT(CHAR(24), beginLSN, 1), 19, 0, ':'), 11, 0, ':'), DEFAULT)) AS logStartTime
        FROM @transactionLog
      ) t
      ORDER BY t.beginLSN DESC
      
      -- If all transactions are before cutoff, clear everything
      IF (@lastTxnTime < @cutoffDateTime)
      BEGIN
          EXEC sp_repldone NULL, NULL, 0, 0, 1
      END
      ELSE
      BEGIN
          -- Find the earliest transaction
          SELECT TOP 1
            @firstTxnTime = t.logStartTime,
            @firstValidLSN = ISNULL(@firstValidLSN, t.beginLSN)
          FROM (
            SELECT
              beginLSN AS beginLSN,
              (SELECT TOP 1 [begin time]
              FROM fn_dblog(stuff(stuff(CONVERT(CHAR(24), beginLSN, 1), 19, 0, ':'), 11, 0, ':'), DEFAULT)) AS logStartTime
            FROM @transactionLog
          ) t
          ORDER BY t.beginLSN ASC
      
          IF (@firstTxnTime < @cutoffDateTime)
          BEGIN
              -- Identify the earliest and latest LSNs within VLogs before cutoff
              SELECT
                @firstValidLSN = SUBSTRING(MAX(t.lsnMarkers), 1, 10),
                @lastValidLSN = SUBSTRING(MAX(t.lsnMarkers), 11, 10)
              FROM (
                SELECT MIN(beginLSN + endLSN) AS lsnMarkers
                FROM @transactionLog
                GROUP BY SUBSTRING(beginLSN, 1, 4)
              ) t
              WHERE (
                SELECT TOP 1 [begin time]
                FROM fn_dblog(stuff(stuff(CONVERT(CHAR(24), t.lsnMarkers, 1), 19, 0, ':'), 11, 0, ':'), DEFAULT)
                WHERE Operation = 'LOP_BEGIN_XACT'
              ) < @cutoffDateTime
      
              EXEC sp_repldone @firstValidLSN, @lastValidLSN, 0, 0, 0
          END
        END
      END;
      
    3. 建立另一個預存程序。這次,您要建立工作,根據指定的頻率執行在上一個步驟中建立的預存程序:

      CREATE PROCEDURE [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time INT
      AS
      BEGIN
      
      DECLARE @database_name VARCHAR(MAX)
        SET @database_name =  (SELECT DB_NAME());;
      
        DECLARE @command_str VARCHAR(MAX);
        SET @command_str = CONCAT('Use ', @database_name,'; exec dbo.DatastreamLogTruncationSafeguard @transaction_logs_retention_time = ' + CAST(@transaction_logs_retention_time AS VARCHAR(10)));
      
        DECLARE @job_name VARCHAR(MAX);
      SET @job_name =
        CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob1')
          DECLARE @current_time INT
        = CAST(FORMAT(GETDATE(), 'HHmmss') AS INT);
      
        -- Schedule the procedure to run after every 5 minutes.
        IF NOT EXISTS (
          SELECT * FROM msdb.dbo.sysjobs
          WHERE name = @job_name
        )
        BEGIN
          EXEC msdb.dbo.sp_add_job
          @job_name = @job_name,
          @enabled = 1,
          @description = N'Execute the procedure every 5 minutes.' ;
      
          EXEC msdb.dbo.sp_add_jobstep
          @job_name =  @job_name,
          @step_name = N'Execute_DatastreamLogTruncationSafeguard',
          @subsystem = N'TSQL',
          @command = @command_str;
      
            DECLARE @schedule_name_1 VARCHAR(MAX);
          SET @schedule_name_1 = CONCAT(@database_name, '_', 'DatastreamEveryFiveMinutesSchedule')
      
          EXEC msdb.dbo.sp_add_schedule
          @schedule_name = @schedule_name_1,
          @freq_type = 4,  -- daily start
          @freq_subday_type = 4,  -- every X minutes daily
          @freq_interval = 1,
          @freq_subday_interval = 5,
          @active_start_time = @current_time;
      
          EXEC msdb.dbo.sp_attach_schedule
          @job_name = @job_name,
          @schedule_name = @schedule_name_1 ;
      
          -- Add a schedule that runs the stored procedure on the SQL Server Agent startup.
          DECLARE @schedule_name_agent_startup VARCHAR(MAX);
          SET @schedule_name_agent_startup = CONCAT(@database_name, '_', 'DatastreamSqlServerAgentStartupSchedule')
      
          EXEC msdb.dbo.sp_add_schedule
          @schedule_name = @schedule_name_agent_startup,
          @freq_type = 64,  -- start on SQL Server Agent startup
          @active_start_time = @current_time;
      
          EXEC msdb.dbo.sp_attach_schedule
          @job_name = @job_name,
          @schedule_name = @schedule_name_agent_startup ;
      
          EXEC msdb.dbo.sp_add_jobserver
          @job_name = @job_name,
          @server_name = @@servername ;
        END
      END;
      
    4. 執行會建立 Datastream 工作的預存程序。

      DECLARE @transaction_logs_retention_time INT = (INT)
      EXEC [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time
      

      請將 INT 替換為您要保留記錄的分鐘數。例如:

      • 60 的值會將保留時間設為 1 小時
      • 24 * 60 的值會將保留時間設為 1 天
      • 3 * 24 * 60 的值會將保留時間設為 3 天

後續步驟