Enable CDC for your source database. To do so, connect to the database and run the following commands at a SQL prompt or in a terminal:
USE DATABASE_NAME
GO
EXEC sys.sp_cdc_enable_db
GO
Replace DATABASE_NAME with the name of your source database.
Set the retention period for which you want the changes to be available on your source:
USE DATABASE_NAME
EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = 86399
EXEC sp_cdc_stop_job 'capture'
EXEC sp_cdc_start_job 'capture'
The @pollinginterval parameter is measured in seconds, with a recommended value of 86399. This means that the transaction log retains changes for 86,399 seconds (one second short of a full day). Stopping the capture job and then executing the sp_cdc_start_job 'capture' procedure applies the new settings.
Enable CDC on the tables for which you need to capture changes:
USE DATABASE_NAME
EXEC sys.sp_cdc_enable_table
  @source_schema = N'SCHEMA_NAME',
  @source_name = N'TABLE_NAME',
  @role_name = NULL
GO
Replace the following:
- DATABASE_NAME: the name of your source database
- SCHEMA_NAME: the name of the schema to which the tables belong
- TABLE_NAME: the name of the table for which you want to enable CDC
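To confirm that the CDC steps above took effect, a check along the following lines might help. This is a minimal sketch rather than part of the documented procedure: it assumes the same DATABASE_NAME placeholder as above and relies only on the standard SQL Server catalog views and the sys.sp_cdc_help_jobs procedure. The column aliases are illustrative.

```sql
-- Assumption: DATABASE_NAME is the placeholder used in the steps above.
USE DATABASE_NAME;
GO

-- is_cdc_enabled = 1 means that CDC is enabled at the database level.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();

-- Lists the CDC capture and cleanup jobs with their current settings,
-- including the pollinginterval value of the capture job.
EXEC sys.sp_cdc_help_jobs;

-- Lists the tables that CDC currently tracks in this database.
SELECT s.name AS source_schema, t.name AS source_table
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id
WHERE t.is_tracked_by_cdc = 1;
GO
```

If a table that you enabled doesn't appear in the last result set, rerunning sys.sp_cdc_enable_table for that table is a reasonable first thing to try.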
Enable snapshot isolation:
When you backfill data from your SQL Server database, it's important to ensure consistent snapshots. If you don't apply the settings described further in this section, changes made to the database during the backfill process might lead to duplicates or incorrect results, in particular, for tables without primary keys.
Enabling snapshot isolation creates a temporary view of your database at the start of the backfill process. This ensures that the data being copied remains consistent, even if other users are making changes to the live tables at the same time. Enabling snapshot isolation might have a slight performance impact, but it's essential for reliable data extraction.
To enable snapshot isolation:
- Connect to your database using a SQL Server client.
- Run the following command:
ALTER DATABASE DATABASE_NAME SET ALLOW_SNAPSHOT_ISOLATION ON;
Replace DATABASE_NAME with the name of your database.
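The following sketch shows one way to check that the setting is active and what a snapshot read looks like from a client session. It is an illustration under assumptions, not a description of what Datastream runs internally: SCHEMA_NAME and TABLE_NAME reuse the placeholders from the earlier step, and total_rows is an arbitrary alias.

```sql
USE DATABASE_NAME;
GO

-- snapshot_isolation_state_desc reads ON once the setting has taken effect.
-- It can briefly read IN_TRANSITION_TO_ON while older transactions finish.
SELECT name, snapshot_isolation_state_desc
FROM sys.databases
WHERE name = DB_NAME();

-- A session that opts in to snapshot isolation sees the data as it was
-- when the transaction first read it, even if other sessions commit
-- changes in the meantime.
SET TRANSACTION ISOLATION LEVEL SNAPSHOT;
BEGIN TRANSACTION;
  SELECT COUNT(*) AS total_rows FROM SCHEMA_NAME.TABLE_NAME;
  -- A second read inside the same transaction returns the same count,
  -- regardless of concurrent writes to the live table.
  SELECT COUNT(*) AS total_rows FROM SCHEMA_NAME.TABLE_NAME;
COMMIT TRANSACTION;
GO
```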
Set up a log truncation safeguard
To give the CDC reader enough time to read the logs, while still allowing log truncation so that the logs don't use up the storage space, you can set up a log truncation safeguard:
- Connect to the database using a SQL Server client.
- Create a dummy table in the database:
USE DATABASE_NAME;
CREATE TABLE dbo.gcp_datastream_truncation_safeguard (
  [id] INT IDENTITY(1,1) PRIMARY KEY,
  CreatedDate DATETIME DEFAULT GETDATE(),
  [char_column] CHAR(8)
);
Replace DATABASE_NAME with the name of the database in which you want to create your dummy table.
- Create a stored procedure that keeps a transaction active for just under 24 hours to prevent log truncation:
CREATE PROCEDURE dbo.DatastreamLogTruncationSafeguard
AS
BEGIN
  -- Start a new transaction
  BEGIN TRANSACTION;
  INSERT INTO dbo.gcp_datastream_truncation_safeguard (char_column) VALUES ('a');
  -- Wait for 23 hours and 59 minutes before ending the transaction
  WAITFOR DELAY '23:59';
  -- Commit the transaction
  COMMIT TRANSACTION;
END;
- Create another stored procedure. This procedure creates two SQL Server Agent jobs that run the stored procedure from the previous step on a daily basis:
CREATE PROCEDURE dbo.SetUpDatastreamJob
AS
BEGIN
  DECLARE @database_name VARCHAR(MAX);
  SET @database_name = (SELECT DB_NAME());

  -- Command that each job step runs in the current database
  DECLARE @command_str VARCHAR(MAX);
  SET @command_str = CONCAT('Use ', @database_name, '; exec dbo.DatastreamLogTruncationSafeguard');

  DECLARE @job_name_1 VARCHAR(MAX);
  SET @job_name_1 = CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob1');

  -- Create the first job only if it doesn't exist yet
  IF NOT EXISTS (SELECT * FROM msdb.dbo.sysjobs WHERE name = @job_name_1)
  BEGIN
    EXEC msdb.dbo.sp_add_job
      @job_name = @job_name_1,
      @enabled = 1,
      @description = N'Execute the procedure every day';

    EXEC msdb.dbo.sp_add_jobstep
      @job_name = @job_name_1,
      @step_name = N'Execute_DatastreamLogTruncationSafeguard1',
      @subsystem = N'TSQL',
      @command = @command_str;

    -- Add a schedule that runs the stored procedure every day,
    -- starting one minute from now.
    DECLARE @start_time_1 time;
    SET @start_time_1 = DATEADD(MINUTE, 1, GETDATE());

    DECLARE @schedule_name_1 VARCHAR(MAX);
    SET @schedule_name_1 = CONCAT(@database_name, '_', 'DatastreamEverydaySchedule1');

    -- Convert the start time to the HHMMSS integer that sp_add_schedule expects
    DECLARE @formatted_start_time_1 INT;
    SET @formatted_start_time_1 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_1, 114), ':', ''));

    EXEC msdb.dbo.sp_add_schedule
      @schedule_name = @schedule_name_1,
      @freq_type = 4,  -- daily start
      @freq_interval = 1,
      @active_start_time = @formatted_start_time_1;

    EXEC msdb.dbo.sp_attach_schedule
      @job_name = @job_name_1,
      @schedule_name = @schedule_name_1;

    -- Add a schedule that runs the stored procedure on the SQL Server Agent startup.
    DECLARE @schedule_name_agent_startup VARCHAR(MAX);
    SET @schedule_name_agent_startup = CONCAT(@database_name, '_', 'DatastreamSqlServerAgentStartupSchedule');

    EXEC msdb.dbo.sp_add_schedule
      @schedule_name = @schedule_name_agent_startup,
      @freq_type = 64,  -- start on SQL Server Agent startup
      @active_start_time = @formatted_start_time_1;

    EXEC msdb.dbo.sp_attach_schedule
      @job_name = @job_name_1,
      @schedule_name = @schedule_name_agent_startup;

    EXEC msdb.dbo.sp_add_jobserver
      @job_name = @job_name_1,
      @server_name = @@servername;
  END

  DECLARE @job_name_2 VARCHAR(MAX);
  SET @job_name_2 = CONCAT(@database_name, '_', 'DatastreamLogTruncationSafeguardJob2');

  -- Create the second job only if it doesn't exist yet
  IF NOT EXISTS (SELECT * FROM msdb.dbo.sysjobs WHERE name = @job_name_2)
  BEGIN
    EXEC msdb.dbo.sp_add_job
      @job_name = @job_name_2,
      @enabled = 1,
      @description = N'Procedure execution every day';

    EXEC msdb.dbo.sp_add_jobstep
      @job_name = @job_name_2,
      @step_name = N'Execute_DatastreamLogTruncationSafeguard2',
      @subsystem = N'TSQL',
      @command = @command_str;

    -- Start the second job 12 hours from now so that it overlaps the first job
    DECLARE @start_time_2 time;
    SET @start_time_2 = DATEADD(HOUR, 12, GETDATE());

    DECLARE @formatted_start_time_2 INT;
    SET @formatted_start_time_2 = CONVERT(INT, REPLACE(CONVERT(VARCHAR(8), @start_time_2, 114), ':', ''));

    DECLARE @schedule_name_2 VARCHAR(MAX);
    SET @schedule_name_2 = CONCAT(@database_name, '_', 'DatastreamEverydaySchedule2');

    EXEC msdb.dbo.sp_add_schedule
      @schedule_name = @schedule_name_2,
      @freq_type = 4,  -- daily start
      @freq_interval = 1,
      @active_start_time = @formatted_start_time_2;

    EXEC msdb.dbo.sp_attach_schedule
      @job_name = @job_name_2,
      @schedule_name = @schedule_name_2;

    EXEC msdb.dbo.sp_add_jobserver
      @job_name = @job_name_2,
      @server_name = @@servername;
  END
END;
- Execute the stored procedure that creates the Datastream jobs:
EXEC dbo.SetUpDatastreamJob
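After the setup procedure runs, a quick look at the SQL Server Agent catalog can confirm that the jobs exist. The queries below are a hedged sketch: the job names follow the naming pattern built inside dbo.SetUpDatastreamJob, and the log_reuse_wait_desc check is an extra sanity check that is not part of the original steps.

```sql
-- Assumption: run this in the same database where you executed
-- dbo.SetUpDatastreamJob, because the job names are prefixed with DB_NAME().
USE DATABASE_NAME;
GO

-- Both safeguard jobs should be listed with enabled = 1.
SELECT name, enabled, date_created
FROM msdb.dbo.sysjobs
WHERE name IN (
  CONCAT(DB_NAME(), '_DatastreamLogTruncationSafeguardJob1'),
  CONCAT(DB_NAME(), '_DatastreamLogTruncationSafeguardJob2')
);

-- Shows what currently holds back log truncation for this database.
-- While a safeguard transaction is open, a value such as
-- ACTIVE_TRANSACTION or REPLICATION might appear here.
SELECT name, log_reuse_wait_desc
FROM sys.databases
WHERE name = DB_NAME();
GO
```

The jobs only run while the SQL Server Agent service is running, so an empty job history is worth checking against the Agent status first.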
Create a Datastream user:
Connect to the source database and enter the following command:
USE DATABASE_NAME;
Create a login to use while setting up the connection profile in Datastream.
CREATE LOGIN YOUR_LOGIN WITH PASSWORD = 'PASSWORD';
Create a user and assign the db_owner and db_denydatawriter roles to it:
CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
EXEC sp_addrolemember 'db_owner', 'USER_NAME';
EXEC sp_addrolemember 'db_denydatawriter', 'USER_NAME';
Add this user to the master database and assign the following permissions to them:
USE master;
CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
GRANT VIEW SERVER STATE TO YOUR_LOGIN;
GRANT SELECT ON sys.fn_dblog TO USER_NAME;
Add this user to the msdb database and assign the following permissions to them:
USE msdb;
CREATE USER USER_NAME FOR LOGIN YOUR_LOGIN;
GRANT SELECT ON dbo.sysjobs TO USER_NAME;
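Before you enter the login in a Datastream connection profile, it can be useful to check that the roles and permissions from the steps above are in place. The following is a minimal sketch that uses standard catalog views. USER_NAME, YOUR_LOGIN, and DATABASE_NAME are the same placeholders as above, and the column alias is illustrative.

```sql
USE DATABASE_NAME;
GO

-- Database roles that USER_NAME belongs to in the source database.
-- The steps above assign db_owner and db_denydatawriter.
SELECT r.name AS role_name
FROM sys.database_role_members AS m
JOIN sys.database_principals AS r ON r.principal_id = m.role_principal_id
JOIN sys.database_principals AS u ON u.principal_id = m.member_principal_id
WHERE u.name = 'USER_NAME';

-- Server-level permissions granted to the login.
-- The steps above grant VIEW SERVER STATE.
SELECT p.permission_name, p.state_desc
FROM sys.server_permissions AS p
JOIN sys.server_principals AS l ON l.principal_id = p.grantee_principal_id
WHERE l.name = 'YOUR_LOGIN';
GO
```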
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2024-04-24 UTC.