sql >> Database teknologi >  >> RDS >> Sqlserver

Implementering af inkrementel belastning ved hjælp af Change Data Capture i SQL Server

Denne artikel vil være interessant for dem, der ofte skal beskæftige sig med dataintegration.

Introduktion

Antag, at der er en database, hvor brugerne altid ændrer data (opdaterer eller fjerner). Måske bruges denne database af et stort program, der ikke tillader ændring af tabelstrukturen. Opgaven er at indlæse data fra denne database til en anden database på en anden server fra tid til anden. Den enkleste måde at løse problemet på er at indlæse de nye data fra en kildedatabase til en måldatabase med foreløbig oprydning i måldatabasen. Du kan bruge denne metode, så længe tiden til at indlæse data er acceptabel og ikke overskrider forudindstillede deadlines. Hvad hvis det tager flere dage at indlæse data? Derudover fører ustabile kommunikationskanaler til situationen, når dataindlæsningen stopper og genstarter. Hvis du står over for disse forhindringer, foreslår jeg, at du overvejer en af ​​'datagenindlæsning'-algoritmerne. Det betyder, at der kun er sket dataændringer, siden den seneste indlæsning er indlæst.

CDC

I SQL Server 2008 introducerede Microsoft en datasporingsmekanisme kaldet Change Data Capture (CDC). I store træk er formålet med denne mekanisme, at aktivering af CDC for enhver databasetabel vil skabe en systemtabel i den samme database med et lignende navn, som den oprindelige tabel har (skemaet vil være som følger:'cdc' som et præfiks plus gammelt skemanavn plus "_" og slutningen "_CT". For eksempel er den originale tabel dbo.Eksempel, så vil systemtabellen blive kaldt cdc.dbo_Example_CT). Det vil gemme alle de data, der er blevet ændret.

Faktisk, for at grave dybere i CDC, overvej eksemplet. Men sørg først for, at SQL Agent, der bruger CDC, fungerer på SQL Server-testinstansen.

Derudover vil vi overveje et script, der opretter en database og testtabel, udfylder denne tabel med data og aktiverer CDC for denne tabel.

For at forstå og forenkle opgaven vil vi bruge én SQL Server-instans uden at distribuere kilde- og måldatabaserne til forskellige servere.

brug mastergo-- opret en kildedatabase, hvis den ikke eksisterer (vælg * fra sys.databases, hvor navn ='db_src_cdc') opret databasen db_src_cdcgouse db_src_cdcgo-- aktiver CDC, hvis den er deaktiveret, hvis den ikke eksisterer (vælg * fra sys.databaser, hvor navn =db_name() og is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- opret en rolle for tabeller med CDCif ikke eksisterer (vælg * fra sys.sysusers hvor navn ='CDC_Reader' og issqlrole=1) opret rolle CDC_Readergo-- opret en tableif object_id('dbo.Example','U') er null create table dbo.Example ( ID int identitetsbegrænsning PK_Example primærnøgle, Titel varchar(200) ikke null )go-- udfyld tableinsert dbo.Example (Title) værdierne( 'One'),('To'),('Three'),('Fire'),('Fem');go-- aktiver CDC for tabellen, hvis den ikke eksisterer (vælg * fra sys.tables, hvor is_tracked_by_cdc =1 og navn ='Eksempel') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Eksempel', @role_name ='CDC_Reader'go-- udfyld tabellen med nogle data. Vi vil ændre eller slette nogetupdate dbo.Exampleset Title =reverse(Title)where ID in (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Eksempel (ID, Titel) værdier(1,'One'),(6,'Six');set identity_insert dbo.Example off;go

Lad os nu se på, hvad vi har efter at have udført dette script i tabellerne dbo.Example og cdc.dbo_Example_CT (det skal bemærkes, at CDC er asynkront. Data udfyldes i tabellerne, hvor ændringssporingen gemmes efter et vist tidsrum ).

vælg * fra dbo.Eksempel;
ID Titel---- ---------------------- 1 En 3 eerhT 4 ruoF 5 Fem 6 Seks
vælg row_number() over (partition efter ID-rækkefølge efter __$start_lsn desc, __$seqval desc ) som __$rn, *fra cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Titel------ --------------------- ----------- ---------------------------- ------------ ---- ------------ --- ----------- 1 0x0000003a000000580005 NULL 0x0000003a000000580003 2 0x03 1 One 2 0x0000003a000000560006 NUL 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Fire 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A00000058003A0000058003A0000058003A00000580003 

Overvej i detaljer tabelstrukturen, hvori ændringssporing er gemt. Felterne __ $start_lsn og __ $seqval er LSN (log sekvensnummer i databasen) og transaktionsnummeret i transaktionen. Der er en vigtig egenskab i disse felter, nemlig at vi kan være sikre på, at posten med en højere LSN vil blive udført senere. På grund af denne egenskab kan vi nemt få den seneste tilstand for hver post i forespørgslen ved at filtrere vores valg efter betingelsen – hvor __ $ rn =1.

Feltet __$operation indeholder transaktionskoden:

  • 1 – posten er slettet
  • 2 – posten er indsat
  • 3, 4 – posten er opdateret. De gamle data før opdatering er 3, de nye data er 4.

Ud over servicefelter med præfikset «__$» er felterne i den originale tabel fuldstændigt duplikeret. Disse oplysninger er nok til, at vi kan fortsætte til den trinvise belastning.

Opsætning af en database til dataindlæsning

Opret en tabel i vores testmåldatabase, som data vil blive indlæst i, samt en ekstra tabel til at gemme data om indlæsningsloggen.

brug mastergo-- opret en måldatabase, hvis den ikke findes (vælg * fra sys.databases, hvor navn ='db_dst_cdc') opret databasen db_dst_cdcgouse db_dst_cdcgo-- opret en tableif object_id('dbo.Example','U') er null opret tabel dbo.Eksempel ( ID int begrænsning PK_Example primærnøgle, titel varchar(200) ikke null )go-- opret en tabel til at gemme load logif object_id('dbo.log_cdc','U') er null opret tabel dbo .log_cdc ( tabelnavn nvarchar(512) ikke null, dt datetime ikke null standard getdate(), lsn binær(10) ikke null standard(0x0), constraint pk_log_cdc primærnøgle (tabelnavn,dt desc) )go

Jeg vil gerne henlede din opmærksomhed på felterne i LOG_CDC-tabellen:

  • TABLE_NAME gemmer information om, hvilken tabel der blev indlæst (det er muligt at indlæse flere tabeller i fremtiden, fra forskellige databaser eller endda fra forskellige servere; tabelformatet er 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
  • DT er et felt med indlæsningsdato og -klokkeslæt, som er valgfrit for den trinvise belastning. Det vil dog være nyttigt til revision af indlæsning.
  • LSN – efter at en tabel er indlæst, skal vi gemme oplysninger om det sted, hvor den næste indlæsning skal startes, hvis det kræves. Derfor tilføjer vi efter hver indlæsning den seneste (maksimum) __ $ start_lsn til denne kolonne.

Algorithme for dataindlæsning

Som beskrevet ovenfor kan vi ved hjælp af forespørgslen få den seneste tilstand af tabellen ved hjælp af vinduesfunktioner. Hvis vi kender LSN for den seneste belastning, kan vi, næste gang vi indlæser, filtrere alle data fra kilden, hvis ændringer er højere end den lagrede LSN, hvis der var mindst én fuldstændig tidligere belastning:

med incr_Example as( vælg row_number() over (partition efter ID-rækkefølge efter __$start_lsn desc, __$seqval desc ) som __$rn, * fra db_src_cdc.cdc.dbo_Example_CT hvor __$operation <> 3 og __$ start_lsn> @lsn) vælg * fra incr_Example

Så kan vi få alle poster for den komplette belastning, hvis belastnings-LSN ikke er lagret:

med incr_Example as( vælg row_number() over (partition efter ID-rækkefølge efter __$start_lsn desc, __$seqval desc ) som __$rn, * fra db_src_cdc.cdc.dbo_Example_CT hvor __$operation <> 3 og __$ start_lsn> @lsn), full_Example as( vælg * fra db_src_cdc.dbo.Eksempel hvor @lsn er null) vælg ID, Titel, __$operationfrom incr_Examplewhere __$rn =1union altselect ID, Titel, 2 som __$operationfrom full_Example før> 

Afhængigt af @LSN-værdien viser denne forespørgsel enten alle de seneste ændringer (omgå de midlertidige) med status Fjernet eller ej, eller alle data fra den oprindelige tabel, tilføjer status 2 (ny post) – dette felt bruges kun til at forene to valg. Med denne forespørgsel kan vi nemt implementere enten fuld load eller genindlæse ved hjælp af kommandoen MERGE (startende med SQL 2008-versionen).

For at undgå flaskehalse, der kan skabe alternative processer og for at indlæse matchede data fra forskellige tabeller (i fremtiden vil vi indlæse flere tabeller, og der kan muligvis være relationelle relationer mellem dem), foreslår jeg at bruge et DB-snapshot på kildedatabasen ( en anden SQL 2008-funktion).

Den fulde tekst af belastningen er som følger:

[expand title="Kode"]

/* Algoritme for dataindlæsning*/-- opret et database-øjebliksbillede, hvis der findes (vælg * fra sys.databases, hvor navn ='db_src_cdc_ss' ) drop database db_src_cdc_ss;erklær @query nvarchar(max);vælg @query =N' opret database db_src_cdc_ss på (navn =N'''+navn+ ''', filnavn =N'''+[filnavn]+'.ss'') som et øjebliksbillede af db_src_cdc'fra db_src_cdc.sys.sysfiles hvor groupid =1; exec ( @query );-- læs LSN fra den forrige loaddeclare @lsn binær(10) =(vælg max(lsn) fra db_dst_cdc.dbo.log_cdc hvor tabelnavn ='localhost.db_src_cdc.dbo.Example');-- clear en tabel før den komplette loadif @lsn er null trunkér tabel db_dst_cdc.dbo.Eksempel;-- indlæs proces med incr_Example as( vælg rækkenummer() over (partition efter ID-rækkefølge efter __$start_lsn desc, __$seqval desc ) som __$rn , * fra db_src_cdc_ss.cdc.dbo_Example_CT hvor __$operation <> 3 og __$start_lsn> @lsn), full_Example as( vælg * fra db_src_cdc_ss.dbo.Eksempel hvor @lsn er null som(vælg ID,eksempel), cte_Examp Titel, __$operation fra incr_Example hvor __$rn =1 union alle vælg ID, Titel, 2 som __$operation from full_Example)flet db_dst_cdc.dbo.Eksempel som trg ved hjælp af cte_Example som src på trg.ID=src.IDnår matchet og __$operation =1 derefter deletewhen matched og __$operation <> 1 then update set trg.Title =src.Titlewhen not matched by target and __$operation <> 1 indsæt derefter (ID, Titel) værdier (src.ID, src .Title);-- marker slutningen af ​​indlæsningsprocessen og den seneste LSNindsæt db_dst_cdc.dbo.log_cdc (tabelnavn, lsn)værdier ('localhost.db_src_cdc.dbo.Example', isnull((vælg max(__$start_lsn) fra db_src_cdc_ss.cdc.dbo_Example_CT),0))-- slet database-øjebliksbilledet, hvis det findes (vælg * fra sys.databases, hvor navn ='db_src_cdc_ss' ) slip databasen db_src_cdc_ss

[/udvid]


  1. ORA-16205 Opgradering til 11.2.0.3

  2. Hvordan bruger man mysqli forberedte udsagn?

  3. sqlite get felt med mere end 2 MB

  4. MySql Count kan ikke vise 0 værdier