HBase
 sql >> Datenbank >  >> NoSQL >> HBase

Anleitung:Scannen Sie gesalzene Apache HBase-Tabellen mit regionsspezifischen Schlüsselbereichen in MapReduce

Danke an Pengyu Wang, Softwareentwickler bei FINRA, für die Erlaubnis, diesen Beitrag erneut zu veröffentlichen.

Gesalzene Apache HBase-Tabellen mit Pre-Split sind eine bewährte effektive HBase-Lösung, um eine gleichmäßige Workload-Verteilung auf RegionServer bereitzustellen und Hotspots bei Massenschreibvorgängen zu vermeiden. In diesem Design wird ein Zeilenschlüssel mit einem logischen Schlüssel plus Salt am Anfang erstellt. Eine Möglichkeit, Salt zu generieren, besteht darin, n (Anzahl der Regionen) modulo auf dem Hash-Code des logischen Zeilenschlüssels (Datum usw.) zu berechnen.

Salting Row Keys

Beispielsweise könnte eine Tabelle, die täglich Datenlast akzeptiert, logische Zeilenschlüssel verwenden, die mit einem Datum beginnen, und wir möchten diese Tabelle vorab in 1.000 Regionen aufteilen. In diesem Fall erwarten wir, 1.000 verschiedene Salze zu erzeugen. Das Salt kann beispielsweise wie folgt generiert werden:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey 

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

Die Ausgabe von hashCode() mit Modulo bietet Zufälligkeit für Salzwerte von „000“ bis „999“. Mit dieser Schlüsseltransformation wird die Tabelle bei ihrer Erstellung an den Salt-Grenzen vorab geteilt. Dadurch werden die Zeilenvolumen beim Laden der HFiles mit MapReduce-Bulkload gleichmäßig verteilt. Es garantiert, dass Zeilenschlüssel mit demselben Salt in dieselbe Region fallen.

In vielen Anwendungsfällen, z. B. bei der Datenarchivierung, müssen Sie die Daten mithilfe des MapReduce-Jobs über einen bestimmten logischen Schlüsselbereich (Datumsbereich) scannen oder kopieren. Standardtabellen-MapReduce-Jobs werden eingerichtet, indem der Scan bereitgestellt wird Instanz mit Schlüsselbereichsattributen.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
TableInputFormat.class
);
…

Das Einrichten eines solchen Jobs wird jedoch für gesalzene vorab geteilte Tabellen schwierig. Start- und Stoppreihenschlüssel sind für jede Region unterschiedlich, da jede ein einzigartiges Salz hat. Und wir können nicht mehrere Bereiche für einen Scan angeben Beispiel.

Um dieses Problem zu lösen, müssen wir uns ansehen, wie die Tabelle MapReduce funktioniert. Im Allgemeinen erstellt das MapReduce-Framework eine Zuordnungsaufgabe, um jede Eingabeaufteilung zu lesen und zu verarbeiten. Jeder Split wird im InputFormat generiert Klassenbasis, durch die Methode getSplits() .

Im HBase-Tabellen-MapReduce-Job TableInputFormat wird als InputFormat verwendet . Innerhalb der Implementierung der getSplits() -Methode wird überschrieben, um die Start- und Stoppzeilentasten aus dem Scan abzurufen Beispiel. Da sich die Start- und Stoppzeilenschlüssel über mehrere Regionen erstrecken, wird der Bereich durch Regionsgrenzen unterteilt und die Liste von TableSplit zurückgegeben Objekte, die den Scan-Key-Bereich abdeckt. Anstatt auf dem HDFS-Block zu basieren, TableSplit s basieren auf der Region. Durch Überschreiben von getSplits() -Methode können wir den TableSplit steuern .

Benutzerdefiniertes TableInputFormat erstellen

Um das Verhalten von getSplits() zu ändern Methode, eine benutzerdefinierte Klasse, die TableInputFormat erweitert erforderlich. Der Zweck von getSplits() Um den logischen Schlüsselbereich in jeder Region abzudecken, konstruieren Sie ihren Zeilenschlüsselbereich mit ihrem einzigartigen Salz. Die Klasse HTable stellt die Methode getStartEndKeys() bereit die Start- und Endzeilenschlüssel für jede Region zurückgibt. Analysieren Sie von jedem Startschlüssel aus das entsprechende Salt für die Region.

Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {

// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
…
}

Auftragskonfiguration durchläuft logischen Schlüsselbereich

TableInputFormat ruft den Start- und Stoppschlüssel von Scan ab Beispiel. Da wir Scan nicht verwenden können In unserem MapReduce-Job könnten wir Configuration verwenden Stattdessen müssen diese beiden Variablen übergeben werden, und nur eine logische Start- und Stopptaste ist gut genug (eine Variable könnte ein Datum oder eine andere Geschäftsinformation sein). Das getSplits() Methode hat JobContext Argument, Die Konfigurationsinstanz kann als context.getConfiguration() gelesen werden .

Im MapReduce-Treiber:

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Im Custom TableInputFormat :

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");
…
}

Rekonstruieren Sie den Salted-Key-Bereich nach Region

Jetzt, da wir den Salt und den logischen Start-/Stoppschlüssel für jede Region haben, können wir den tatsächlichen Zeilenschlüsselbereich neu erstellen.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Einen TableSplit für jede Region erstellen

Mit dem Zeilenschlüsselbereich können wir jetzt TableSplit initialisieren Beispiel für die Region.

List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
…
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}

Eine weitere zu betrachtende Sache ist die Datenlokalität. Das Framework verwendet Standortinformationen in jeder Eingabeaufteilung, um eine Kartenaufgabe in seinem lokalen Host zuzuweisen. Für unser TableInputFormat verwenden wir die Methode getTableRegionLocation() um den Standort der Region abzurufen, der den Zeilenschlüssel bedient.

Dieser Ort wird dann an TableSplit übergeben Konstrukteur. Dadurch wird sichergestellt, dass sich der Mapper, der die Tabellenaufteilung verarbeitet, auf demselben Regionsserver befindet. Eine Methode namens DNS.reverseDns() , erfordert die Adresse für den HBase-Nameserver. Dieses Attribut wird in der Konfiguration „hbase.nameserver.address gespeichert “.

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}

Ein vollständiger Code von getSplits sieht so aus:

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}

// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");

Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);

String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}

Verwenden Sie das benutzerdefinierte TableInoutFormat im MapReduce-Treiber

Jetzt müssen wir das TableInputFormat ersetzen -Klasse mit dem benutzerdefinierten Build, das wir für die Einrichtung des Tabellen-MapReduce-Jobs verwendet haben.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
MultiRangeTableInputFormat.class
);

Der Ansatz des benutzerdefinierten TableInputFormat bietet eine effiziente und skalierbare Scanfunktion für HBase-Tabellen, die für die Verwendung von Salt für eine ausgewogene Datenlast ausgelegt sind. Da der Scan alle nicht verwandten Zeilenschlüssel umgehen kann, unabhängig davon, wie groß die Tabelle ist, ist die Komplexität des Scans nur auf die Größe der Zieldaten beschränkt. In den meisten Anwendungsfällen kann dies eine relativ konsistente Verarbeitungszeit garantieren, wenn die Tabelle wächst.