Sqlserver
 sql >> Datenbank >  >> RDS >> Sqlserver

Implementieren des inkrementellen Ladens mit Change Data Capture in SQL Server

Dieser Artikel wird für diejenigen interessant sein, die sich häufig mit Datenintegration auseinandersetzen müssen.

Einführung

Angenommen, es gibt eine Datenbank, in der Benutzer Daten immer ändern (aktualisieren oder entfernen). Möglicherweise wird diese Datenbank von einer großen Anwendung verwendet, die keine Änderung der Tabellenstruktur zulässt. Die Aufgabe besteht darin, von Zeit zu Zeit Daten aus dieser Datenbank in eine andere Datenbank auf einem anderen Server zu laden. Der einfachste Weg, das Problem anzugehen, besteht darin, die neuen Daten aus einer Quelldatenbank in eine Zieldatenbank zu laden und die Zieldatenbank vorab zu bereinigen. Sie können diese Methode verwenden, solange die Zeit zum Laden der Daten akzeptabel ist und die voreingestellten Fristen nicht überschreitet. Was ist, wenn das Laden der Daten mehrere Tage dauert? Außerdem führen instabile Kommunikationskanäle dazu, dass das Laden von Daten stoppt und neu startet. Wenn Sie auf diese Hindernisse stoßen, schlage ich vor, einen der Algorithmen zum erneuten Laden von Daten in Betracht zu ziehen. Das bedeutet, dass seit dem letzten Laden nur Datenänderungen geladen wurden.

CDC

In SQL Server 2008 hat Microsoft einen Datenverfolgungsmechanismus namens Change Data Capture (CDC) eingeführt. Im Großen und Ganzen besteht das Ziel dieses Mechanismus darin, dass durch Aktivieren von CDC für jede Datenbanktabelle eine Systemtabelle in derselben Datenbank mit einem ähnlichen Namen wie die ursprüngliche Tabelle erstellt wird (das Schema lautet wie folgt:„cdc“ als Präfix plus die Erweiterung „ alter Schemaname plus "_" und das Ende "_CT". Beispiel:Die ursprüngliche Tabelle ist dbo.Example, dann heißt die Systemtabelle cdc.dbo_Example_CT). Es speichert alle geänderten Daten.

Betrachten Sie das Beispiel, um tiefer in CDC einzutauchen. Stellen Sie jedoch zunächst sicher, dass der SQL-Agent, der CDC verwendet, auf der SQL Server-Testinstanz funktioniert.

Darüber hinaus betrachten wir ein Skript, das eine Datenbank und eine Testtabelle erstellt, diese Tabelle mit Daten füllt und CDC für diese Tabelle aktiviert.

Um die Aufgabe zu verstehen und zu vereinfachen, verwenden wir eine SQL Server-Instanz, ohne die Quell- und Zieldatenbanken auf verschiedene Server zu verteilen.

mastergo verwenden -- eine Quelldatenbank erstellen, falls nicht vorhanden (* aus sys.databases auswählen, wobei name ='db_src_cdc') Datenbank db_src_cdcgouse db_src_cdcgo erstellen - CDC aktivieren, wenn sie deaktiviert ist, falls nicht vorhanden (* aus sys.databases auswählen, wobei name =db_name() und is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo – erstelle eine Rolle für Tabellen mit CDCif nicht vorhanden (wähle * aus sys.sysusers aus, wobei name ='CDC_Reader' und issqlrole=1) erstelle Rolle CDC_Readergo – erstelle eine Tabelleif object_id('dbo.Example','U') ist null Tabelle dbo.Example erstellen ( ID int Identitätseinschränkung PK_Example Primärschlüssel, Title varchar(200) not null )go-- Tabelle fülleninsert dbo.Example (Title) values( 'One'),('Two'),('Three'),('Four'),('Five');go-- CDC für die Tabelle aktivieren, falls nicht vorhanden (select * from sys.tables where is_tracked_by_cdc =1 und name ='Example') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Example', @role_name ='CDC_Reader'go - füllen Sie die Tabelle mit einigen Daten. Wir werden etwas ändern oder löschenupdate dbo.Beispielset Title =reverse(Title)where ID in (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Beispiel (ID, Titel) values(1,'One'),(6,'Six');set identity_insert dbo.Example off;go

Schauen wir uns nun an, was wir nach der Ausführung dieses Skripts in den Tabellen dbo.Example und cdc.dbo_Example_CT haben (es sollte beachtet werden, dass CDC asynchron ist. Daten werden in die Tabellen gefüllt, in denen die Änderungsverfolgung nach einer bestimmten Zeit gespeichert wird ).

select * from dbo.Example;
ID Titel ---- ---------------------- 1 Eins 3 eerhT 4 ruoF 5 Fünf 6 Sechs
select row_number() over ( Partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, *from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Titel------ --------------------- - ----------- ---------------------- ------------ ---- . 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Vier 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0.x0.x 

Betrachten Sie im Detail die Tabellenstruktur, in der die Änderungsverfolgung gespeichert ist. Die Felder __ $start_lsn und __ $seqval sind LSN (Protokollsequenznummer in der Datenbank) bzw. die Transaktionsnummer innerhalb der Transaktion. In diesen Feldern gibt es eine wichtige Eigenschaft, nämlich dass wir sicher sein können, dass der Eintrag mit einer höheren LSN später durchgeführt wird. Aufgrund dieser Eigenschaft können wir leicht den neuesten Status jedes Datensatzes in der Abfrage abrufen, indem wir unsere Auswahl nach der Bedingung filtern – wobei __ $ rn =1.

Das Feld __$operation enthält den Transaktionscode:

  • 1 – der Datensatz wird gelöscht
  • 2 – der Datensatz wird eingefügt
  • 3, 4 – der Datensatz wird aktualisiert. Die alten Daten vor dem Update sind 3, die neuen Daten sind 4.

Neben Servicefeldern mit Präfix «__$» werden die Felder der Originaltabelle komplett dupliziert. Diese Informationen reichen aus, um mit dem inkrementellen Laden fortzufahren.

Einrichten einer Datenbank zum Laden von Daten

Erstellen Sie eine Tabelle in unserer Testzieldatenbank, in die Daten geladen werden, sowie eine zusätzliche Tabelle, um Daten über das Ladeprotokoll zu speichern.

mastergo verwenden – eine Zieldatenbank erstellen, falls nicht vorhanden (* aus sys.databases auswählen, wobei name ='db_dst_cdc') Datenbank db_dst_cdcgouse db_dst_cdcgo erstellen - eine Tabelle erstellen, wenn object_id('dbo.Example','U') ist null create table dbo.Example ( ID int Constraint PK_Example primary key, Title varchar(200) not null )go-- erstelle eine Tabelle zum Speichern des Ladeprotokolls, wenn object_id('dbo.log_cdc','U') null ist create table dbo .log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), Constraint pk_log_cdc primary key (table_name,dt desc) )go

Ich möchte Ihre Aufmerksamkeit auf die Felder der Tabelle LOG_CDC lenken:

  • TABLE_NAME speichert Informationen darüber, welche Tabelle geladen wurde (es ist möglich, in Zukunft mehrere Tabellen aus verschiedenen Datenbanken oder sogar von verschiedenen Servern zu laden; das Tabellenformat ist „SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME“
  • DT ist ein Feld des Ladedatums und der Ladezeit, das für das inkrementelle Laden optional ist. Es wird jedoch nützlich sein, um das Laden zu prüfen.
  • LSN – nachdem eine Tabelle geladen wurde, müssen wir bei Bedarf Informationen darüber speichern, wo der nächste Ladevorgang beginnen soll. Dementsprechend fügen wir nach jedem Ladevorgang die neuesten (maximal) __ $ start_lsn in diese Spalte ein.

Algorithmus zum Laden von Daten

Wie oben beschrieben, können wir mithilfe der Abfrage mithilfe von Fensterfunktionen den neuesten Stand der Tabelle abrufen. Wenn wir die LSN des letzten Ladevorgangs kennen, können wir beim nächsten Laden alle Daten aus der Quelle herausfiltern, deren Änderungen höher sind als die gespeicherte LSN, wenn es mindestens einen vollständigen vorherigen Ladevorgang gab:

with incr_Example as( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$ start_lsn> @lsn)select * from incr_Example

Dann können wir alle Datensätze für die vollständige Ladung erhalten, wenn die Ladungs-LSN nicht gespeichert ist:

with incr_Example as( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$ start_lsn> @lsn), full_Example as( select * from db_src_cdc.dbo.Example where @lsn is null)select ID, Title, __$operationfrom incr_Examplewhere __$rn =1union allselect ID, Title, 2 as __$operationfrom full_Example 

Je nach @LSN-Wert zeigt diese Abfrage also entweder alle letzten Änderungen (unter Umgehung der vorläufigen) mit dem Status Entfernt oder nicht an, oder alle Daten aus der ursprünglichen Tabelle, wobei Status 2 (neuer Datensatz) hinzugefügt wird – dieses Feld wird nur zum Vereinigen von zwei Auswahlen verwendet. Mit dieser Abfrage können wir mit dem MERGE-Befehl (ab SQL 2008-Version) problemlos entweder das vollständige Laden oder das erneute Laden implementieren.

Um Engpässe zu vermeiden, die zu alternativen Prozessen führen können, und um übereinstimmende Daten aus verschiedenen Tabellen zu laden (in Zukunft werden wir mehrere Tabellen laden und möglicherweise relationale Beziehungen zwischen ihnen bestehen), schlage ich vor, einen DB-Snapshot auf der Quelldatenbank zu verwenden ( ein weiteres Feature von SQL 2008).

Der vollständige Text der Ladung lautet wie folgt:

[expand title="Code"]

/* Algorithmus zum Laden von Daten*/-- Erstellen Sie einen Datenbank-Snapshot, falls vorhanden (wählen Sie * aus sys.databases, wobei Name ='db_src_cdc_ss' ) Löschen Sie die Datenbank db_src_cdc_ss;deklarieren Sie @query nvarchar(max);wählen Sie @query =N' Datenbank db_src_cdc_ss erstellen auf ( name =N'''+name+ ''', filename =N'''+[filename]+'.ss'' ) als Snapshot von db_src_cdc'aus db_src_cdc.sys.sysfiles wobei groupid =1; exec ( @query );-- Lies LSN aus dem vorherigen loaddeclare @lsn binary(10) =(select max(lsn) from db_dst_cdc.dbo.log_cdc where table_name ='localhost.db_src_cdc.dbo.Example');-- clear eine Tabelle vor dem vollständigen Laden, wenn @lsn null ist, kürzen Sie die Tabelle db_dst_cdc.dbo.Example;-- laden Sie den Prozess mit incr_Example as( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn , * from db_src_cdc_ss.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn> @lsn), full_Example as( select * from db_src_cdc_ss.dbo.Example where @lsn is null), cte_Example as( select ID, Title, __$operation from incr_Example where __$rn =1 union all select ID, Title, 2 as __$ operation from full_Example)merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.IDwhen matched and __$operation =1 dann löschen, wenn übereinstimmend und __$operation <> 1 dann aktualisieren set trg.Title =src.Titlewenn nicht mit Ziel übereinstimmend und __$operation <> 1 dann (ID, Titel) Werte einfügen (src.ID, src .Title);-- markiert das Ende des Ladevorgangs und die letzte LSNinsert db_dst_cdc.dbo.log_cdc (table_name, lsn)values ​​('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))-- Löschen Sie den Datenbank-Snapshot, falls vorhanden (wählen Sie * aus sys.databases, wobei Name ='db_src_cdc_ss' ), ​​löschen Sie die Datenbank db_src_cdc_ss

[/expandieren]