Danke an Pengyu Wang, Softwareentwickler bei FINRA, für die Erlaubnis, diesen Beitrag erneut zu veröffentlichen.
Gesalzene Apache HBase-Tabellen mit Pre-Split sind eine bewährte effektive HBase-Lösung, um eine gleichmäßige Workload-Verteilung auf RegionServer bereitzustellen und Hotspots bei Massenschreibvorgängen zu vermeiden. In diesem Design wird ein Zeilenschlüssel mit einem logischen Schlüssel plus Salt am Anfang erstellt. Eine Möglichkeit, Salt zu generieren, besteht darin, n (Anzahl der Regionen) modulo auf dem Hash-Code des logischen Zeilenschlüssels (Datum usw.) zu berechnen.
Salting Row Keys
Beispielsweise könnte eine Tabelle, die täglich Datenlast akzeptiert, logische Zeilenschlüssel verwenden, die mit einem Datum beginnen, und wir möchten diese Tabelle vorab in 1.000 Regionen aufteilen. In diesem Fall erwarten wir, 1.000 verschiedene Salze zu erzeugen. Das Salt kann beispielsweise wie folgt generiert werden:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey = 2015-04-26|abc rowKey = 893|2015-04-26|abc
Die Ausgabe von hashCode()
mit Modulo bietet Zufälligkeit für Salzwerte von „000“ bis „999“. Mit dieser Schlüsseltransformation wird die Tabelle bei ihrer Erstellung an den Salt-Grenzen vorab geteilt. Dadurch werden die Zeilenvolumen beim Laden der HFiles mit MapReduce-Bulkload gleichmäßig verteilt. Es garantiert, dass Zeilenschlüssel mit demselben Salt in dieselbe Region fallen.
In vielen Anwendungsfällen, z. B. bei der Datenarchivierung, müssen Sie die Daten mithilfe des MapReduce-Jobs über einen bestimmten logischen Schlüsselbereich (Datumsbereich) scannen oder kopieren. Standardtabellen-MapReduce-Jobs werden eingerichtet, indem der Scan
bereitgestellt wird Instanz mit Schlüsselbereichsattributen.
Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setBatch(1000); scan.setMaxVersions(1); scan.setStartRow(Bytes.toBytes("2015-04-26")); scan.setStopRow(Bytes.toBytes("2015-04-27")); /* Setup the table mapper job */ TableMapReduceUtil.initTableMapperJob( tablename, scan, DataScanMapper.class, ImmutableBytesWritable.class, KeyValue.class, job, true, TableInputFormat.class ); …
Das Einrichten eines solchen Jobs wird jedoch für gesalzene vorab geteilte Tabellen schwierig. Start- und Stoppreihenschlüssel sind für jede Region unterschiedlich, da jede ein einzigartiges Salz hat. Und wir können nicht mehrere Bereiche für einen Scan
angeben Beispiel.
Um dieses Problem zu lösen, müssen wir uns ansehen, wie die Tabelle MapReduce funktioniert. Im Allgemeinen erstellt das MapReduce-Framework eine Zuordnungsaufgabe, um jede Eingabeaufteilung zu lesen und zu verarbeiten. Jeder Split wird im InputFormat
generiert Klassenbasis, durch die Methode getSplits()
.
Im HBase-Tabellen-MapReduce-Job TableInputFormat
wird als InputFormat
verwendet . Innerhalb der Implementierung der getSplits()
-Methode wird überschrieben, um die Start- und Stoppzeilentasten aus dem Scan
abzurufen Beispiel. Da sich die Start- und Stoppzeilenschlüssel über mehrere Regionen erstrecken, wird der Bereich durch Regionsgrenzen unterteilt und die Liste von TableSplit
zurückgegeben Objekte, die den Scan-Key-Bereich abdeckt. Anstatt auf dem HDFS-Block zu basieren, TableSplit
s basieren auf der Region. Durch Überschreiben von getSplits()
-Methode können wir den TableSplit
steuern .
Benutzerdefiniertes TableInputFormat erstellen
Um das Verhalten von getSplits()
zu ändern Methode, eine benutzerdefinierte Klasse, die TableInputFormat
erweitert erforderlich. Der Zweck von getSplits()
Um den logischen Schlüsselbereich in jeder Region abzudecken, konstruieren Sie ihren Zeilenschlüsselbereich mit ihrem einzigartigen Salz. Die Klasse HTable stellt die Methode getStartEndKeys()
bereit die Start- und Endzeilenschlüssel für jede Region zurückgibt. Analysieren Sie von jedem Startschlüssel aus das entsprechende Salt für die Region.
Pair keys = table.getStartEndKeys(); for (int i = 0; i < keys.getFirst().length; i++) { // The first 3 bytes is the salt, for the first region, start key is empty, so apply “000” if (keys.getFirst()[i].length == 0) { regionSalt = "000"; } else { regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3); } … }
Auftragskonfiguration durchläuft logischen Schlüsselbereich
TableInputFormat
ruft den Start- und Stoppschlüssel von Scan
ab Beispiel. Da wir Scan
nicht verwenden können In unserem MapReduce-Job könnten wir Configuration
verwenden Stattdessen müssen diese beiden Variablen übergeben werden, und nur eine logische Start- und Stopptaste ist gut genug (eine Variable könnte ein Datum oder eine andere Geschäftsinformation sein). Das getSplits()
Methode hat JobContext
Argument, Die Konfigurationsinstanz kann als context.getConfiguration()
gelesen werden .
Im MapReduce-Treiber:
Configuration conf = getConf(); conf = HBaseConfiguration.addHbaseResources(conf); conf.set("logical.scan.start", "2015-04-26"); conf.set("logical.scan.stop", "2015-04-27");
Im Custom TableInputFormat
:
@Override public List getSplits(JobContext context) throws IOException { conf = context.getConfiguration(); String scanStart = conf.get("logical.scan.start"); String scanStop = conf.get("logical.scan.stop"); … }
Rekonstruieren Sie den Salted-Key-Bereich nach Region
Jetzt, da wir den Salt und den logischen Start-/Stoppschlüssel für jede Region haben, können wir den tatsächlichen Zeilenschlüsselbereich neu erstellen.
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
Einen TableSplit für jede Region erstellen
Mit dem Zeilenschlüsselbereich können wir jetzt TableSplit
initialisieren Beispiel für die Region.
List splits = new ArrayList(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { … byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop); InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation); splits.add(split); }
Eine weitere zu betrachtende Sache ist die Datenlokalität. Das Framework verwendet Standortinformationen in jeder Eingabeaufteilung, um eine Kartenaufgabe in seinem lokalen Host zuzuweisen. Für unser TableInputFormat
verwenden wir die Methode getTableRegionLocation()
um den Standort der Region abzurufen, der den Zeilenschlüssel bedient.
Dieser Ort wird dann an TableSplit
übergeben Konstrukteur. Dadurch wird sichergestellt, dass sich der Mapper, der die Tabellenaufteilung verarbeitet, auf demselben Regionsserver befindet. Eine Methode namens DNS.reverseDns()
, erfordert die Adresse für den HBase-Nameserver. Dieses Attribut wird in der Konfiguration „hbase.nameserver.address
gespeichert “.
this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null); … public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException { HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress(); InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); String regionLocation; try { regionLocation = reverseDNS(regionAddress); } catch (NamingException e) { regionLocation = regionServerAddress.getHostname(); } return regionLocation; } protected String reverseDNS(InetAddress ipAddress) throws NamingException { String hostName = this.reverseDNSCacheMap.get(ipAddress); if (hostName == null) { hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); this.reverseDNSCacheMap.put(ipAddress, hostName); } return hostName; }
Ein vollständiger Code von getSplits
sieht so aus:
@Override public List getSplits(JobContext context) throws IOException { conf = context.getConfiguration(); table = getHTable(conf); if (table == null) { throw new IOException("No table was provided."); } // Get the name server address and the default value is null. this.nameServer = conf.get("hbase.nameserver.address", null); String scanStart = conf.get("region.scan.start"); String scanStop = conf.get("region.scan.stop"); Pair keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { throw new RuntimeException("At least one region is expected"); } List splits = new ArrayList(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]); String regionSalt = null; if (keys.getFirst()[i].length == 0) { regionSalt = "000"; } else { regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3); } byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop); InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation); splits.add(split); } log.info("Total table splits: " + splits.size()); return splits; }
Verwenden Sie das benutzerdefinierte TableInoutFormat im MapReduce-Treiber
Jetzt müssen wir das TableInputFormat
ersetzen -Klasse mit dem benutzerdefinierten Build, das wir für die Einrichtung des Tabellen-MapReduce-Jobs verwendet haben.
Configuration conf = getConf(); conf = HBaseConfiguration.addHbaseResources(conf); HTableInterface status_table = new HTable(conf, status_tablename); conf.set("logical.scan.start", "2015-04-26"); conf.set("logical.scan.stop", "2015-04-27"); Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setBatch(1000); scan.setMaxVersions(1); /* Setup the table mapper job */ TableMapReduceUtil.initTableMapperJob( tablename, scan, DataScanMapper.class, ImmutableBytesWritable.class, KeyValue.class, job, true, MultiRangeTableInputFormat.class );
Der Ansatz des benutzerdefinierten TableInputFormat
bietet eine effiziente und skalierbare Scanfunktion für HBase-Tabellen, die für die Verwendung von Salt für eine ausgewogene Datenlast ausgelegt sind. Da der Scan alle nicht verwandten Zeilenschlüssel umgehen kann, unabhängig davon, wie groß die Tabelle ist, ist die Komplexität des Scans nur auf die Größe der Zieldaten beschränkt. In den meisten Anwendungsfällen kann dies eine relativ konsistente Verarbeitungszeit garantieren, wenn die Tabelle wächst.