I de foregående to artikler i denne serie diskuterede vi, hvordan man bruger Python og SQLAlchemy til at udføre ETL-processen. I dag vil vi gøre det samme, men denne gang ved hjælp af Python og SQL Alchemy uden SQL-kommandoer i tekstformat. Dette vil gøre os i stand til at bruge SQLAlchemy uanset hvilken databasemotor, vi er forbundet til. Så lad os starte.
I dag vil vi diskutere, hvordan man udfører ETL-processen ved hjælp af Python og SQLAlchemy. Vi opretter et script til at udtrække daglige data fra vores operationelle database, transformere dem og derefter indlæse i vores datavarehus.
Dette er den tredje artikel i serien. Hvis du ikke har læst de to første artikler (Brug af Python og MySQL i ETL-processen og SQLAlchemy), opfordrer jeg dig kraftigt til at gøre det, før du fortsætter.
Hele denne serie er en fortsættelse af vores data warehouse-serie:
- Oprettelse af en DWH, del 1:En abonnementsforretningsdatamodel
- Oprettelse af en DWH, del 2:En abonnementsforretningsdatamodel
- Oprettelse af et datavarehus, del 3:En abonnementsforretningsdatamodel
Okay, lad os nu komme i gang med dagens emne. Lad os først se på datamodellerne.
Datamodellerne
Operationel (live) databasedatamodel
DWH-datamodel
Dette er de to datamodeller, vi skal bruge. For mere information om datavarehuse (DWH'er), tjek disse artikler:
- Stjerneskemaet
- Snefnugskemaet
- Stjerneskema vs. snefnugskema
Hvorfor SQLAlchemy?
Hele ideen bag SQLAlchemy er, at efter at vi har importeret databaser, har vi ikke brug for SQL-kode, der er specifik for den relaterede databasemotor. I stedet kan vi importere objekter til SQLAlchemy og bruge SQLAlchemy-syntaksen til udsagn. Det giver os mulighed for at bruge det samme sprog, uanset hvilken databasemotor vi er forbundet til. Den største fordel her er, at en udvikler ikke behøver at tage sig af forskellene mellem forskellige databasemotorer. Dit SQLAlchemy-program vil fungere nøjagtigt det samme (med mindre ændringer), hvis du migrerer til en anden databasemotor.
Jeg har besluttet kun at bruge SQLAlchemy-kommandoer og Python-lister til at kommunikere til midlertidigt lager og mellem forskellige databaser. Hovedårsagerne bag denne beslutning er, at 1) Python-lister er velkendte, og 2) koden ville være læsbar for dem uden Python-færdigheder.
Dette betyder ikke, at SQLAlchemy er perfekt. Det har visse begrænsninger, som vi vil diskutere senere. Lad os lige nu tage et kig på koden nedenfor:
Kørsel af scriptet og resultatet
Dette er Python-kommandoen, der bruges til at kalde vores script. Scriptet kontrollerer dataene i den operationelle database, sammenligner værdierne med DWH og importerer de nye værdier. I dette eksempel opdaterer vi værdier i to dimensionstabeller og en faktatabel; scriptet returnerer det relevante output. Hele scriptet er skrevet, så du kan køre det flere gange om dagen. Det vil slette "gamle" data for den dag og erstatte dem med nye.
Lad os analysere hele scriptet, startende fra toppen.
Import af SQLAlchemy
Den første ting, vi skal gøre, er at importere de moduler, vi skal bruge i scriptet. Normalt importerer du dine moduler, mens du skriver scriptet. I de fleste tilfælde ved du ikke præcist, hvilke moduler du skal bruge i starten.
from datetime import date # import SQLAlchemy from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case
Vi har importeret Pythons datetime
modul, som forsyner os med klasser, der arbejder med datoer.
Dernæst har vi sqlalchemy
modul. Vi importerer ikke hele modulet, kun de ting, vi har brug for – nogle specifikke for SQLAlchemy (create_engine
, MetaData
, Table
), nogle SQL-sætningsdele (select
, and_
, case
), og func
, som gør os i stand til at bruge funktioner som count() og sum() .
Opretter forbindelse til databaserne
Vi skal oprette forbindelse til to databaser på vores server. Vi kunne oprette forbindelse til flere databaser (MySQL, SQL Server eller enhver anden) fra forskellige servere, hvis det var nødvendigt. I dette tilfælde er begge databaser MySQL-databaser og er gemt på min lokale maskine.
# connect to databases engine_live = sqlalchemy.create_engine('mysql+pymysql://: @localhost:3306/subscription_live') connection_live = engine_live.connect() engine_dwh = sqlalchemy.create_engine('mysql+pymysql:// : @localhost:3306/subscription_dwh') connection_dwh = engine_dwh.connect() metadata = MetaData(bind=None)
Vi har oprettet to motorer og to forbindelser. Jeg vil ikke gå i detaljer her, fordi vi allerede har forklaret dette i den forrige artikel.
Opdatering af dim_time Dimension
Mål:Indsæt gårsdagens dato, hvis den ikke allerede er indsat i tabellen.
I vores script opdaterer vi to dimensionstabeller med nye værdier. Resten af dem følger samme mønster, så vi vil kun gennemgå dette én gang; vi behøver ikke at skrive næsten identisk kode ned et par gange mere.
Ideen er meget enkel. Vi kører altid scriptet for at indsætte nye data for i går. Derfor er vi nødt til at kontrollere, om denne dato blev indsat i dimensionstabellen. Hvis det allerede er der, vil vi ikke gøre noget; hvis det ikke er, tilføjer vi det. Lad os tage et kig på koden for at opdatere dim_time
tabel.
Først tjekker vi, om datoen eksisterer. Hvis det ikke findes, tilføjer vi det. Vi starter med at gemme gårsdagens dato i en variabel. I Python gør du det på denne måde:
yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = str(yesterday)
Den første linje tager en aktuel dato, konverterer den til en numerisk værdi, trækker 1 fra denne værdi og konverterer den numeriske værdi tilbage til en dato (i går =i dag – 1 ). Den anden linje gemmer datoen i et tekstformat.
Dernæst tester vi, om datoen allerede er i databasen:
table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh) stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str) result = connection_dwh.execute(stmt).fetchall() date_exists = len(result)
Efter indlæsning af tabellen kører vi en forespørgsel, der skal returnere alle rækker fra dimensionstabellen, hvor klokkeslættet/datoværdien er lig med i går. Resultatet kunne have 0 (ingen sådan dato i tabellen) eller 1 række (datoen er allerede i tabellen).
Hvis datoen ikke allerede er i tabellen, bruger vi kommandoen insert() til at tilføje den:
if date_exists == 0: print("New value added.") stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday()) connection_dwh.execute(stmt) else: print("No new values.")
En ny ting her, jeg gerne vil pege på, er brugen af. .year
, .month
, .isocalendar()[1]
og .weekday
for at få dateparts.
Opdatering af dim_city Dimension
Mål:Indsæt nye byer, hvis der er nogen (dvs. sammenlign listen over byer i livedatabasen med listen over byer i DWH, og tilføj manglende).
Opdatering af dim_time
dimensionen var ret enkel. Vi testede simpelthen, om der var en dato i tabellen, og indsatte den, hvis den ikke allerede var der. For at teste en værdi i DWH-databasen brugte vi en Python-variabel (i går ). Vi bruger den proces igen, men denne gang med lister.
Da der ikke er en nem måde at kombinere tabeller fra forskellige databaser i en enkelt SQLAlchemy-forespørgsel, kan vi ikke bruge den fremgangsmåde, der er skitseret i del 1 af denne serie. Derfor har vi brug for et objekt til at gemme de værdier, der er nødvendige for at kommunikere mellem disse to databaser. Jeg har besluttet at bruge lister, fordi de er almindelige, og de gør arbejdet.
Først indlæser vi country
og city
tabeller fra en live database til de relevante objekter.
# dim_city print("\nUpdating... dim_city") table_city = Table('city', metadata, autoload = True, autoload_with = engine_live) table_country = Table('country', metadata, autoload = True, autoload_with = engine_live) table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)
Derefter indlæser vi dim_city
tabel fra DWH til en liste:
# load whole dwh table in the list stmt = select([table_dim_city]); table_dim_city_list = connection_dwh.execute(stmt).fetchall()
Så gør vi det samme for værdierne fra livedatabasen. Vi slutter os til tabellerne country
og city
så vi har alle de nødvendige data på denne liste:
# load all live values in the list stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\ .select_from(table_city\ .join(table_country)) table_city_list = connection_live.execute(stmt).fetchall()
Nu gennemgår vi listen, der indeholder data fra livedatabasen. For hver post sammenligner vi værdier (city_name
, postal_code
og country_name
). Hvis vi ikke finder sådanne værdier, tilføjer vi en ny post i dim_city
tabel.
# loop through live_db table # for each record test if it is missing in the dwh table new_values_added = 0 for city in table_city_list: id = -1; for dim_city in table_dim_city_list: if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]: id = dim_city[0] if id == -1: stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2]) connection_dwh.execute(stmt) new_values_added = 1 if new_values_added == 0: print("No new values.") else: print("New value(s) added.")
For at afgøre, om værdien allerede er i DWH, testede vi en kombination af attributter, der skulle være unikke. (Den primære nøgle fra livedatabasen hjælper os ikke meget her.) Vi kan bruge lignende kode til at opdatere andre ordbøger. Det er ikke den pæneste løsning, men det er stadig en ret elegant. Og det vil gøre præcis, hvad vi har brug for.
Opdatering af fact_customer_subscribed Tabel
Mål:Hvis vi har gamle data for gårsdagens dato, skal du slette dem først. Tilføj gårsdagens data til DWH – uanset om vi har slettet noget i det forrige trin eller ej.
Efter at have opdateret alle dimensionstabellerne, bør vi opdatere faktatabellerne. I vores script opdaterer vi kun én faktatabel. Begrundelsen er den samme som i det foregående afsnit:opdatering af andre tabeller ville følge det samme mønster, så vi ville for det meste gentage koden.
Før vi indsætter værdier i faktatabellen, skal vi kende værdierne for de relaterede nøgler fra dimensionstabellerne. For at gøre det indlæser vi igen dimensioner i lister og sammenligner dem med værdier fra den levende database.
Den første ting, vi skal gøre, er at indlæse kunden og fact_customer_subscribed
tabeller til objekter:
# fact_customer_subscribed print("\nUpdating... fact_customer_subscribed") table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live) table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)
Nu skal vi finde nøgler til den relaterede tidsdimension. Da vi altid indsætter data for i går, søger vi efter denne dato i dim_time
bord og brug dets ID. Forespørgslen returnerer 1 række, og ID'et er i den første position (indekset starter fra 0, så det er result[0][0]
):
# find key for the dim_time dimension stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday) result = connection_dwh.execute(stmt).fetchall() dim_time_id = result[0][0]
For det tidspunkt sletter vi alle tilknyttede poster fra faktatabellen:
# delete any existing data in the fact table for that time dimension value stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id) connection_dwh.execute(stmt)
Okay, nu har vi ID'et for tidsdimensionen gemt i dim_time_id
variabel. Dette var nemt, fordi vi kun kan have én tidsdimensionsværdi. Historien vil være anderledes for bydimensionen. Først indlæser vi alle de værdier, vi har brug for – værdier, der entydigt beskriver byen (ikke ID'et), og aggregerede værdier:
# prepare data for insert stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\ .select_from(table_customer\ .join(table_city)\ .join(table_country))\ .group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)
Der er et par ting, jeg gerne vil understrege ved forespørgslen ovenfor:
func.sum(...)
er SUM(...) fra "standard SQL".case(...)
syntaks brugerand_
før betingelser, ikke mellem dem..label(...)
fungerer som et SQL AS-alias.- Vi bruger
\
for at gå til næste linje og øge forespørgslens læsbarhed. (Tro mig, det er stort set ulæseligt uden skråstreg – jeg har prøvet det :) ) .group_by(...)
spiller rollen som SQL's GROUP BY.
Dernæst gennemgår vi hver post, der returneres ved hjælp af den forrige forespørgsel. For hver post sammenligner vi værdier, der entydigt definerer en by (city_name
, postal_code
, country_name
) med værdierne gemt i listen oprettet ud fra DWH dim_city
bord. Hvis alle tre værdier matcher, gemmer vi ID'et fra listen og bruger det, når vi indsætter nye data. På denne måde har vi for hver registrering id'er for begge dimensioner:
# loop through all new records # use time dimension # for each record find key for city dimension # insert row new_values = connection_live.execute(stmt).fetchall() for new_value in new_values: dim_city_id = -1; for dim_city in table_dim_city_list: if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]: dim_city_id = dim_city[0] if dim_city_id > 0: stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6]) connection_dwh.execute(stmt_insert) dim_city_id = -1 print("Completed.")
Og det er det. Vi har opdateret vores DWH. Scriptet ville være meget længere, hvis vi opdaterede alle dimensions- og faktatabellerne. Kompleksiteten ville også være større, når en faktatabel er relateret til flere dimensionstabeller. I så fald skal vi bruge en for sløjfe for hver dimensionstabel.
Dette virker ikke!
Jeg var meget skuffet, da jeg skrev dette manuskript og så fandt ud af, at sådan noget ikke vil virke:
stmt = select([table_city.columns.city_name])\ .select_from(table_city\ .outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\ .where(table_dim_city.columns.id.is_(None))
I dette eksempel forsøger jeg at bruge tabeller fra to forskellige databaser. Hvis vi etablerer to separate forbindelser, vil den første forbindelse ikke "se" tabeller fra en anden forbindelse. Hvis vi forbinder direkte til serveren og ikke til en database, vil vi ikke være i stand til at indlæse tabeller.
Indtil dette ændrer sig (forhåbentlig snart), bliver du nødt til at bruge en form for struktur (f.eks. hvad vi gjorde i dag) til at kommunikere mellem de to databaser. Dette komplicerer koden, fordi du skal erstatte en enkelt forespørgsel med to lister og indlejret for sløjfer.
Del dine tanker om SQLAlchemy og Python
Dette var den sidste artikel i denne serie. Men hvem ved? Måske vil vi prøve en anden tilgang i kommende artikler, så følg med. I mellemtiden bedes du dele dine tanker om SQLAlchemy og Python i kombination med databaser. Hvad synes du, vi mangler i denne artikel? Hvad ville du tilføje? Fortæl os i kommentarerne nedenfor.
Du kan downloade det komplette script, som vi brugte i denne artikel her.
Og en særlig tak går til Dirk J Bosman (@dirkjobosman), som anbefalede denne artikelserie.