sql >> Database teknologi >  >> RDS >> Mysql

Automatiser masseindlæsning af data fra s3 til Aurora MySQL RDS-instans

Fremgangsmåden er som angivet ovenfor, have en S3-hændelsestrigger og et lambdajob, der lytter på s3-spanden/objektets placering. Så snart en fil er uploadet til s3-lokationen, vil lambda-jobbet køre, og i lambda'en kan du konfigurere til at kalde et AWS Glue-job. Det er præcis, vi har gjort og er gået live. Lambda har en levetid på 15 minutter, og det burde tage mindre end et minut at udløse/starte et limjob.

Find venligst her en eksempelkilde til reference.

from __future__ import print_function
import json
import boto3
import time
import urllib

print('Loading function')

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname="your-glue-job-name here"

    try:
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist '
              'and your bucket is in the same region as this '
              'function.'.format(source_bucket, source_bucket))
    raise e

For at oprette en Lambda-funktion skal du gå til AWS Lambdra->Opret en ny funktion fra Scratch->Vælg S3 for begivenhed, og konfigurer derefter S3-bucket-placeringerne, præfikser efter behov. Kopier derefter indsæt ovenstående kodeeksempel, inline kodeområde, og konfigurer limjobbets navn efter behov. Sørg for, at du har alle nødvendige IAM-roller/adgangsopsætning.

Limjobbet skal have mulighed for at forbinde til din Aurora, og så kan du bruge kommandoen "LOAD FROM S3....." leveret af Aurora. Sørg for, at alle parametergruppeindstillinger/konfigurationer udføres efter behov.

Fortæl mig, hvis der er problemer.

OPDATERING:EKSEMPEL kodestykke til LOAD FROM S3:

conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, [email protected];"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()



  1. liste over skemaer med størrelser (relative og absolutte) i en PostgreSQL-database

  2. Kan Count(*) nogensinde returnere null?

  3. Båndbreddevenlig forespørgselsprofilering til Azure SQL Database

  4. PostgreSQL fremmednøgle eksisterer ikke, udstedelse af arv?