Bei Apache HBase geht es darum, Ihnen zufälligen Echtzeit-Lese-/Schreibzugriff auf Ihre Big Data zu geben, aber wie bringen Sie diese Daten überhaupt effizient in HBase? Intuitiv wird ein neuer Benutzer versuchen, dies über die Client-APIs oder durch die Verwendung eines MapReduce-Jobs mit TableOutputFormat zu tun, aber diese Ansätze sind problematisch, wie Sie weiter unten erfahren werden. Stattdessen ist die Massenladefunktion von HBase viel einfacher zu verwenden und kann dieselbe Datenmenge schneller einfügen.
Dieser Blogbeitrag stellt die Grundkonzepte der Massenladefunktion vor, stellt zwei Anwendungsfälle vor und schlägt zwei Beispiele vor.
Überblick über das Massenladen
Wenn eines dieser Symptome bei Ihnen auftritt, ist das Massenladen wahrscheinlich die richtige Wahl für Sie:
- Sie mussten Ihre MemStores anpassen, um den größten Teil des Speichers zu verwenden.
- Sie mussten entweder größere WALs verwenden oder sie vollständig umgehen.
- Ihre Komprimierungs- und Flush-Warteschlangen gehen in die Hunderte.
- Ihr GC ist außer Kontrolle, weil Ihre Einfügungen im MB-Bereich liegen.
- Ihre Latenz überschreitet Ihre SLA, wenn Sie Daten importieren.
Die meisten dieser Symptome werden gemeinhin als „Wachstumsschmerzen“ bezeichnet. Durch das Massenladen können Sie diese vermeiden.
In der HBase-Sprache ist Massenladen der Prozess des Vorbereitens und Ladens von HFiles (das eigene Dateiformat von HBase) direkt in die RegionServer, wodurch der Schreibpfad umgangen und diese Probleme vollständig vermieden werden. Dieser Prozess ähnelt ETL und sieht folgendermaßen aus:
1. Extrahieren Sie die Daten aus einer Quelle, normalerweise Textdateien oder einer anderen Datenbank. HBase verwaltet diesen Teil des Prozesses nicht. Mit anderen Worten, Sie können HBase nicht anweisen, HFiles vorzubereiten, indem Sie sie direkt aus MySQL lesen – Sie müssen dies vielmehr mit Ihren eigenen Mitteln tun. Beispielsweise könnten Sie mysqldump auf einer Tabelle ausführen und die resultierenden Dateien in HDFS hochladen oder einfach Ihre Apache-HTTP-Protokolldateien abrufen. In jedem Fall müssen sich Ihre Daten vor dem nächsten Schritt in HDFS befinden.
2. Wandeln Sie die Daten in HFiles um. Dieser Schritt erfordert einen MapReduce-Job, und für die meisten Eingabetypen müssen Sie den Mapper selbst schreiben. Der Job muss den Zeilenschlüssel als Key und entweder einen KeyValue, einen Put oder einen Delete als Value ausgeben. Der Reducer wird von HBase gehandhabt; Sie konfigurieren es mit HFileOutputFormat.configureIncrementalLoad() und es macht Folgendes:
- Inspiziert die Tabelle, um einen Total Order Partitioner zu konfigurieren
- Lädt die Partitionsdatei in den Cluster hoch und fügt sie dem DistributedCache hinzu
- Setzt die Anzahl der Reduzierungsaufgaben so, dass sie der aktuellen Anzahl von Regionen entspricht
- Legt die Ausgabe-Schlüssel/Wert-Klasse so fest, dass sie den Anforderungen von HFileOutputFormat entspricht
- Stellt den Reducer so ein, dass er die entsprechende Sortierung durchführt (entweder KeyValueSortReducer oder PutSortReducer)
Zu diesem Zeitpunkt wird im Ausgabeordner eine HFile pro Region erstellt. Denken Sie daran, dass die Eingabedaten fast vollständig neu geschrieben werden, sodass Sie mindestens doppelt so viel Speicherplatz benötigen wie die Größe des ursprünglichen Datensatzes. Für einen mysqldump mit 100 GB sollten Sie beispielsweise mindestens 200 GB verfügbaren Festplattenspeicher in HDFS haben. Sie können die Dump-Datei am Ende des Vorgangs löschen.
3. Laden Sie die Dateien in HBase, indem Sie den RegionServern mitteilen, wo sie zu finden sind. Dies ist der einfachste Schritt. Es erfordert die Verwendung von LoadIncrementalHFiles (besser bekannt als CompleteBulkload-Tool) und durch Übergeben einer URL, die die Dateien in HDFS lokalisiert, lädt es jede Datei über den RegionServer, der sie bedient, in die relevante Region. Falls eine Region geteilt wurde, nachdem die Dateien erstellt wurden, teilt das Tool die HFile automatisch entsprechend den neuen Grenzen. Dieser Prozess ist nicht sehr effizient. Wenn Ihre Tabelle also gerade von anderen Prozessen beschrieben wird, ist es am besten, die Dateien zu laden, sobald der Transformationsschritt abgeschlossen ist.
Hier ist eine Illustration dieses Prozesses. Der Datenfluss geht von der ursprünglichen Quelle zu HDFS, wo die RegionServer die Dateien einfach in die Verzeichnisse ihrer Regionen verschieben.
Anwendungsfälle
Ursprünglicher Dataset-Ladevorgang: Alle Benutzer, die von einem anderen Datenspeicher migrieren, sollten diesen Anwendungsfall berücksichtigen. Zuerst müssen Sie das Tabellenschema entwerfen und dann die Tabelle selbst erstellen, vorab aufgeteilt. Die Teilungspunkte müssen die Zeilenschlüsselverteilung und die Anzahl der RegionServer berücksichtigen. Ich empfehle, die Präsentation meines Kollegen Lars George über fortgeschrittenes Schemadesign für jeden ernsthaften Anwendungsfall zu lesen.
Der Vorteil hier ist, dass es viel schneller ist, die Dateien direkt zu schreiben, als den Schreibpfad des RegionServers zu durchlaufen (sowohl in den MemStore als auch in die WAL zu schreiben) und dann schließlich zu leeren, zu komprimieren und so weiter. Das bedeutet auch, dass Sie Ihren Cluster nicht für eine Workload mit vielen Schreibvorgängen optimieren und ihn dann erneut für Ihre normale Workload optimieren müssen.
Inkrementelles Laden: Angenommen, Sie haben einen Datensatz, der derzeit von HBase bereitgestellt wird, aber jetzt müssen Sie mehr Daten im Batch von einem Drittanbieter importieren, oder Sie haben einen nächtlichen Job, der einige Gigabyte generiert, die Sie einfügen müssen. Es ist wahrscheinlich nicht so groß wie der Datensatz, den HBase bereits bereitstellt, aber es kann sich auf das 95. Perzentil Ihrer Latenz auswirken. Das Durchlaufen des normalen Schreibpfads hat den nachteiligen Effekt, dass während des Imports mehr Leerungen und Komprimierungen als normal ausgelöst werden. Diese zusätzliche E/A-Belastung konkurriert mit Ihren latenzempfindlichen Abfragen.
Beispiele
Sie können die folgenden Beispiele in Ihrem eigenen Hadoop-Cluster verwenden, aber die Anweisungen werden für die Cloudera QuickStart VM bereitgestellt, die ein Single-Node-Cluster, ein Gastbetriebssystem und Beispieldaten und Beispiele ist, die in eine VM-Appliance für Ihren Desktop eingebrannt sind.
Nachdem Sie die VM gestartet haben, weisen Sie sie über die automatisch geöffnete Webschnittstelle an, CDH bereitzustellen, und stellen Sie dann sicher, dass der HBase-Dienst ebenfalls gestartet wird.
Eingebauter TSV Bulk Loader
HBase wird mit einem MR-Job geliefert, der eine Datei mit durch Trennzeichen getrennten Werten lesen und direkt in eine HBase-Tabelle ausgeben oder HFiles zum Massenladen erstellen kann. Hier gehen wir zu:
- Holen Sie sich die Beispieldaten und laden Sie sie in HDFS hoch.
- Führen Sie den ImportTsv-Job aus, um die Datei gemäß einer vorkonfigurierten Tabelle in mehrere HFiles umzuwandeln.
- Vorbereiten und Laden der Dateien in HBase.
Der erste Schritt besteht darin, eine Konsole zu öffnen und den folgenden Befehl zu verwenden, um Beispieldaten abzurufen:
curl -O https://people.apache.org/~jdcryans/word_count.csv
Ich habe diese Datei erstellt, indem ich eine Wortzählung auf dem Originalmanuskript genau dieses Blogbeitrags durchgeführt und das Ergebnis dann im CSV-Format ohne Spaltentitel ausgegeben habe. Laden Sie die Datei jetzt in HDFS hoch:
hdfs dfs -put word_count.csv
Nachdem der Extraktionsteil des Massenladens nun abgeschlossen ist, müssen Sie die Datei transformieren. Zuerst müssen Sie die Tabelle entwerfen. Um die Dinge einfach zu halten, nennen Sie es „Wortanzahl“ – die Zeilenschlüssel sind die Wörter selbst und die einzige Spalte enthält die Anzahl, in einer Familie, die wir „f“ nennen. Die beste Vorgehensweise beim Erstellen einer Tabelle besteht darin, sie gemäß der Zeilenschlüsselverteilung aufzuteilen, aber für dieses Beispiel erstellen wir nur fünf Regionen mit gleichmäßig über den Schlüsselraum verteilten Teilungspunkten. Öffnen Sie die hbase-Shell:
hbase shell
Und führen Sie den folgenden Befehl aus, um die Tabelle zu erstellen:
create 'wordcount', {NAME => 'f'}, {SPLITS => ['g', 'm', 'r', 'w']}
Die vier Teilungspunkte erzeugen fünf Regionen, wobei die erste Region mit einem leeren Zeilenschlüssel beginnt. Um bessere Teilungspunkte zu erhalten, könnten Sie auch eine schnelle Analyse durchführen, um zu sehen, wie die Wörter wirklich verteilt sind, aber das überlasse ich Ihnen.
Wenn Sie den Browser Ihrer VM auf http://localhost:60010/ richten, sehen Sie unsere neu erstellte Tabelle und ihre fünf Regionen, die alle dem RegionServer zugewiesen sind.
Jetzt ist es an der Zeit, die schwere Arbeit zu erledigen. Durch Aufrufen der HBase-JAR-Datei in der Befehlszeile mit dem „hadoop“-Skript wird eine Liste der verfügbaren Tools angezeigt. Die gewünschte heißt importtsv und hat die folgende Verwendung:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv ERROR: Wrong number of arguments: 0 Usage: importtsv -Dimporttsv.columns=a,b,c
Die Befehlszeile, die wir verwenden werden, ist die folgende:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0- security.jar importtsv -Dimporttsv.separator=, -Dimporttsv.bulk.output=output -Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
Hier ist ein Überblick über die verschiedenen Konfigurationselemente:
- -Dimporttsv.separator=, gibt an, dass das Trennzeichen ein Komma ist.
- -Dimporttsv.bulk.output=Ausgabe ist ein relativer Pfad, wohin die HFiles geschrieben werden. Da Ihr Benutzer auf der VM standardmäßig „cloudera“ ist, bedeutet dies, dass sich die Dateien in /user/cloudera/output befinden. Wenn Sie diese Option überspringen, schreibt der Job direkt in HBase.
- -Dimporttsv.columns=HBASE_ROW_KEY,f:count ist eine Liste aller in dieser Datei enthaltenen Spalten. Der Zeilenschlüssel muss mit der Zeichenfolge HBASE_ROW_KEY in Großbuchstaben identifiziert werden; Andernfalls wird der Job nicht gestartet. (Ich habe mich für den Qualifizierer „count“ entschieden, aber es könnte alles andere sein.)
Angesichts der geringen Eingabegröße sollte der Auftrag innerhalb einer Minute abgeschlossen sein. Beachten Sie, dass fünf Reducer ausgeführt werden, einer pro Region. Hier ist das Ergebnis auf HDFS:
-rw-r--r-- 3 cloudera cloudera 4265 2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b -rw-r--r-- 3 cloudera cloudera 3163 2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36 -rw-r--r-- 3 cloudera cloudera 2487 2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2 -rw-r--r-- 3 cloudera cloudera 2961 2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914 -rw-r--r-- 3 cloudera cloudera 1336 2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f
Wie Sie sehen können, gehören die Dateien derzeit dem Benutzer „cloudera“. Um sie zu laden, müssen wir den Besitzer auf „hbase“ ändern, sonst hat HBase nicht die Berechtigung, die Dateien zu verschieben. Führen Sie den folgenden Befehl aus:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output
Für den letzten Schritt müssen wir das Completebulkload-Tool verwenden, um darauf hinzuweisen, wo sich die Dateien befinden und in welche Tabellen wir laden:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
Zurück in der HBase-Shell können Sie den count-Befehl ausführen, der Ihnen anzeigt, wie viele Zeilen geladen wurden. Wenn Sie vergessen haben zu chown, bleibt der Befehl hängen.
Benutzerdefinierter MR-Job
Der TSV-Bulkloader eignet sich gut für das Prototyping, aber da er alles als Zeichenfolgen interpretiert und die Manipulation der Felder zur Transformationszeit nicht unterstützt, müssen Sie am Ende Ihren eigenen MR-Job schreiben. Mein Kollege James Kinley, der als Lösungsarchitekt in Europa arbeitet, hat einen solchen Job geschrieben, den wir für unser nächstes Beispiel verwenden werden. Die Daten für den Job enthalten öffentliche Facebook- und Twitter-Nachrichten im Zusammenhang mit dem NBA-Finale 2010 (Spiel 1) zwischen den Lakers und den Celtics. Den Code finden Sie hier. (Auf der Schnellstart-VM sind Git und Maven installiert, sodass Sie das Repository darauf klonen können.)
Betrachtet man die Driver-Klasse, so sind die wichtigsten Bits die folgenden:
job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); … // Auto configure partitioner and reducer HFileOutputFormat.configureIncrementalLoad(job, hTable);
Zuerst muss Ihr Mapper ein ImmutableBytesWritable ausgeben, das den Zeilenschlüssel enthält, und der Ausgabewert kann entweder ein KeyValue, ein Put oder ein Delete sein. Das zweite Snippet zeigt, wie man den Reducer konfiguriert; es wird tatsächlich vollständig von HFileOutputFormat gehandhabt. configureIncrementalLoad() wie zuvor im Abschnitt „Transformieren“ beschrieben.
Die HBaseKVMapper-Klasse enthält nur den Mapper, der die konfigurierten Ausgabeschlüssel und -werte berücksichtigt:
public class HBaseKVMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
Um es auszuführen, müssen Sie das Projekt mit Maven kompilieren und die Datendateien abrufen, indem Sie den Links in der README folgen. (Es enthält auch das Shell-Skript zum Erstellen der Tabelle.) Bevor Sie den Job starten, vergessen Sie nicht, die Dateien in HDFS hochzuladen und Ihren Klassenpfad so einzustellen, dass er HBase kennt, da Sie dieses Mal nicht dessen JAR-Datei verwenden werden :
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*
Sie können den Job mit einer ähnlichen Befehlszeile wie dieser starten:
hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar com.cloudera.examples.hbase.bulkimport.Driver -libjars /home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar, /home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010
Wie Sie sehen können, müssen die Abhängigkeiten des Jobs separat hinzugefügt werden. Schließlich können Sie die Dateien laden, indem Sie zuerst ihren Eigentümer ändern und dann das CompleteBulkload-Tool ausführen:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010
Mögliche Probleme
Kürzlich gelöschte Daten tauchen wieder auf. Dieses Problem tritt auf, wenn ein Delete über einen Massenladevorgang eingefügt und stark komprimiert wird, während sich der entsprechende Put noch in einem MemStore befindet. Die Daten gelten als gelöscht, wenn sich Delete in einer HFile befindet, aber sobald es während der Komprimierung entfernt wird, wird Put wieder sichtbar. Wenn Sie einen solchen Anwendungsfall haben, ziehen Sie in Betracht, Ihre Spaltenfamilien so zu konfigurieren, dass die gelöschten Zellen mit KEEP_DELETED_CELLS in der Shell oder HColumnDescriptor.setKeepDeletedCells() beibehalten werden.
Massengeladene Daten können nicht von einem anderen Massenladen überschrieben werden. Dieses Problem tritt auf, wenn zwei massengeladene HFiles, die zu unterschiedlichen Zeiten geladen wurden, versuchen, einen anderen Wert in dieselbe Zelle zu schreiben, was bedeutet, dass sie denselben Zeilenschlüssel, dieselbe Familie, dasselbe Qualifikationsmerkmal und denselben Zeitstempel haben. Das Ergebnis ist, dass der erste eingefügte Wert anstelle des zweiten zurückgegeben wird. Dieser Fehler wird in HBase 0.96.0 und CDH 5 (der nächsten CDH-Hauptversion) behoben und es wird an HBASE-8521 für den 0.94-Zweig und CDH 4 gearbeitet.
Massenladen löst große Komprimierungen aus. Dieses Problem tritt auf, wenn Sie inkrementelle Massenladevorgänge durchführen und genügend massengeladene Dateien vorhanden sind, um eine geringfügige Komprimierung auszulösen (der Standardschwellenwert ist 3). Die HFiles werden mit einer auf 0 gesetzten Sequenznummer geladen, sodass sie zuerst abgeholt werden, wenn der RegionServer Dateien für eine Komprimierung auswählt, und aufgrund eines Fehlers auch alle verbleibenden Dateien auswählt. Dieses Problem wird diejenigen ernsthaft treffen, die bereits über große Regionen (mehrere GB) verfügen oder häufig Massenladungen durchführen (alle paar Stunden und weniger), da viele Daten komprimiert werden. HBase 0.96.0 hat die richtige Lösung, ebenso wie CDH 5; HBASE-8521 behebt das Problem in 0.94, da den massengeladenen HFiles jetzt eine richtige Sequenznummer zugewiesen wird. HBASE-8283 kann mit hbase.hstore.useExploringCompation nach 0.94.9 und CDH 4.4.0 aktiviert werden, um dieses Problem zu mindern, indem es einfach ein intelligenterer Komprimierungsauswahlalgorithmus ist.
Massengeladene Daten werden nicht repliziert . Da das Massenladen den Schreibpfad umgeht, wird die WAL nicht als Teil des Prozesses beschrieben. Die Replikation funktioniert durch Lesen der WAL-Dateien, sodass die massengeladenen Daten nicht angezeigt werden – und dasselbe gilt für die Bearbeitungen, die Put.setWriteToWAL(true) verwenden. Eine Möglichkeit, damit umzugehen, besteht darin, die Rohdateien oder die HFiles an den anderen Cluster zu senden und die andere Verarbeitung dort durchzuführen.
Schlussfolgerung
Das Ziel dieses Blogbeitrags war es, Ihnen die grundlegenden Konzepte des Massenladens von Apache HBase vorzustellen. Wir haben erklärt, wie der Prozess ETL ähnelt und dass er für große Datensätze viel besser ist als die Verwendung der normalen API, da der Schreibpfad umgangen wird. Die beiden Beispiele wurden eingefügt, um zu zeigen, wie einfach TSV-Dateien in HBase geladen werden können und wie Sie Ihren eigenen Mapper für andere Datenformate schreiben können.
Jetzt können Sie dasselbe mit einer grafischen Benutzeroberfläche über Hue versuchen.