配置 Amazon RDS SQL Server 数据库

以下步骤介绍了如何配置 Amazon RDS SQL Server 数据库以与 Datastream 搭配使用:

  1. 为源数据库启用变更数据捕获 (CDC)。如需为源数据库启用 CDC,请连接到数据库,然后在 SQL 提示符、终端或 Amazon RDS 信息中心中运行以下命令:

    EXEC msdb.dbo.rds_cdc_enable_db 'DATABASE_NAME'
    

    DATABASE_NAME 替换为源数据库的名称。

  2. 为需要捕获更改的每个表启用 CDC:

    USE [DATABASE_NAME]
    EXEC sys.sp_cdc_enable_table
    @source_schema = N'SCHEMA_NAME',
    @source_name = N'TABLE_NAME',
    @role_name = NULL
    GO
    

    替换以下内容:

    • DATABASE_NAME:源数据库的名称
    • SCHEMA_NAME:表所属架构的名称
    • TABLE_NAME:您要为其启用 CDC 的表的名称
  3. 启动 SQL Server Agent 并确保它始终处于运行状态。如果 SQL Server 代理长时间处于停机状态,日志可能会被截断,导致 Datastream 未读取的更改数据永久丢失。

    如需了解如何运行 SQL Server Agent,请参阅启动、停止或重启 SQL Server Agent 实例

  4. 启用快照隔离。

    从 SQL Server 数据库回填数据时,请务必确保快照一致。如果您不应用本部分中所述的设置,在回填过程中对数据库所做的更改可能会导致重复或错误的结果,尤其是对于没有主键的表。

    启用快照隔离后,系统会在回填流程开始时创建数据库的临时视图。这样可确保被复制的数据保持一致,即使其他用户同时更改实时表也是如此。启用快照隔离功能可能会对性能产生轻微影响,但对于可靠的数据提取至关重要。

    如需启用快照隔离,请执行以下操作:

    1. 使用 SQL Server 客户端连接到您的数据库。
    2. 运行以下命令:
    ALTER DATABASE DATABASE_NAME SET ALLOW_SNAPSHOT_ISOLATION ON;
    

    DATABASE_NAME 替换为数据库的名称。

  5. 创建 Datastream 用户:

    1. 连接到源数据库,然后输入以下命令:

      USE DATABASE_NAME;
      
    2. 创建一个登录名,以便在 Datastream 中设置连接配置文件时使用。

      CREATE LOGIN YOUR_LOGIN WITH PASSWORD = 'PASSWORD';
      
    3. 创建用户并为其分配 db_ownerdb_denydatawriter 角色:

      CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
      
      EXEC sp_addrolemember 'db_owner', 'USER_NAME';
      EXEC sp_addrolemember 'db_denydatawriter', 'USER_NAME';
      
    4. 将此用户添加到 master 数据库:

      USE master;
      CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
      

事务日志 CDC 方法所需的额外步骤

只有在配置您的源 SQL Server 数据库以与事务日志 CDC 方法搭配使用时,您才需要执行本部分中介绍的步骤。

  1. sys.fn_dblog 函数授予 SELECT 权限。

    USE master;
    GRANT SELECT ON sys.fn_dblog TO USER_NAME;
    
  2. 将您的用户添加到 msdb 数据库,并向其分配以下权限:

    USE msdb;
    CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
    GRANT SELECT ON dbo.sysjobs TO USER_NAME;
    
  3. master 数据库中向您的用户分配以下权限:

      USE master;
      GRANT VIEW SERVER STATE TO YOUR_LOGIN;
    
  4. 设置您希望更改在来源中保留期限。

    USE [DATABASE_NAME]
    EXEC sys.sp_cdc_change_job @job_type = 'capture' , @pollinginterval = 86399
    EXEC sp_cdc_stop_job 'capture'
    EXEC sp_cdc_start_job 'capture'
    

    @pollinginterval 参数以秒为单位,建议将其值设为 86399。这意味着,事务日志会将更改保留 86,399 秒(1 天)。执行 sp_cdc_start_job 'capture 过程会启动设置。

  5. 设置日志截断防范措施。

    为确保 CDC 读取器有足够的时间读取日志,同时允许日志截断以防止耗尽存储空间,您可以设置日志截断保护措施:

    1. 使用 SQL Server 客户端连接到数据库。
    2. 在数据库中创建虚拟表:

      USE [DATABASE_NAME];
      CREATE TABLE dbo.gcp_datastream_truncation_safeguard (
        [id] INT IDENTITY(1,1) PRIMARY KEY,
        CreatedDate DATETIME DEFAULT GETDATE(),
        [char_column] CHAR(8)
        );
      
    3. 创建一个会在您指定的时间段内运行活跃事务来防止日志截断的存储过程:

      CREATE PROCEDURE [dbo].[DatastreamLogTruncationSafeguard] @transaction_logs_retention_time INT
      AS
      BEGIN
        -- Start a new transaction
        BEGIN TRANSACTION;
        INSERT INTO dbo.gcp_datastream_truncation_safeguard (char_column) VALUES ('a')
      
      DECLARE @formatted_time VARCHAR(5)
      SET @formatted_time = CONVERT(VARCHAR(5), DATEADD(MINUTE, @transaction_logs_retention_time, 0), 108);
        -- Wait for X minutes before ending the transaction
        WAITFOR DELAY @formatted_time;
        -- Commit the transaction
        COMMIT TRANSACTION;
      END;
      
    4. 创建另一个存储过程。这次,您将创建一个作业,以便根据指定的节奏运行您在上一步中创建的存储过程:

      CREATE PROCEDURE [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time INT
      AS
      BEGIN
        DECLARE @database_name VARCHAR(MAX)
        SET @database_name =  (SELECT DB_NAME());;
      
        DECLARE @command_str VARCHAR(MAX);
        SET @command_str = CONCAT('Use ', @database_name,'; exec dbo.DatastreamLogTruncationSafeguard @transaction_logs_retention_time = ' + CAST(@transaction_logs_retention_time AS VARCHAR(10)));
      
        DECLARE @job_name VARCHAR(MAX);
      SET @job_name =
        CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob1')
      
          -- Add 3 schedules to the job to run again after specified time.
          IF
            NOT EXISTS(
              SELECT *
              FROM msdb.dbo.sysjobs
              WHERE name = @job_name
            )
              BEGIN
                EXEC
                  msdb.dbo.sp_add_job
                    @job_name
        = @job_name,
        @enabled = 1,
        @description = N'Execute the procedure to run an active transaction for x minutes.';
      
      EXEC msdb.dbo.sp_add_jobstep @job_name = @job_name,
      @step_name = N'Execute_DatastreamLogTruncationSafeguard',
      @subsystem = N'TSQL',
      @command = @command_str;
      
        -- Add a schedule that runs the stored procedure every given minutes starting now.
        DECLARE @schedule_name_1 VARCHAR(MAX);
        SET @schedule_name_1 = CONCAT(@database_name, '_', 'DatastreamEveryGivenMinutesFromNow')
      
        DECLARE @start_time_1 time;
        SET @start_time_1 = DATEADD(SECOND, 1, GETDATE());
        DECLARE @formatted_start_time_1 INT;
        SET @formatted_start_time_1 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_1, 114), ':' ,''));
      
        EXEC msdb.dbo.sp_add_schedule
        @schedule_name = @schedule_name_1,
        @freq_type = 4,  -- daily start
        @freq_subday_type = 4,  -- every X minutes daily
        @freq_interval = 1,
        @freq_subday_interval = @transaction_logs_retention_time,
        @active_start_time = @formatted_start_time_1;
      
        EXEC msdb.dbo.sp_attach_schedule
        @job_name = @job_name,
        @schedule_name = @schedule_name_1 ;
      
        -- Add a schedule that runs the stored procedure after every given minutes starting after some delay.
        DECLARE @schedule_name_2 VARCHAR(MAX);
        Set @schedule_name_2 = CONCAT(@database_name, '_', 'DatastreamEveryGivenMinutesAfterDelay');
      
        DECLARE @start_time_2 time;
        SET @start_time_2 = DATEADD(MINUTE, @transaction_logs_retention_time / 2, GETDATE());
      
        DECLARE @formatted_start_time_2 INT;
        SET @formatted_start_time_2 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_2, 114), ':' ,''));
      
        EXEC msdb.dbo.sp_add_schedule
        @schedule_name = @schedule_name_2,
        @freq_type = 4,  -- daily start
        @freq_subday_type = 4,  -- every x minutes daily
        @freq_interval = 1,
        @freq_subday_interval = @transaction_logs_retention_time,
        @active_start_time = @formatted_start_time_2;
      
        EXEC msdb.dbo.sp_attach_schedule
        @job_name = @job_name,
        @schedule_name = @schedule_name_2 ;
      
        -- Add a schedule that runs the stored procedure on the SQL Server Agent startup.
        DECLARE @schedule_name_agent_startup VARCHAR(MAX);
        Set @schedule_name_agent_startup = CONCAT(@database_name, '_', 'DatastreamSqlServerAgentStartupSchedule')
      
        EXEC msdb.dbo.sp_add_schedule
        @schedule_name = @schedule_name_agent_startup,
        @freq_type = 64,  -- start on SQL Server Agent startup
        @active_start_time = @formatted_start_time_1;
      
        EXEC msdb.dbo.sp_attach_schedule
        @job_name = @job_name,
        @schedule_name = @schedule_name_agent_startup ;
      
        EXEC msdb.dbo.sp_add_jobserver
        @job_name = @job_name,
        @server_name = @@servername ;
        END
      END;
      
    5. 执行会创建 Datastream 作业的存储过程。

      DECLARE @transaction_logs_retention_time INT = (INT)
      EXEC [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time
      

      INT 替换为您要保留日志的时长(以分钟为单位)。例如:

      • 60 的值将保留时间设置为 1 小时
      • 24 * 60 的值将保留时间设置为 1 天
      • 3 * 24 * 60 的值将保留时间设置为 3 天

后续步骤