为源数据库启用 CDC。为此,请连接到数据库并在 SQL 提示符处或终端运行以下命令:
USE [DATABASE_NAME] GO EXEC sys.sp_cdc_enable_db GO
将
DATABASE_NAME
替换为您的源数据库的名称。设置保留期限,使更改在多长时间内出现在来源中:
USE [DATABASE_NAME] EXEC sys.sp_cdc_change_job @job_type = 'capture' , @pollinginterval = 86399 EXEC sp_cdc_stop_job 'capture' EXEC sp_cdc_start_job 'capture'
@pollinginterval
参数会以秒为单位进行测量,建议值设置为86399
。这意味着事务日志将更改保留 86399 秒(一天)。执行sp_cdc_start_job 'capture
过程会启动设置。在需要捕获更改的表上启用 CDC:
USE [DATABASE_NAME] EXEC sys.sp_cdc_enable_table @source_schema = N'SCHEMA_NAME', @source_name = N'TABLE_NAME', @role_name = NULL GO
请替换以下内容:
DATABASE_NAME
:源数据库的名称SCHEMA_NAME
:表所属的架构的名称TABLE_NAME
:您要为其启用 CDC 的表的名称
启用快照隔离:
从 SQL Server 数据库回填数据时,请务必确保一致的快照。如果您未应用本部分中进一步介绍的设置,则回填过程中对数据库所做的更改可能会导致重复或错误的结果,尤其是对于没有主键的表。
启用快照隔离会在回填过程开始时创建数据库的临时视图。这样可以确保复制的数据保持一致,即使其他用户同时对活跃表进行更改也是如此。启用快照隔离可能会对性能略有影响,但这对于可靠提取数据至关重要。
如需启用快照隔离,请执行以下操作:
- 使用 SQL Server 客户端连接到数据库。
- 运行以下命令:
ALTER DATABASE DATABASE_NAME SET ALLOW_SNAPSHOT_ISOLATION ON;
将 DATABASE_NAME 替换为您的数据库的名称。
设置日志截断保护措施
为了确保 CDC 读取器有足够的时间读取日志,同时允许截断日志以避免占用存储空间,您可以设置日志截断保护措施:
- 使用 SQL Server 客户端连接到数据库。
在数据库中创建虚拟表:
USE [DATABASE_NAME]; CREATE TABLE dbo.gcp_datastream_truncation_safeguard ( [id] INT IDENTITY(1,1) PRIMARY KEY, CreatedDate DATETIME DEFAULT GETDATE(), [char_column] CHAR(8) );
将 DATABASE_NAME 替换为您要在其中创建虚拟表的数据库的名称。
创建一个运行活跃事务 24 小时的存储过程,以防止日志被截断:
CREATE PROCEDURE dbo.DatastreamLogTruncationSafeguard AS BEGIN -- Start a new transaction BEGIN TRANSACTION; INSERT INTO dbo.gcp_datastream_truncation_safeguard (char_column) VALUES ('a') -- Wait for one day before ending the transaction WAITFOR DELAY '23:59'; -- Commit the transaction COMMIT TRANSACTION; END;
创建另一个存储过程。这一次,您将创建一个作业,每天运行您在上一步中创建的存储过程:
CREATE PROCEDURE dbo.SetUpDatastreamJob AS BEGIN DECLARE @database_name VARCHAR(MAX) Set @database_name = (SELECT DB_NAME());; DECLARE @command_str VARCHAR(MAX); Set @command_str = CONCAT('Use [', @database_name,']; exec dbo.DatastreamLogTruncationSafeguard') DECLARE @job_name_1 VARCHAR(MAX); Set @job_name_1 = CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob1') -- Schedule the procedure to run again tomorrow IF NOT EXISTS ( select * from msdb.dbo.sysjobs WHERE name = @job_name_1 ) BEGIN EXEC msdb.dbo.sp_add_job @job_name = @job_name_1, @enabled = 1, @description = N'Execute the procedure every day' ; EXEC msdb.dbo.sp_add_jobstep @job_name = @job_name_1, @step_name = N'Execute_DatastreamLogTruncationSafeguard1', @subsystem = N'TSQL', @command = @command_str; -- Add a schedule that runs the stored procedure every day. DECLARE @start_time_1 time; SET @start_time_1 = DATEADD(MINUTE, 1, GETDATE()); DECLARE @schedule_name_1 VARCHAR(MAX); Set @schedule_name_1 = CONCAT(@database_name, '_', 'DatastreamEverydaySchedule1') DECLARE @formatted_start_time_1 INT; SET @formatted_start_time_1 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_1, 114), ':' ,'')); EXEC msdb.dbo.sp_add_schedule @schedule_name = @schedule_name_1, @freq_type = 4, -- daily start @freq_interval = 1, @active_start_time = @formatted_start_time_1; EXEC msdb.dbo.sp_attach_schedule @job_name = @job_name_1, @schedule_name = @schedule_name_1 ; -- Add a schedule that runs the stored procedure on the SQL Server Agent startup. DECLARE @schedule_name_agent_startup VARCHAR(MAX); Set @schedule_name_agent_startup = CONCAT(@database_name, '_', 'DatastreamSqlServerAgentStartupSchedule') EXEC msdb.dbo.sp_add_schedule @schedule_name = @schedule_name_agent_startup, @freq_type = 64, -- start on SQL Server Agent startup @active_start_time = @formatted_start_time_1; EXEC msdb.dbo.sp_attach_schedule @job_name = @job_name_1, @schedule_name = @schedule_name_agent_startup ; EXEC msdb.dbo.sp_add_jobserver @job_name = @job_name_1, @server_name = @@servername ; END DECLARE @job_name_2 VARCHAR(MAX); Set @job_name_2 = CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob2') IF NOT EXISTS ( select * from msdb.dbo.sysjobs WHERE name = @job_name_2 ) BEGIN EXEC msdb.dbo.sp_add_job @job_name = @job_name_2, @enabled = 1, @description = N'Procedure execution every day' ; EXEC msdb.dbo.sp_add_jobstep @job_name = @job_name_2, @step_name = N'Execute_DatastreamLogTruncationSafeguard2', @subsystem = N'TSQL', @command = @command_str; DECLARE @start_time_2 time; SET @start_time_2 = DATEADD(HOUR, 12, GETDATE()); DECLARE @formatted_start_time_2 INT; SET @formatted_start_time_2 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_2, 114), ':' ,'')); DECLARE @schedule_name_2 VARCHAR(MAX); Set @schedule_name_2 = CONCAT(@database_name, '_', 'DatastreamEverydaySchedule2') EXEC msdb.dbo.sp_add_schedule @schedule_name = @schedule_name_2, @freq_type = 4, -- daily start @freq_interval = 1, @active_start_time = @formatted_start_time_2; EXEC msdb.dbo.sp_attach_schedule @job_name = @job_name_2, @schedule_name = @schedule_name_2 ; EXEC msdb.dbo.sp_add_jobserver @job_name = @job_name_2, @server_name = @@servername ; END End;
执行用于创建 Datastream 作业的存储过程。
EXEC dbo.SetUpDatastreamJob
创建 Datastream 用户:
连接到源数据库并输入以下命令:
USE DATABASE_NAME; ```
创建在 Datastream 中设置连接配置文件时使用的登录信息。
CREATE LOGIN YOUR_LOGIN WITH PASSWORD = 'PASSWORD';
创建一个用户并为其分配
db_owner
和db_denydatawriter
角色:CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
EXEC sp_addrolemember 'db_owner', 'USER_NAME'; EXEC sp_addrolemember 'db_denydatawriter', 'USER_NAME';
将此用户添加到
master
数据库,并为其分配以下权限:USE master; CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN; GRANT VIEW SERVER STATE TO YOUR_LOGIN; GRANT SELECT ON sys.fn_dblog TO USER_NAME;
将此用户添加到 msdb 数据库,并为其分配以下权限:
USE msdb; CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN; GRANT SELECT ON dbo.sysjobs TO USER_NAME;
如未另行说明,那么本页面中的内容已根据知识共享署名 4.0 许可获得了许可,并且代码示例已根据 Apache 2.0 许可获得了许可。有关详情,请参阅 Google 开发者网站政策。Java 是 Oracle 和/或其关联公司的注册商标。
最后更新时间 (UTC):2024-05-18。
[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"翻译问题"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"其他"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"易于理解"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"解决了我的问题"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"其他"
}]