Apache HBase handler om at give dig tilfældig, real-time, læse/skriveadgang til dine Big Data, men hvordan får du effektivt disse data ind i HBase i første omgang? Intuitivt vil en ny bruger forsøge at gøre det via klient-API'erne eller ved at bruge et MapReduce-job med TableOutputFormat, men disse tilgange er problematiske, som du vil lære nedenfor. I stedet er HBase bulk loading-funktionen meget nemmere at bruge og kan indsætte den samme mængde data hurtigere.
Dette blogindlæg vil introducere de grundlæggende begreber for masseindlæsningsfunktionen, præsentere to use cases og foreslå to eksempler.
Oversigt over masseindlæsning
Hvis du har nogle af disse symptomer, er bulk loading sandsynligvis det rigtige valg for dig:
- Du var nødt til at tilpasse dine MemStores for at bruge det meste af hukommelsen.
- Du skulle enten bruge større WAL'er eller omgå dem helt.
- Dine komprimerings- og skyllekøer er i hundredvis.
- Din GC er ude af kontrol, fordi din indsættelsesrækkevidde i MB'erne.
- Din forsinkelse går ud af din SLA, når du importerer data.
De fleste af disse symptomer omtales almindeligvis som "voksesmerter". Brug af massefyldning kan hjælpe dig med at undgå dem.
I HBase-speak er bulk loading processen med at forberede og indlæse HFiles (HBases eget filformat) direkte ind i RegionServers, og dermed omgå skrivestien og helt undgå disse problemer. Denne proces ligner ETL og ser sådan ud:
1. Uddrag dataene fra en kilde, typisk tekstfiler eller en anden database. HBase administrerer ikke denne del af processen. Med andre ord kan du ikke bede HBase om at forberede hFiles ved direkte at læse dem fra MySQL - snarere skal du gøre det på egen hånd. For eksempel kan du køre mysqldump på en tabel og uploade de resulterende filer til HDFS eller bare få fat i dine Apache HTTP-logfiler. Under alle omstændigheder skal dine data være i HDFS før næste trin.
2. Omdan dataene til hFiles. Dette trin kræver et MapReduce-job, og for de fleste inputtyper skal du selv skrive Mapper. Jobbet skal udsende rækketasten som nøglen og enten en KeyValue, en Put eller en Delete som værdien. Reduceren håndteres af HBase; du konfigurerer det ved hjælp af HFileOutputFormat.configureIncrementalLoad(), og det gør følgende:
- Inspicerer tabellen for at konfigurere en totalordrepartitioner
- Uploader partitionsfilen til klyngen og tilføjer den til DistributedCache
- Indstiller antallet af reduktionsopgaver, så det matcher det aktuelle antal regioner
- Indstiller outputnøgle/værdiklassen til at matche HFileOutputFormats krav
- Indstiller reduktionen til at udføre den passende sortering (enten KeyValueSortReducer eller PutSortReducer)
På dette stadium vil der blive oprettet én HF-fil pr. region i outputmappen. Husk, at inputdataene er næsten fuldstændigt omskrevet, så du skal bruge mindst dobbelt så meget ledig diskplads end størrelsen af det originale datasæt. For eksempel, for en 100 GB mysqldump bør du have mindst 200 GB ledig diskplads i HDFS. Du kan slette dumpfilen i slutningen af processen.
3. Indlæs filerne i HBase ved at fortælle RegionServers, hvor de kan finde dem. Dette er det nemmeste trin. Det kræver brug af LoadIncrementalHFiles (mere kendt som completebulkload-værktøjet), og ved at sende det en URL, der lokaliserer filerne i HDFS, vil det indlæse hver fil i den relevante region via den RegionServer, der betjener den. I tilfælde af at en region blev opdelt efter filerne blev oprettet, vil værktøjet automatisk opdele HF-filen i henhold til de nye grænser. Denne proces er ikke særlig effektiv, så hvis din tabel i øjeblikket skrives til af andre processer, er det bedst at få filerne indlæst, så snart transformeringstrinnet er udført.
Her er en illustration af denne proces. Datastrømmen går fra den oprindelige kilde til HDFS, hvor RegionServers blot flytter filerne til deres regioners mapper.
Use Cases
Oprindelig datasætindlæsning: Alle brugere, der migrerer fra et andet datalager, bør overveje denne use case. Først skal du gennemgå øvelsen med at designe tabelskemaet og derefter oprette selve tabellen, forhåndsopdelt. Opdelingspunkterne skal tage højde for rækketastfordelingen og antallet af RegionServers. Jeg anbefaler at læse min kollega Lars Georges præsentation om avanceret skemadesign til enhver seriøs use case.
Fordelen her er, at det er meget hurtigere at skrive filerne direkte end at gå gennem RegionServers skrivesti (skrive til både MemStore og WAL) og så til sidst skylle, komprimere og så videre. Det betyder også, at du ikke behøver at tune din klynge til en skrivetung arbejdsbyrde og derefter tune den igen til din normale arbejdsbyrde.
Inkrementel belastning: Lad os sige, at du har noget datasæt, der i øjeblikket betjenes af HBase, men nu skal du importere flere data i batch fra en tredjepart, eller du har et natligt job, der genererer et par gigabyte, som du skal indsætte. Det er sandsynligvis ikke så stort som det datasæt, som HBase allerede leverer, men det kan påvirke din latens 95. percentil. At gå gennem den normale skrivesti vil have den negative effekt at udløse flere skylninger og komprimeringer under importen end normalt. Denne ekstra IO-stress vil konkurrere med dine latensfølsomme forespørgsler.
Eksempler
Du kan bruge følgende eksempler i din egen Hadoop-klynge, men instruktionerne findes til Cloudera QuickStart VM, som er en enkelt-node-klynge, gæste-OS og eksempeldata og eksempler indbygget i en virtuel maskine til dit skrivebord.
Når du starter VM'en, skal du fortælle den via webgrænsefladen, der automatisk åbner, at den skal installere CDH og derefter sikre dig, at HBase-tjenesten også er startet.
Indbygget TSV Bulk Loader
HBase leveres med et MR-job, der kan læse en afgrænser-separeret værdifil og output direkte i en HBase-tabel eller oprette HFiles til masseindlæsning. Her skal vi til:
- Hent prøvedataene og upload dem til HDFS.
- Kør ImportTsv-jobbet for at transformere filen til flere HFiler i henhold til en forudkonfigureret tabel.
- Forbered og indlæs filerne i HBase.
Det første trin er at åbne en konsol og bruge følgende kommando til at få eksempeldata:
krølle -Ohttps://people.apache.org/~jdcryans/word_count.csv
Jeg oprettede denne fil ved at køre en ordtælling på det originale manuskript til netop dette blogindlæg og derefter udskrive resultatet i csv-format uden nogen kolonnetitler. Upload nu filen til HDFS:
hdfs dfs -put word_count.csv
Når ekstraktionsdelen af bulkbelastningen nu er fuldført, skal du transformere filen. Først skal du designe bordet. For at gøre tingene enkle, kald det "ordtælling" - rækketasterne vil være selve ordene, og den eneste kolonne vil indeholde optællingen i en familie, som vi vil kalde "f". Den bedste praksis, når du opretter en tabel, er at opdele den i henhold til rækkenøglefordelingen, men for dette eksempel vil vi blot oprette fem regioner med splitpunkter fordelt jævnt over nøglerummet. Åbn hbase-skallen:
hbase shell
Og kør følgende kommando for at oprette tabellen:
opret 'wordcount', {NAME => 'f'}, {SPLITS => ['g', 'm', 'r', 'w']}
De fire splitpunkter vil generere fem regioner, hvor den første region starter med en tom rækketast. For at få bedre delte point kan du også lave en hurtig analyse for at se, hvordan ordene virkelig er fordelt, men det overlader jeg til dig.
Hvis du peger på din VM's browser til http://localhost:60010/, vil du se vores nyoprettede tabel og dens fem regioner alle tildelt RegionServeren.
Nu er det tid til at gøre det tunge løft. Hvis du kalder HBase-krukken på kommandolinjen med "hadoop"-scriptet, vises en liste over tilgængelige værktøjer. Den vi ønsker hedder importtsv og har følgende anvendelse:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv FEJL:Forkert antal argumenter:0 Anvendelse:importtsv -Dimporttsv.columns=a,b,c
Kommandolinjen vi skal bruge er følgende:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv-Dimporttsv.separator=,-Dimporttsv.bulk.output=output-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
Her er en oversigt over de forskellige konfigurationselementer:
- -Dimporttsv.separator=, angiver, at separatoren er et komma.
- -Dimporttsv.bulk.output=output er en relativ sti til, hvor hFiles vil blive skrevet. Da din bruger på VM'en er "cloudera" som standard, betyder det, at filerne vil være i /user/cloudera/output. Hvis du springer denne mulighed over, vil jobbet skrives direkte til HBase.
- -Dimporttsv.columns=HBASE_ROW_KEY,f:count er en liste over alle kolonnerne i denne fil. Rækkenøglen skal identificeres ved hjælp af HBASE_ROW_KEY-strengen med store bogstaver; ellers starter det ikke jobbet. (Jeg besluttede at bruge kvalifikationen "tælle", men det kunne være alt andet.)
Jobbet bør afsluttes inden for et minut, givet den lille inputstørrelse. Bemærk, at fem Reducers kører, en pr. region. Her er resultatet på HDFS:
-rw-r--r-- 3 cloudera cloudera 4265 2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b-rw-era--r-- 1 output/f/786198ca47ae406f9be05c9eb09beb36-rw-r--r-- 3 cloudera cloudera 2487 2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2-rw-r--r-- 3 cloudera cloudera 2961 2013-09-12 13 :13 output/f/bb341f04c6d845e8bb95830e9946a914-rw-r--r-- 3 cloudera cloudera 1336 1336 2013-09-12 13:14 output/f/c656badd8423bd2566d849bd6456d849bdf>Som du kan se, tilhører filerne i øjeblikket brugeren "cloudera". For at indlæse dem skal vi ændre ejeren til "hbase", ellers vil HBase ikke have tilladelse til at flytte filerne. Kør følgende kommando:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/clooudera/outputTil det sidste trin skal vi bruge completebulkload-værktøjet til at pege på, hvor filerne er, og hvilke tabeller vi indlæser til:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcountGår du tilbage til HBase-skallen, kan du køre count-kommandoen, der viser dig, hvor mange rækker der blev indlæst. Hvis du har glemt at chown, vil kommandoen hænge.
Tilpasset MR-job
TSV-bulk-loaderen er god til prototyping, men fordi den fortolker alt som strenge og ikke understøtter manipulation af felterne på transformationstidspunktet, vil du ende med at skulle skrive dit eget MR-job. Min kollega James Kinley, der arbejder som løsningsarkitekt i Europa, skrev sådan et job, som vi vil bruge til vores næste eksempel. Dataene for jobbet indeholder offentlige Facebook- og Twitter-beskeder relateret til 2010 NBA-finalerne (spil 1) mellem Lakers og Celtics. Du kan finde koden her. (Quick Start VM leveres med git og maven installeret, så du kan klone lageret på det.)
Ser man på førerklassen, er de vigtigste bits følgende:
job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class);... // Konfigurer automatisk partitionering og reducering HFileOutputFormat.configureIncrementalLoad(job, hTable);Først skal din Mapper udlæse en ImmutableBytesWritable, der indeholder rækkenøglen, og outputværdien kan enten være en KeyValue, en Put eller en Delete. Det andet uddrag viser, hvordan man konfigurerer Reducer; det er faktisk fuldstændigt håndteret af HFileOutputFormat. configureIncrementalLoad() som beskrevet i afsnittet "Transformer" tidligere.
HBaseKVMapper-klassen indeholder kun den Mapper, der respekterer den konfigurerede outputnøgle og værdier:
offentlig klasse HBaseKVMapper udvider Mapper{ For at køre det skal du kompilere projektet ved hjælp af maven og få fat i datafilerne ved at følge linkene i README. (Det indeholder også shell-scriptet til at oprette tabellen.) Glem ikke at uploade filerne til HDFS og indstille din klassesti til at være opmærksom på HBase, før du starter jobbet, fordi du ikke kommer til at bruge dens jar denne gang :
eksporter HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*Du vil være i stand til at starte jobbet ved at bruge en kommandolinje, der ligner denne:
hadoop jar hbase-examples-0.0.1-SNAPSHOT.jarcom.cloudera.examples.hbase.bulkimport.Driver -libjars/home/clooudera/.m2/repository/joda-time/joda-time/2.1/joda- time-2.1.jar,/home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jarRowFeeder\ for\ Celtics\ og\ Lakers\ Game\ 1.csv output2 NBAFinal2010Som du kan se, skal jobbets afhængigheder tilføjes separat. Endelig kan du indlæse filerne ved først at ændre deres ejer og derefter køre værktøjet komplet bulkload:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010Potentielle problemer
For nylig slettede data vises igen. Dette problem opstår, når en Delete indsættes via en bulkbelastning og er kraftigt komprimeret, mens den tilsvarende Put stadig er i en MemStore. Dataene vil blive betragtet som slettet, når sletningen er i en HF-fil, men når den først er fjernet under komprimeringen, bliver puttet igen synligt. Hvis du har en sådan brugssag, kan du overveje at konfigurere dine kolonnefamilier til at beholde de slettede celler med KEEP_DELETED_CELLS i skallen eller HColumnDescriptor.setKeepDeletedCells().
Masseindlæste data kan ikke overskrives af en anden masseindlæsning. Dette problem opstår, når to bulk-indlæste HFiler, der er indlæst på forskellige tidspunkter, forsøger at skrive en anden værdi i den samme celle, hvilket betyder, at de har den samme rækkenøgle, familie, kvalifikation og tidsstempel. Resultatet er, at den første indsatte værdi vil blive returneret i stedet for den anden. Denne fejl vil blive rettet i HBase 0.96.0 og CDH 5 (den næste CDH-hovedversion), og der arbejdes i HBASE-8521 for 0.94-grenen og CDH 4.
Massebelastning udløser store komprimeringer. Dette problem opstår, når du laver trinvise masseindlæsninger, og der er nok masseindlæste filer til at udløse en mindre komprimering (standardtærsklen er 3). HFilerne er indlæst med et sekvensnummer sat til 0, så de hentes først, når RegionServeren udvælger filer til en komprimering, og på grund af en fejl vil den også vælge alle de resterende filer. Dette problem vil alvorligt påvirke dem, der allerede har store regioner (flere GB'er) eller som masseindlæser ofte (hvert par timer og mindre), da en masse data vil blive komprimeret. HBase 0.96.0 har den korrekte rettelse, og det samme vil CDH 5; HBASE-8521 løser problemet i 0.94, da de bulk-indlæste HFiles nu er tildelt et korrekt sekvensnummer. HBASE-8283 kan aktiveres med hbase.hstore.useExploringCompation efter 0.94.9 og CDH 4.4.0 for at afhjælpe dette problem ved blot at være en smartere komprimerings-udvælgelsesalgoritme.
Masseindlæste data replikeres ikke . Da masseindlæsning omgår skrivestien, bliver WAL ikke skrevet til som en del af processen. Replikering fungerer ved at læse WAL-filerne, så den ikke vil se de bulk-indlæste data - og det samme gælder for de redigeringer, der bruger Put.setWriteToWAL(true). En måde at håndtere det på er at sende råfilerne eller HFiles til den anden klynge og udføre den anden behandling der.
Konklusion
Målet med dette blogindlæg var at introducere dig til Apache HBase bulk loadings grundlæggende koncepter. Vi forklarede, hvordan processen er som at lave ETL, og at den er meget bedre til store datasæt end at bruge den normale API, da den omgår skrivestien. De to eksempler blev inkluderet for at vise, hvordan simple TSV-filer kan masseindlæses til HBase, og hvordan du skriver din egen Mapper til andre dataformater.
Nu kan du prøve at gøre det samme ved at bruge en grafisk brugergrænseflade via Hue.