Tak til Pengyu Wang, softwareudvikler hos FINRA, for tilladelsen til at genudgive dette indlæg.
Salted Apache HBase-tabeller med pre-split er en bevist effektiv HBase-løsning til at give ensartet arbejdsbelastningsfordeling på tværs af RegionServers og forhindre hot spots under bulkskrivning. I dette design laves en rækketast med en logisk nøgle plus salt i begyndelsen. En måde at generere salt på er ved at beregne n (antal regioner) modulo på hashkoden for den logiske rækkenøgle (dato osv.).
Taster til saltrække
For eksempel kan en tabel, der accepterer dataindlæsning på daglig basis, bruge logiske rækkenøgler, der starter med en dato, og vi ønsker at forhåndsopdele denne tabel i 1.000 regioner. I dette tilfælde forventer vi at generere 1.000 forskellige salte. Saltet kan for eksempel dannes som:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey =2015-04-26|abcrowKey =893|2015-04-26|abc
Outputtet fra hashCode()
med modulo giver tilfældighed for saltværdi fra "000" til "999". Med denne nøgletransformation er bordet forhåndsopdelt på saltgrænserne, efterhånden som det oprettes. Dette vil gøre rækkevolumener ensartet fordelt under indlæsning af HFiles med MapReduce bulkload. Det garanterer, at rækketaster med samme salt falder i samme område.
I mange tilfælde, som f.eks. dataarkivering, skal du scanne eller kopiere dataene over et bestemt logisk nøgleinterval (datointerval) ved hjælp af MapReduce job. Standardtabel MapReduce-job konfigureres ved at levere Scan
instans med nøgleområdeattributter.
Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Konfigurer tabelkortlægningsjobbet */TableMapReduceUtil.initTableMapperJob(tabelnavn,scan,DataScanMapper.class,ImmutableBytesWritable.class, KeyValue.class,job, true, TableInputFormat.class);...
Men opsætningen af et sådant job bliver udfordrende for saltede præ-opdelte borde. Start- og stoprækketasterne vil være forskellige for hver region, fordi hver enkelt har et unikt salt. Og vi kan ikke angive flere områder til én Scan
eksempel.
For at løse dette problem er vi nødt til at se på, hvordan tabellen MapReduce fungerer. Generelt opretter MapReduce-rammen én kortopgave til at læse og behandle hver inputopdeling. Hver opdeling genereres i InputFormat
klassebase ved metoden getSplits()
.
I HBase-tabel MapReduce-job, TableInputFormat
bruges som InputFormat
. Inde i implementeringen er getSplits()
metoden tilsidesættes for at hente start- og stoprækketasterne fra Scan
eksempel. Da start- og stoprækketasterne strækker sig over flere områder, divideres området med regionsgrænser og returnerer listen over TableSplit
objekter, der dækker scanningsnøgleområdet. I stedet for at være baseret på HDFS-blok, TableSplit
s er baseret på region. Ved at overskrive getSplits()
metode, er vi i stand til at kontrollere TableSplit
.
Opbygning af tilpasset TableInputFormat
For at ændre adfærden for getSplits()
metode, en brugerdefineret klasse, der udvider TableInputFormat
er påkrævet. Formålet med getSplits()
her er at dække det logiske nøgleområde i hver region, konstruere deres rækkenøgleområde med deres unikke salt. HTable-klassen leverer metoden getStartEndKeys()
som returnerer start- og slutrækkenøgler for hver region. Fra hver starttast skal du analysere det tilsvarende salt for området.
Par nøgler =table.getStartEndKeys();for (int i =0; iJobkonfiguration passerer logisk nøgleområde
TableInputFormat
henter start- og stop-nøglen fraScan
eksempel. Da vi ikke kan brugeScan
i vores MapReduce-job kunne vi brugeConfiguration
i stedet for at videregive disse to variabler, og kun logisk start- og stopnøgle er god nok (en variabel kan være en dato eller andre forretningsoplysninger).getSplits()
metoden harJobContext
argument, Konfigurationsforekomsten kan læses somcontext.getConfiguration()
.I MapReduce-driveren:
Konfiguration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop ", "2015-04-27");I
Custom TableInputFormat
:@Override offentlig liste getSplits(JobContext context) kaster IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan .stop");...}Rekonstruer det saltede nøgleområde efter region
Nu hvor vi har saltet og den logiske start/stop-tast for hver region, kan vi genopbygge det faktiske rækkenøgleområde.
byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);Oprettelse af en tabelopdeling for hver region
Med rækkenøgleinterval kan vi nu initialisere
TableSplit
forekomst for regionen.List splits =new ArrayList(keys.getFirst().length);for (int i =0; iEn ting mere at se på er datalokalitet. Rammen bruger placeringsoplysninger i hver inputopdeling til at tildele en kortopgave i dens lokale vært. Til vores
TableInputFormat
, bruger vi metodengetTableRegionLocation()
for at hente regionsplaceringen, der betjener rækketasten.Denne placering sendes derefter til
TableSplit
konstruktør. Dette vil sikre, at kortlæggeren, der behandler tabelopdelingen, er på den samme regionsserver. En metode kaldetDNS.reverseDns()
, kræver adressen til HBase-navneserveren. Denne attribut er gemt i konfigurationen "hbase.nameserver.address
“.this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);…offentlig streng getTableRegionLocation(HTable table, byte[] rowKey) kaster IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey) ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;try {regionLocation =reverseDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostname();}return regionLocation; }beskyttet streng reverseDNS(InetAddress ipAddress) kaster NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); .reverseDNSCacheMap.put(ipAddress, hostName);}retur hostName;}En komplet kode af
getSplits
vil se sådan ud:@Override offentlig liste getSplits(JobContext-kontekst) kaster IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Ingen tabel blev leveret.");}// Hent navneserveradressen og standardværdien er null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Pair nøgler =table.getStartEndKeys();if (nøgler ==null || keys.getFirst() ==null || keys.getFirst(). length ==0) {throw new RuntimeException("Mindst én region forventes");}Listeopdelinger =new ArrayList(keys.getFirst().length);for (int i =0; iBrug Custom TableInoutFormat i MapReduce-driveren
Nu skal vi erstatte
TableInputFormat
klasse med den brugerdefinerede build, vi brugte til tabelopsætning af MapReduce-job.Konfiguration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Konfigurer tabelkortlægningsjobbet */TableMapReduceUtil.initTableMapperJob(tabelnavn,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, true, MultiRangeTableIn);Tilgangen til tilpasset
TableInputFormat
giver en effektiv og skalerbar scanningsfunktion til HBase-tabeller, der er designet til at bruge salt til en afbalanceret databelastning. Da scanningen kan omgå alle urelaterede rækkenøgler, uanset hvor stor tabellen er, er scanningens kompleksitet kun begrænset til størrelsen af måldataene. I de fleste tilfælde kan dette garantere relativt ensartet behandlingstid, efterhånden som tabellen vokser.