本页面介绍如何配置变更数据捕获 (CDC),以将数据从 Amazon RDS SQL Server 数据库流式传输到支持的目标位置,例如 BigQuery 或 Cloud Storage。
- 为源数据库启用变更数据捕获 (CDC)。如需为源数据库启用 CDC,请连接到该数据库,然后在 SQL 提示符处、终端中或使用 Amazon RDS 控制面板运行以下命令: - EXEC msdb.dbo.rds_cdc_enable_db 'DATABASE_NAME'- 将 - DATABASE_NAME替换为源数据库的名称。
- 在需要捕获更改的每个表上启用 CDC: - USE [DATABASE_NAME] EXEC sys.sp_cdc_enable_table @source_schema = N'SCHEMA_NAME', @source_name = N'TABLE_NAME', @role_name = NULL GO- 替换以下内容: - DATABASE_NAME:源数据库的名称
- SCHEMA_NAME:表所属的架构的名称
- TABLE_NAME:您要为之启用 CDC 的表的名称
 
- 启动 SQL Server Agent 并确保它始终处于运行状态。如果 SQL Server 代理长时间保持关闭状态,日志可能会被截断,导致 Datastream 未读取的更改数据永久丢失。 - 如需了解如何运行 SQL Server Agent,请参阅启动、停止或重启 SQL Server Agent 实例。 
- 启用快照隔离。 - 从 SQL Server 数据库回填数据时,请务必确保快照的一致性。如果您不应用本部分中所述的设置,在回填过程中对数据库所做的更改可能会导致重复或错误的结果,尤其是对于没有主键的表。 - 启用快照隔离会在回填过程开始时创建数据库的临时视图。这样可确保所复制的数据保持一致,即使其他用户同时在对实时表进行更改也是如此。启用快照隔离可能会对性能产生轻微影响,但对于可靠的数据提取至关重要。 - 如需启用快照隔离,请执行以下操作: - 使用 SQL Server 客户端连接到您的数据库。
- 运行以下命令:
 - ALTER DATABASE DATABASE_NAME SET ALLOW_SNAPSHOT_ISOLATION ON;- 将 DATABASE_NAME 替换为您的数据库名称。 
- 创建 Datastream 用户: - 连接到源数据库并输入以下命令: - USE DATABASE_NAME;
- 创建登录信息,以便在 Datastream 中设置连接配置文件时使用。 - CREATE LOGIN YOUR_LOGIN WITH PASSWORD = 'PASSWORD';
- 创建用户并为其分配 - db_owner和- db_denydatawriter角色:- CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;- EXEC sp_addrolemember 'db_owner', 'USER_NAME'; EXEC sp_addrolemember 'db_denydatawriter', 'USER_NAME';
- 将此用户添加到 - master数据库:- USE master; CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
 
事务日志 CDC 方法所需的额外步骤
仅当您配置源 SQL Server 数据库以与事务日志 CDC 方法搭配使用时,才需要执行本部分中所述的步骤。
- 为 - sys.fn_dblog函数授予- SELECT权限。- USE master; GRANT SELECT ON sys.fn_dblog TO USER_NAME;
- 将您的用户添加到 msdb 数据库,并为其分配以下权限: - USE msdb; CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN; GRANT SELECT ON dbo.sysjobs TO USER_NAME;
- 在 - master数据库中为您的用户分配以下权限:- USE master; GRANT VIEW SERVER STATE TO YOUR_LOGIN;
- 设置轮询间隔,以便在您的来源中提供相应更改。 - USE [DATABASE_NAME] EXEC sys.sp_cdc_change_job @job_type = 'capture' , @pollinginterval = 86399 EXEC sp_cdc_stop_job 'capture' EXEC sp_cdc_start_job 'capture'- @pollinginterval参数以秒为单位进行衡量,建议值设置为- 86399。这意味着事务日志会保留 86,399 秒(一天)的更改。执行- sp_cdc_start_job 'capture过程会启动设置。
- 设置日志截断保护措施。 - 为了确保 CDC 读取器有足够的时间读取日志,同时允许日志截断以防止存储空间用尽,您可以设置日志截断安全措施: - 使用 SQL Server 客户端连接到数据库。
- 在数据库中创建虚拟表: - USE [DATABASE_NAME]; CREATE TABLE dbo.gcp_datastream_truncation_safeguard ( [id] INT IDENTITY(1,1) PRIMARY KEY, CreatedDate DATETIME DEFAULT GETDATE(), [char_column] CHAR(8) );
- 创建一个存储过程,该过程会运行您指定的活跃事务一段时间,以防止日志截断: - CREATE PROCEDURE [dbo].[DatastreamLogTruncationSafeguard] @transaction_logs_retention_time INT AS BEGIN -- Start a new transaction BEGIN TRANSACTION; INSERT INTO dbo.gcp_datastream_truncation_safeguard (char_column) VALUES ('a') DECLARE @formatted_time VARCHAR(5) SET @formatted_time = CONVERT(VARCHAR(5), DATEADD(MINUTE, @transaction_logs_retention_time, 0), 108); -- Wait for X minutes before ending the transaction WAITFOR DELAY @formatted_time; -- Commit the transaction COMMIT TRANSACTION; END;
- 创建另一个存储过程。这次,您将创建一个作业,该作业会按照指定的频次运行您在上一步中创建的存储过程: - CREATE PROCEDURE [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time INT AS BEGIN DECLARE @database_name VARCHAR(MAX) SET @database_name = (SELECT DB_NAME());; DECLARE @command_str VARCHAR(MAX); SET @command_str = CONCAT('Use ', @database_name,'; exec dbo.DatastreamLogTruncationSafeguard @transaction_logs_retention_time = ' + CAST(@transaction_logs_retention_time AS VARCHAR(10))); DECLARE @job_name VARCHAR(MAX); SET @job_name = CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob1') -- Add 3 schedules to the job to run again after specified time. IF NOT EXISTS( SELECT * FROM msdb.dbo.sysjobs WHERE name = @job_name ) BEGIN EXEC msdb.dbo.sp_add_job @job_name = @job_name, @enabled = 1, @description = N'Execute the procedure to run an active transaction for x minutes.'; EXEC msdb.dbo.sp_add_jobstep @job_name = @job_name, @step_name = N'Execute_DatastreamLogTruncationSafeguard', @subsystem = N'TSQL', @command = @command_str; -- Add a schedule that runs the stored procedure every given minutes starting now. DECLARE @schedule_name_1 VARCHAR(MAX); SET @schedule_name_1 = CONCAT(@database_name, '_', 'DatastreamEveryGivenMinutesFromNow') DECLARE @start_time_1 time; SET @start_time_1 = DATEADD(SECOND, 1, GETDATE()); DECLARE @formatted_start_time_1 INT; SET @formatted_start_time_1 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_1, 114), ':' ,'')); EXEC msdb.dbo.sp_add_schedule @schedule_name = @schedule_name_1, @freq_type = 4, -- daily start @freq_subday_type = 4, -- every X minutes daily @freq_interval = 1, @freq_subday_interval = @transaction_logs_retention_time, @active_start_time = @formatted_start_time_1; EXEC msdb.dbo.sp_attach_schedule @job_name = @job_name, @schedule_name = @schedule_name_1 ; -- Add a schedule that runs the stored procedure after every given minutes starting after some delay. DECLARE @schedule_name_2 VARCHAR(MAX); Set @schedule_name_2 = CONCAT(@database_name, '_', 'DatastreamEveryGivenMinutesAfterDelay'); DECLARE @start_time_2 time; SET @start_time_2 = DATEADD(MINUTE, @transaction_logs_retention_time / 2, GETDATE()); DECLARE @formatted_start_time_2 INT; SET @formatted_start_time_2 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_2, 114), ':' ,'')); EXEC msdb.dbo.sp_add_schedule @schedule_name = @schedule_name_2, @freq_type = 4, -- daily start @freq_subday_type = 4, -- every x minutes daily @freq_interval = 1, @freq_subday_interval = @transaction_logs_retention_time, @active_start_time = @formatted_start_time_2; EXEC msdb.dbo.sp_attach_schedule @job_name = @job_name, @schedule_name = @schedule_name_2 ; -- Add a schedule that runs the stored procedure on the SQL Server Agent startup. DECLARE @schedule_name_agent_startup VARCHAR(MAX); Set @schedule_name_agent_startup = CONCAT(@database_name, '_', 'DatastreamSqlServerAgentStartupSchedule') EXEC msdb.dbo.sp_add_schedule @schedule_name = @schedule_name_agent_startup, @freq_type = 64, -- start on SQL Server Agent startup @active_start_time = @formatted_start_time_1; EXEC msdb.dbo.sp_attach_schedule @job_name = @job_name, @schedule_name = @schedule_name_agent_startup ; EXEC msdb.dbo.sp_add_jobserver @job_name = @job_name, @server_name = @@servername ; END END;
- 执行用于创建 Datastream 作业的存储过程。 - DECLARE @transaction_logs_retention_time INT = (INT) EXEC [dbo].[SetUpDatastreamJob] @transaction_logs_retention_time- 将 INT 替换为您要保留日志的分钟数。例如: - 值为 60时,保留时间设置为 1 小时
- 值为 24 * 60时,保留时间设置为 1 天
- 3 * 24 * 60的值将保留时间设置为 3 天
 
- 值为 
 
后续步骤
- 详细了解 Datastream 如何与 SQL Server 源搭配使用。