Documentation Index
Fetch the complete documentation index at: https://docs.popsink.com/llms.txt
Use this file to discover all available pages before exploring further.
The MSSQL Source Connector is a powerful and efficient solution designed to facilitate seamless data integration between your Microsoft SQL Server (MSSQL) databases and Popsink. This connector helps you synchronize with Microsoft SQL Server in real-time, ensuring that your target systems remain updated with the latest information in real-time, driving business intelligence, analytics, and operational efficiency.
Table of Contents
- Key Features
- Security
- Prerequisites
- Creating a Dedicated Role
- Enabling CDC
- Housekeeping Scripts
Key Features
- Real-time Change Data Capture (CDC): the MSSQL Source Connector employs a CDC mechanism using the native Microsoft SQL Server logical decoding feature, capturing and streaming changes (inserts, updates, and deletes) as they occur in your database.
- Fault-tolerant and Scalable: The MSSQL Source Connector is built with fault tolerance and scalability in mind. It is capable of resuming data replication from the last known offset in case of failures, ensuring data consistency and minimal downtime.
- Initial Load: The connector automatically performs an initial full table load.
- Advanced Filtering: The MSSQL Source Connector provides a range of filtering options, including table and schema filters, allowing you to selectively replicate specific tables and schemas based on your needs.
Security
The connector supports SSL/TLS encryption for secure communication between the connector and your Microsoft SQL Server database.
Prerequisites
Relevant Roles
- sysadmin: role required for enabling CDC at the database level.
- db_owner: role required for enabling CDC at the table level.
Creating a Dedicated Role
This step creates a dedicated replication ROLE, assigns it to your USER and grants it the necessary permissions to work with
Steps
- Create the Replication Role:
USE [{{your_database}}];
GO
CREATE ROLE cdc_role;
GO
- Assign the Replication Role to a User:
USE [{{your_database}}];
GO
EXEC sp_addrolemember 'cdc_role', '{{your_user}}';
GO
- Grant CDC Permissions:
USE [{{your_database}}];
GO
GRANT EXECUTE ON SCHEMA::cdc TO cdc_role;
GO
- Grant Permissions to Check Version, Encryption, and Track Schema:
USE master;
GO
GRANT VIEW SERVER STATE TO cdc_role;
GO
USE {{your_database}};
GO
GRANT SELECT ON OBJECT::INFORMATION_SCHEMA.TABLES TO cdc_role;
GO
Enabling CDC
To enable CDC for your data, you need to do two things: 1) Enable CDC at the Database level, and 2) Enable CDC at the Table level.
For Databases
-
Enable CDC on a Database:
USE [{{your_database}}];
GO
EXEC sys.sp_cdc_enable_db;
GO
-
Enable CDC on Multiple Databases:
DECLARE @databases CURSOR;
SET @databases = CURSOR FOR
SELECT name FROM sys.databases WHERE name IN ({{your_database_list}});
OPEN @databases;
DECLARE @dbName NVARCHAR(50);
FETCH NEXT FROM @databases INTO @dbName;
WHILE @@FETCH_STATUS = 0
BEGIN
EXEC('USE ' + @dbName + '; EXEC sys.sp_cdc_enable_db;');
FETCH NEXT FROM @databases INTO @dbName;
END;
CLOSE @databases;
DEALLOCATE @databases;
For Tables
-
Enable CDC on a Table:
USE [{{your_database}}];
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'{{your_table}}',
@role_name = N'cdc_role', -- use @role_name = NULL if you haven't set up a role
@supports_net_changes = 1;
GO
-
Enable CDC on multiple Tables:
DECLARE @tables CURSOR;
SET @tables = CURSOR FOR
SELECT schema_name(t.schema_id) as schema_name, t.name as table_name
FROM sys.tables t
WHERE t.name IN ({{your_table_list}});
OPEN @tables;
DECLARE @schemaName NVARCHAR(50);
DECLARE @tableName NVARCHAR(50);
FETCH NEXT FROM @tables INTO @schemaName, @tableName;
WHILE @@FETCH_STATUS = 0
BEGIN
EXEC('USE {{your_database}}; EXEC sys.sp_cdc_enable_table @source_schema = ''' + @schemaName + ''', @source_name = ''' + @tableName + ''', @role_name = ''cdc_user'' + @supports_net_changes = 1;');
FETCH NEXT FROM @tables INTO @schemaName, @tableName;
END;
CLOSE @tables;
DEALLOCATE @tables;
For All
- Enable CDC on multiple Tables in multiple Databases:
-- Enable CDC on all databases'
DECLARE @dbName NVARCHAR(128);
DECLARE @tableName NVARCHAR(128);
DECLARE @sql NVARCHAR(MAX);
-- Cursor for all databases
DECLARE db_cursor CURSOR FOR
SELECT name
FROM sys.databases
-- [Optional] WHERE name LIKE 'something-%';
-- Open the cursor and fetch the first database
OPEN db_cursor;
FETCH NEXT FROM db_cursor INTO @dbName;
-- Loop through all databases
WHILE @@FETCH_STATUS = 0
BEGIN
-- Enable CDC on the database if not already enabled
SET @sql = 'IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = ''' + @dbName + ''' AND is_cdc_enabled = 1)
BEGIN
EXEC(''USE [' + @dbName + ']; EXEC sys.sp_cdc_enable_db'')
END';
EXEC sp_executesql @sql;
-- Cursor for all tables in the current database
DECLARE table_cursor CURSOR FOR
SELECT QUOTENAME(SCHEMA_NAME(schema_id)) + '.' + QUOTENAME(name)
FROM sys.tables
WHERE is_tracked_by_cdc = 0; -- Only select tables not already tracked by CDC, you may add more conditions
-- Open the cursor for tables
SET @sql = 'USE [' + @dbName + ']';
EXEC sp_executesql @sql;
OPEN table_cursor;
FETCH NEXT FROM table_cursor INTO @tableName;
-- Loop through all tables
WHILE @@FETCH_STATUS = 0
BEGIN
-- Enable CDC on the table
SET @sql = 'USE [' + @dbName + ']; EXEC sys.sp_cdc_enable_table @source_schema = N''' + SCHEMA_NAME(schema_id) + ''', @source_name = N''' + @tableName + ''', @role_name = NULL, @supports_net_changes = 1';
EXEC sp_executesql @sql;
-- Fetch the next table
FETCH NEXT FROM table_cursor INTO @tableName;
END;
-- Close and deallocate the table cursor
CLOSE table_cursor;
DEALLOCATE table_cursor;
-- Fetch the next database
FETCH NEXT FROM db_cursor INTO @dbName;
END;
-- Close and deallocate the database cursor
CLOSE db_cursor;
DEALLOCATE db_cursor;
Housekeeping Scripts
Here are a few useful scripts that can help you manage the CDC lifecycle.
Purging Logs
Enabling Logs can take up space on your database so it’s best practice to purge them regularly. There is a tradeoff between the log retention and the operational resilience of your replication pipeline. A longer retention means you will need more time to recover from outages before having to resync. Shorter retention takes up less space but gives you less time to recover.
- Purge CDC Logs:
-- Set the database and the retention period
USE YourDatabaseName;
GO
DECLARE @retentionDays INT = 7; -- Set the retention period in days
-- Convert retention days to a cutoff LSN
DECLARE @retentionDateTime DATETIME = DATEADD(DAY, -@retentionDays, GETDATE());
DECLARE @cutoffLSN BINARY(10);
-- Retrieve the minimum LSN based on the retention date
SELECT @cutoffLSN = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @retentionDateTime);
-- Check if the cutoff LSN is valid
IF @cutoffLSN IS NOT NULL
BEGIN
DECLARE @capture_instance NVARCHAR(200);
DECLARE @sql NVARCHAR(MAX);
-- Cursor to go through each capture instance
DECLARE instance_cursor CURSOR FOR
SELECT capture_instance
FROM cdc.change_tables;
OPEN instance_cursor;
FETCH NEXT FROM instance_cursor INTO @capture_instance;
WHILE @@FETCH_STATUS = 0
BEGIN
-- Generate the cleanup command for each capture instance
SET @sql = N'EXEC sys.sp_cdc_cleanup_change_table @capture_instance = ''' + @capture_instance + ''', @low_water_mark = ''' + CONVERT(NVARCHAR(100), @cutoffLSN, 1) + ''';';
-- Execute the cleanup command
EXEC sp_executesql @sql;
FETCH NEXT FROM instance_cursor INTO @capture_instance;
END;
CLOSE instance_cursor;
DEALLOCATE instance_cursor;
PRINT 'CDC logs purged successfully up to ' + CONVERT(NVARCHAR(30), @retentionDateTime, 121);
END
ELSE
BEGIN
PRINT 'No valid LSN found for the specified retention period.';
END
GO
Check if CDC is Enabled
-
Check if it is Enabled on a Database:
USE master
GO
select name, is_cdc_enabled
from sys.databases
where name = '{{your_database}}'
GO
0 means CDC is not enabled, 1 means it is.
-
Check if it is Enabled on a Table:
USE [{{your_database}}];
GO
select name,type,type_desc,is_tracked_by_cdc
from sys.tables
where name = ‘<table_name>’
GO
0 means CDC is not enabled, 1 means it is.
Disable CDC
- Disable CDC on a Database:
USE [{{your_database}}];
GO
EXEC sys.sp_cdc_disable_db;
GO
- Disable CDC on a Table:
USE [{{your_database}}];
GO
EXEC sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name = N'{{your_table}}',
@capture_instance = N'dbo_{{your_table}}';
GO