Dette blogindlæg blev offentliggjort på Hortonworks.com før fusionen med Cloudera. Nogle links, ressourcer eller referencer er muligvis ikke længere nøjagtige.
Vi er stolte af at kunne annoncere den tekniske forhåndsvisning af Spark-HBase Connector, udviklet af Hortonworks i samarbejde med Bloomberg.
Spark-HBase-stikket udnytter Data Source API (SPARK-3247) introduceret i Spark-1.2.0. Det bygger bro mellem det simple HBase Key Value-lager og komplekse relationelle SQL-forespørgsler og gør det muligt for brugere at udføre komplekse dataanalyser oven på HBase ved hjælp af Spark. En HBase DataFrame er en standard Spark DataFrame og er i stand til at interagere med andre datakilder såsom Hive, ORC, Parquet, JSON osv.
Baggrund
Der er adskillige open source Spark HBase-stik tilgængelige enten som Spark-pakker, som uafhængige projekter eller i HBase-trunk.
Spark er flyttet til Dataset/DataFrame API'erne, som giver indbygget forespørgselsplanoptimering. Nu foretrækker slutbrugere at bruge DataFrames/Datasets baseret grænseflade.
HBase-stikket i HBase-stammen har en rig støtte på RDD-niveau, f.eks. BulkPut osv., men dens DataFrame-understøttelse er ikke så rig. HBase trunk-stik er afhængig af standard HadoopRDD med HBase indbygget TableInputFormat har nogle ydeevnebegrænsninger. Derudover kan BulkGet udført i driveren være et enkelt fejlpunkt.
Der er nogle andre alternative implementeringer. Tag Spark-SQL-on-HBase som et eksempel. Den anvender meget avancerede brugerdefinerede optimeringsteknikker ved at indlejre sin egen forespørgselsoptimeringsplan i standard Spark Catalyst-motoren, sender RDD'en til HBase og udfører komplicerede opgaver, såsom delvis aggregering, inde i HBase-coprocessoren. Denne tilgang er i stand til at opnå høj ydeevne, men den er svær at vedligeholde på grund af dens kompleksitet og den hurtige udvikling af Spark. At tillade vilkårlig kode at køre inde i en coprocessor kan også udgøre sikkerhedsrisici.
Spark-on-HBase Connector (SHC) er udviklet til at overvinde disse potentielle flaskehalse og svagheder. Den implementerer standard Spark Datasource API og udnytter Spark Catalyst-motoren til forespørgselsoptimering. Parallelt hermed er RDD'en konstrueret fra bunden i stedet for at bruge TableInputFormat for at opnå høj ydeevne. Med denne tilpassede RDD kan alle kritiske teknikker anvendes og implementeres fuldt ud, såsom partitionsbeskæring, kolonnebeskæring, prædikat-pushdown og datalokalitet. Designet gør vedligeholdelsen meget nem, samtidig med at der opnås en god afvejning mellem ydeevne og enkelhed.
Arkitektur
Vi antager, at Spark og HBase er installeret i den samme klynge, og Spark-eksekutorer er placeret sammen med regionsservere, som illustreret i figuren nedenfor.
Figur 1. Spark-on-HBase Connector Architecture
På et højt niveau behandler connectoren både Scan og Get på samme måde, og begge handlinger udføres i udførerne. Driveren behandler forespørgslen, samler scanninger/henter baseret på regionens metadata og genererer opgaver pr. region. Opgaverne sendes til de foretrukne eksekvere, der er samlokaliseret med regionserveren, og udføres parallelt i eksekvererne for at opnå bedre datalokalitet og samtidighed. Hvis en region ikke har de nødvendige data, tildeles den pågældende regionsserver ingen opgave. En opgave kan bestå af flere scanninger og bulkGets, og dataanmodningerne fra en opgave hentes kun fra én regionsserver, og denne regionsserver vil også være lokalitetspræference for opgaven. Bemærk, at chaufføren ikke er involveret i den rigtige jobudførelse bortset fra planlægningsopgaver. Dette undgår, at chaufføren er flaskehalsen.
Tabelkatalog
For at bringe HBase-tabellen som en relationstabel ind i Spark, definerer vi en mapping mellem HBase- og Spark-tabeller, kaldet Table Catalog. Der er to kritiske dele af dette katalog. Den ene er rækketastdefinitionen, og den anden er kortlægningen mellem tabelkolonnen i Spark og kolonnefamilien og kolonnekvalifikationen i HBase. Se venligst afsnittet Brug for detaljer.
Native Avro-understøttelse
Konnektoren understøtter Avro-formatet indbygget, da det er en meget almindelig praksis at bevare strukturerede data i HBase som et byte-array. Brugeren kan vedholde Avro-registreringen direkte i HBase. Internt konverteres Avro-skemaet automatisk til en indbygget Spark Catalyst-datatype. Bemærk, at begge nøgleværdidele i en HBase-tabel kan defineres i Avro-format. Se eksemplerne/testcaserne i repoen for nøjagtig brug.
Pushdown for prædikat
Connectoren henter kun nødvendige kolonner fra regionsserveren for at reducere netværksomkostninger og undgå redundant behandling i Spark Catalyst-motoren. Eksisterende standard HBase-filtre bruges til at udføre prædikat-push-down uden at udnytte coprocessor-kapaciteten. Fordi HBase ikke er opmærksom på datatypen bortset fra byte-array og rækkefølgeinkonsistensen mellem Java-primitive typer og byte-array, skal vi forbehandle filterbetingelsen, før vi indstiller filteret i Scan-operationen for at undgå datatab. Inde på regionsserveren filtreres poster, der ikke matcher forespørgselsbetingelsen, fra.
Partitionsbeskæring
Ved at udtrække rækkenøglen fra prædikaterne opdeler vi Scan/BulkGet i flere ikke-overlappende områder, kun de regionsservere, der har de anmodede data, vil udføre Scan/BulkGet. I øjeblikket udføres partitionsbeskæringen på den første dimension af rækketasterne. For eksempel, hvis en rækketast er "key1:key2:key3", vil partitionsbeskæringen kun være baseret på "key1". Bemærk, at WHERE-betingelserne skal defineres nøje. Ellers vil skillevægsbeskæringen muligvis ikke træde i kraft. For eksempel vil HVOR rækketast1> "abc" ELLER kolonne ="xyz" (hvor rækketast1 er den første dimension af rækketasten, og kolonne er en almindelig hbase-kolonne) resultere i en fuld scanning, da vi skal dække alle områderne pga. af ELLER logik.
Datalokalitet
Når en Spark-eksekutor er samlokaliseret med HBase-regionsservere, opnås datalokalitet ved at identificere regionsserverens placering og gør den bedste indsats for at samlokalisere opgaven med regionserveren. Hver executor udfører Scan/BulkGet på den del af dataene, der er placeret på samme vært.
Scan og BulkGet
Disse to operatorer er eksponeret for brugere ved at specificere WHERE CLAUSE, f.eks. WHERE column> x og column
Brug
Det følgende illustrerer den grundlæggende procedure for, hvordan man bruger stikket. For flere detaljer og avanceret brug, såsom Avro og kompositnøgleunderstøttelse, henvises til eksemplerne i arkivet.
1) Definer kataloget for skematilknytningen:
[code language="scala"]def catalog =s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key" , |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":" col4", "type":"int"}, |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"}, |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |}""".stripMargin[/code]
2) Forbered dataene og udfyld HBase-tabellen:
caseklasse HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int, col5:Long, col6:Short, col7:String, col8:Byte)objekt HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}””” HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s”String$i:$t”, i.toByte) }}
val data =(0 til 255).map { i => HBaseRecord(i, “ekstra”)}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
.format(“org.apache.spark. sql.execution.datasources.hbase”)
.save()
3) Indlæs DataFrame:
def withCatalog(cat:String):DataFrame ={
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format( “org.apache.spark.sql.execution.datasources.hbase”)
.load()
}val df =withCatalog(catalog)
4) Sprogintegreret forespørgsel:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
$”col0″ ===“row005” ||
$”col0″ ===“row020” ||
$”col0″ === “r20” ||
$”col0″ <=“row005”) &&
($”col4″ ===1 ||
$”col4″ ===42))
.select(“col0”, “col1”, “col4”)
s .vis5) SQL-forespørgsel:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).showKonfiguration af Spark-Package
Brugere kan bruge Spark-on-HBase-stikket som en standard Spark-pakke. For at inkludere pakken i din Spark-applikation skal du bruge:
spark-shell, pyspark eller spark-submit
> $SPARK_HOME/bin/spark-shell –pakker zhzhan:shc:0.0.11-1.6.1-s_2.10
Brugere kan også inkludere pakken som afhængighed i din SBT-fil. Formatet er spark-pakkenavn:version
spDependencies +="zhzhan/shc:0.0.11-1.6.1-s_2.10"
Kører i Secure Cluster
For at køre i en Kerberos-aktiveret klynge skal brugeren inkludere HBase-relaterede jars i klassestien, da HBase-tokenhentning og fornyelse udføres af Spark og er uafhængig af forbindelsen. Med andre ord skal brugeren starte miljøet på normal vis, enten gennem kinit eller ved at levere principal/keytab. De følgende eksempler viser, hvordan man kører i en sikker klynge med både garnklient- og garnklyngetilstand. Bemærk, at SPARK_CLASSPATH skal indstilles for begge tilstande, og eksempelkrukken er kun en pladsholder for Spark.
eksport SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar
Antag, at hrt_qa er en hovedløs konto, kan brugeren bruge følgende kommando til kinit:
kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –antal-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –antal-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar
Slå det hele sammen
Vi har lige givet et hurtigt overblik over, hvordan HBase understøtter Spark på DataFrame-niveau. Med DataFrame API kan Spark-applikationer arbejde med data, der er gemt i HBase-tabellen, lige så nemt som alle data, der er gemt i andre datakilder. Med denne nye funktion kan data i HBase-tabeller nemt forbruges af Spark-applikationer og andre interaktive værktøjer, f.eks. brugere kan køre en kompleks SQL-forespørgsel oven på en HBase-tabel inde i Spark, udføre en tabelsammenføjning mod Dataframe eller integrere med Spark Streaming for at implementere et mere kompliceret system.
Hvad er det næste?
I øjeblikket er forbindelsen hostet i Hortonworks repo og udgivet som en Spark-pakke. Den er i færd med at blive migreret til Apache HBase trunk. Under migreringen identificerede vi nogle kritiske fejl i HBase-stammen, og de vil blive rettet sammen med sammenlægningen. Fællesskabsarbejdet spores af paraplyen HBase JIRA HBASE-14789, inklusive HBASE-14795 og HBASE-14796 for at optimere den underliggende computerarkitektur til Scan og BulkGet, HBASE-14801 for at give JSON-brugergrænsefladen til brugervenlighed-153BASE-153BASE DataFrame-skrivestien, HBASE-15334 for Avro-understøttelse, HBASE-15333 for at understøtte Java primitive typer, såsom short, int, long, float og double osv., HBASE-15335 for at understøtte sammensat rækkenøgle og HBASE-15572 for at tilføje valgfri tidsstempelsemantik. Vi ser frem til at producere en fremtidig version af stikket, som gør stikket endnu nemmere at arbejde med.
Anerkendelse
Vi vil gerne takke Hamel Kothari, Sudarshan Kadambi og Bloomberg-teamet for at vejlede os i dette arbejde og også hjælpe os med at validere dette arbejde. Vi vil også gerne takke HBase-fællesskabet for at give deres feedback og gøre dette bedre. Endelig har dette arbejde udnyttet erfaringerne fra tidligere Spark HBase-integrationer, og vi vil gerne takke deres udviklere for at have banet vejen.
Reference:
SHC:https://github.com/hortonworks/shc-release
Spark-pakke:http://spark-packages.org/package/zhzhan/shc
Apache HBase: https://hbase.apache.org/
Apache Spark:http://spark.apache.org/