sql >> Database teknologi >  >> NoSQL >> HBase

Spark-on-HBase:DataFrame-baseret HBase-stik

Dette blogindlæg blev offentliggjort på Hortonworks.com før fusionen med Cloudera. Nogle links, ressourcer eller referencer er muligvis ikke længere nøjagtige.

Vi er stolte af at kunne annoncere den tekniske forhåndsvisning af Spark-HBase Connector, udviklet af Hortonworks i samarbejde med Bloomberg.

Spark-HBase-stikket udnytter Data Source API (SPARK-3247) introduceret i Spark-1.2.0. Det bygger bro mellem det simple HBase Key Value-lager og komplekse relationelle SQL-forespørgsler og gør det muligt for brugere at udføre komplekse dataanalyser oven på HBase ved hjælp af Spark. En HBase DataFrame er en standard Spark DataFrame og er i stand til at interagere med andre datakilder såsom Hive, ORC, Parquet, JSON osv.

Baggrund

Der er adskillige open source Spark HBase-stik tilgængelige enten som Spark-pakker, som uafhængige projekter eller i HBase-trunk.

Spark er flyttet til Dataset/DataFrame API'erne, som giver indbygget forespørgselsplanoptimering. Nu foretrækker slutbrugere at bruge DataFrames/Datasets baseret grænseflade.

HBase-stikket i HBase-stammen har en rig støtte på RDD-niveau, f.eks. BulkPut osv., men dens DataFrame-understøttelse er ikke så rig. HBase trunk-stik er afhængig af standard HadoopRDD med HBase indbygget TableInputFormat har nogle ydeevnebegrænsninger. Derudover kan BulkGet udført i driveren være et enkelt fejlpunkt.

Der er nogle andre alternative implementeringer. Tag Spark-SQL-on-HBase som et eksempel. Den anvender meget avancerede brugerdefinerede optimeringsteknikker ved at indlejre sin egen forespørgselsoptimeringsplan i standard Spark Catalyst-motoren, sender RDD'en til HBase og udfører komplicerede opgaver, såsom delvis aggregering, inde i HBase-coprocessoren. Denne tilgang er i stand til at opnå høj ydeevne, men den er svær at vedligeholde på grund af dens kompleksitet og den hurtige udvikling af Spark. At tillade vilkårlig kode at køre inde i en coprocessor kan også udgøre sikkerhedsrisici.

Spark-on-HBase Connector (SHC) er udviklet til at overvinde disse potentielle flaskehalse og svagheder. Den implementerer standard Spark Datasource API og udnytter Spark Catalyst-motoren til forespørgselsoptimering. Parallelt hermed er RDD'en konstrueret fra bunden i stedet for at bruge TableInputFormat for at opnå høj ydeevne. Med denne tilpassede RDD kan alle kritiske teknikker anvendes og implementeres fuldt ud, såsom partitionsbeskæring, kolonnebeskæring, prædikat-pushdown og datalokalitet. Designet gør vedligeholdelsen meget nem, samtidig med at der opnås en god afvejning mellem ydeevne og enkelhed.

Arkitektur

Vi antager, at Spark og HBase er installeret i den samme klynge, og Spark-eksekutorer er placeret sammen med regionsservere, som illustreret i figuren nedenfor.

Figur 1. Spark-on-HBase Connector Architecture

På et højt niveau behandler connectoren både Scan og Get på samme måde, og begge handlinger udføres i udførerne. Driveren behandler forespørgslen, samler scanninger/henter baseret på regionens metadata og genererer opgaver pr. region. Opgaverne sendes til de foretrukne eksekvere, der er samlokaliseret med regionserveren, og udføres parallelt i eksekvererne for at opnå bedre datalokalitet og samtidighed. Hvis en region ikke har de nødvendige data, tildeles den pågældende regionsserver ingen opgave. En opgave kan bestå af flere scanninger og bulkGets, og dataanmodningerne fra en opgave hentes kun fra én regionsserver, og denne regionsserver vil også være lokalitetspræference for opgaven. Bemærk, at chaufføren ikke er involveret i den rigtige jobudførelse bortset fra planlægningsopgaver. Dette undgår, at chaufføren er flaskehalsen.

Tabelkatalog

For at bringe HBase-tabellen som en relationstabel ind i Spark, definerer vi en mapping mellem HBase- og Spark-tabeller, kaldet Table Catalog. Der er to kritiske dele af dette katalog. Den ene er rækketastdefinitionen, og den anden er kortlægningen mellem tabelkolonnen i Spark og kolonnefamilien og kolonnekvalifikationen i HBase. Se venligst afsnittet Brug for detaljer.

Native Avro-understøttelse

Konnektoren understøtter Avro-formatet indbygget, da det er en meget almindelig praksis at bevare strukturerede data i HBase som et byte-array. Brugeren kan vedholde Avro-registreringen direkte i HBase. Internt konverteres Avro-skemaet automatisk til en indbygget Spark Catalyst-datatype. Bemærk, at begge nøgleværdidele i en HBase-tabel kan defineres i Avro-format. Se eksemplerne/testcaserne i repoen for nøjagtig brug.

Pushdown for prædikat

Connectoren henter kun nødvendige kolonner fra regionsserveren for at reducere netværksomkostninger og undgå redundant behandling i Spark Catalyst-motoren. Eksisterende standard HBase-filtre bruges til at udføre prædikat-push-down uden at udnytte coprocessor-kapaciteten. Fordi HBase ikke er opmærksom på datatypen bortset fra byte-array og rækkefølgeinkonsistensen mellem Java-primitive typer og byte-array, skal vi forbehandle filterbetingelsen, før vi indstiller filteret i Scan-operationen for at undgå datatab. Inde på regionsserveren filtreres poster, der ikke matcher forespørgselsbetingelsen, fra.

Partitionsbeskæring

Ved at udtrække rækkenøglen fra prædikaterne opdeler vi Scan/BulkGet i flere ikke-overlappende områder, kun de regionsservere, der har de anmodede data, vil udføre Scan/BulkGet. I øjeblikket udføres partitionsbeskæringen på den første dimension af rækketasterne. For eksempel, hvis en rækketast er "key1:key2:key3", vil partitionsbeskæringen kun være baseret på "key1". Bemærk, at WHERE-betingelserne skal defineres nøje. Ellers vil skillevægsbeskæringen muligvis ikke træde i kraft. For eksempel vil HVOR rækketast1> "abc" ELLER kolonne ="xyz" (hvor rækketast1 er den første dimension af rækketasten, og kolonne er en almindelig hbase-kolonne) resultere i en fuld scanning, da vi skal dække alle områderne pga. af ELLER logik.

Datalokalitet

Når en Spark-eksekutor er samlokaliseret med HBase-regionsservere, opnås datalokalitet ved at identificere regionsserverens placering og gør den bedste indsats for at samlokalisere opgaven med regionserveren. Hver executor udfører Scan/BulkGet på den del af dataene, der er placeret på samme vært.

Scan og BulkGet

Disse to operatorer er eksponeret for brugere ved at specificere WHERE CLAUSE, f.eks. WHERE column> x og column til scanning og WHERE kolonne =x for at få. Operationerne udføres i udførerne, og driveren konstruerer kun disse operationer. Internt konverteres de til scanning og/eller get, og Iterator[Row] returneres til katalysatormotoren til behandling af det øverste lag.

Brug

Det følgende illustrerer den grundlæggende procedure for, hvordan man bruger stikket. For flere detaljer og avanceret brug, såsom Avro og kompositnøgleunderstøttelse, henvises til eksemplerne i arkivet.

1) Definer kataloget for skematilknytningen:

[code language="scala"]def catalog =s"""{        |"table":{"namespace":"default", "name":"table1"},        |"rowkey":"key" ,        |"columns":{          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},          |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"},          |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"},          |"col4":{"cf":"cf4", "col":" col4", "type":"int"},          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},          |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"},          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}        |}      |}""".stripMargin[/code] 

2) Forbered dataene og udfyld HBase-tabellen:
caseklasse HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

objekt HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 til 255).map { i =>  HBaseRecord(i, “ekstra”)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Indlæs DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(catalog)

4) Sprogintegreret forespørgsel:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
 $”col0″ ===“row005” ||
 $”col0″ ===“row020” ||
 $”col0″ === “r20” ||
 $”col0″ <=“row005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .vis

5) SQL-forespørgsel:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show

Konfiguration af Spark-Package

Brugere kan bruge Spark-on-HBase-stikket som en standard Spark-pakke. For at inkludere pakken i din Spark-applikation skal du bruge:

spark-shell, pyspark eller spark-submit

> $SPARK_HOME/bin/spark-shell –pakker zhzhan:shc:0.0.11-1.6.1-s_2.10

Brugere kan også inkludere pakken som afhængighed i din SBT-fil. Formatet er spark-pakkenavn:version

spDependencies +="zhzhan/shc:0.0.11-1.6.1-s_2.10"

Kører i Secure Cluster

For at køre i en Kerberos-aktiveret klynge skal brugeren inkludere HBase-relaterede jars i klassestien, da HBase-tokenhentning og fornyelse udføres af Spark og er uafhængig af forbindelsen. Med andre ord skal brugeren starte miljøet på normal vis, enten gennem kinit eller ved at levere principal/keytab. De følgende eksempler viser, hvordan man kører i en sikker klynge med både garnklient- og garnklyngetilstand. Bemærk, at SPARK_CLASSPATH skal indstilles for begge tilstande, og eksempelkrukken er kun en pladsholder for Spark.

eksport SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Antag, at hrt_qa er en hovedløs konto, kan brugeren bruge følgende kommando til kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –antal-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –antal-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Slå det hele sammen

Vi har lige givet et hurtigt overblik over, hvordan HBase understøtter Spark på DataFrame-niveau. Med DataFrame API kan Spark-applikationer arbejde med data, der er gemt i HBase-tabellen, lige så nemt som alle data, der er gemt i andre datakilder. Med denne nye funktion kan data i HBase-tabeller nemt forbruges af Spark-applikationer og andre interaktive værktøjer, f.eks. brugere kan køre en kompleks SQL-forespørgsel oven på en HBase-tabel inde i Spark, udføre en tabelsammenføjning mod Dataframe eller integrere med Spark Streaming for at implementere et mere kompliceret system.

Hvad er det næste?

I øjeblikket er forbindelsen hostet i Hortonworks repo og udgivet som en Spark-pakke. Den er i færd med at blive migreret til Apache HBase trunk. Under migreringen identificerede vi nogle kritiske fejl i HBase-stammen, og de vil blive rettet sammen med sammenlægningen. Fællesskabsarbejdet spores af paraplyen HBase JIRA HBASE-14789, inklusive HBASE-14795 og HBASE-14796  for at optimere den underliggende computerarkitektur til Scan og BulkGet, HBASE-14801 for at give JSON-brugergrænsefladen til brugervenlighed-153BASE-153BASE DataFrame-skrivestien, HBASE-15334 for Avro-understøttelse, HBASE-15333  for at understøtte Java primitive typer, såsom short, int, long, float og double osv., HBASE-15335 for at understøtte sammensat rækkenøgle og HBASE-15572 for at tilføje valgfri tidsstempelsemantik. Vi ser frem til at producere en fremtidig version af stikket, som gør stikket endnu nemmere at arbejde med.

Anerkendelse

Vi vil gerne takke Hamel Kothari, Sudarshan Kadambi og Bloomberg-teamet for at vejlede os i dette arbejde og også hjælpe os med at validere dette arbejde. Vi vil også gerne takke HBase-fællesskabet for at give deres feedback og gøre dette bedre. Endelig har dette arbejde udnyttet erfaringerne fra tidligere Spark HBase-integrationer, og vi vil gerne takke deres udviklere for at have banet vejen.

Reference:

SHC:https://github.com/hortonworks/shc-release

Spark-pakke:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. MongoDB diakritisk følsom søgning viser ikke alle accentuerede (ord med diakritisk tegn) rækker som forventet og omvendt

  2. Redis klient

  3. Mongo C# JSON-læser forventede en værdi, men fandt 'replSetGetStatus'

  4. spring-data-mongo - valgfri forespørgselsparametre?