HBase
 sql >> Datenbank >  >> NoSQL >> HBase

Spark-on-HBase:DataFrame-basierter HBase-Konnektor

Dieser Blogbeitrag wurde vor der Fusion mit Cloudera auf Hortonworks.com veröffentlicht. Einige Links, Ressourcen oder Referenzen sind möglicherweise nicht mehr korrekt.

Wir sind stolz darauf, die technische Vorschau des Spark-HBase Connectors bekannt zu geben, der von Hortonworks in Zusammenarbeit mit Bloomberg entwickelt wurde.

Der Spark-HBase-Konnektor nutzt die in Spark-1.2.0 eingeführte Datenquellen-API (SPARK-3247). Es überbrückt die Lücke zwischen dem einfachen HBase-Schlüsselwertspeicher und komplexen relationalen SQL-Abfragen und ermöglicht es Benutzern, mithilfe von Spark komplexe Datenanalysen auf HBase durchzuführen. Ein HBase-DataFrame ist ein Standard-Spark-DataFrame und kann mit beliebigen anderen Datenquellen wie Hive, ORC, Parquet, JSON usw. interagieren.

Hintergrund

Es gibt mehrere Open-Source-Spark-HBase-Konnektoren, die entweder als Spark-Pakete, als unabhängige Projekte oder im HBase-Trunk verfügbar sind.

Spark ist zu den Dataset/DataFrame-APIs übergegangen, die eine integrierte Abfrageplanoptimierung bieten. Jetzt ziehen es Endbenutzer vor, eine auf DataFrames/Datasets basierende Schnittstelle zu verwenden.

Der HBase-Konnektor im HBase-Trunk hat eine reichhaltige Unterstützung auf RDD-Ebene, z. BulkPut usw., aber die DataFrame-Unterstützung ist nicht so umfangreich. Der HBase-Trunk-Konnektor basiert auf dem standardmäßigen HadoopRDD, wobei das in HBase integrierte TableInputFormat einige Leistungseinschränkungen aufweist. Darüber hinaus kann BulkGet, das im Treiber ausgeführt wird, ein Single Point of Failure sein.

Es gibt einige andere alternative Implementierungen. Nehmen Sie Spark-SQL-on-HBase als Beispiel. Es wendet sehr fortschrittliche benutzerdefinierte Optimierungstechniken an, indem es seinen eigenen Abfrageoptimierungsplan in die standardmäßige Spark Catalyst-Engine einbettet, das RDD an HBase sendet und komplizierte Aufgaben, wie z. B. eine teilweise Aggregation, innerhalb des HBase-Coprozessors durchführt. Dieser Ansatz kann eine hohe Leistung erzielen, ist jedoch aufgrund seiner Komplexität und der schnellen Entwicklung von Spark nur schwer zu warten. Auch das Ausführen von beliebigem Code innerhalb eines Koprozessors kann Sicherheitsrisiken mit sich bringen.

Der Spark-on-HBase Connector (SHC) wurde entwickelt, um diese potenziellen Engpässe und Schwächen zu überwinden. Es implementiert die Standard-Spark-Datenquellen-API und nutzt die Spark Catalyst-Engine zur Abfrageoptimierung. Parallel dazu wird das RDD von Grund auf neu erstellt, anstatt TableInputFormat zu verwenden um Höchstleistungen zu erzielen. Mit diesem angepassten RDD können alle kritischen Techniken angewendet und vollständig implementiert werden, wie z. B. Partitionsbereinigung, Spaltenbereinigung, Prädikat-Pushdown und Datenlokalität. Das Design macht die Wartung sehr einfach und erreicht gleichzeitig einen guten Kompromiss zwischen Leistung und Einfachheit.

Architektur

Wir gehen davon aus, dass Spark und HBase im selben Cluster bereitgestellt werden und Spark-Executors sich zusammen mit Regionsservern befinden, wie in der folgenden Abbildung dargestellt.

Abbildung 1. Spark-on-HBase-Connector-Architektur

Auf hoher Ebene behandelt der Konnektor sowohl Scan als auch Get auf ähnliche Weise, und beide Aktionen werden in den Executoren ausgeführt. Der Treiber verarbeitet die Abfrage, aggregiert Scans/Abrufe basierend auf den Metadaten der Region und generiert Aufgaben pro Region. Die Aufgaben werden an die bevorzugten Ausführenden gesendet, die sich am gleichen Standort wie der Regionsserver befinden, und werden in den Ausführenden parallel ausgeführt, um eine bessere Datenlokalität und Parallelität zu erreichen. Wenn eine Region die erforderlichen Daten nicht enthält, wird diesem Regionsserver keine Aufgabe zugewiesen. Eine Aufgabe kann aus mehreren Scans und BulkGets bestehen, und die Datenanforderungen einer Aufgabe werden nur von einem Regionsserver abgerufen, und dieser Regionsserver ist auch die Standortpräferenz für die Aufgabe. Beachten Sie, dass der Fahrer nicht an der eigentlichen Jobausführung beteiligt ist, mit Ausnahme der Planung von Aufgaben. Dadurch wird vermieden, dass der Treiber zum Flaschenhals wird.

Tabellenkatalog

Um die HBase-Tabelle als relationale Tabelle in Spark zu integrieren, definieren wir eine Zuordnung zwischen HBase- und Spark-Tabellen, die als Tabellenkatalog bezeichnet wird. Dieser Katalog besteht aus zwei kritischen Teilen. Die eine ist die Rowkey-Definition und die andere die Zuordnung zwischen der Tabellenspalte in Spark und der Spaltenfamilie und dem Spaltenqualifizierer in HBase. Weitere Informationen finden Sie im Abschnitt Verwendung.

Native Avro-Unterstützung

Der Konnektor unterstützt das Avro-Format nativ, da es gängige Praxis ist, strukturierte Daten als Byte-Array in HBase zu speichern. Der Benutzer kann den Avro-Datensatz direkt in HBase speichern. Intern wird das Avro-Schema automatisch in einen nativen Spark Catalyst-Datentyp konvertiert. Beachten Sie, dass beide Schlüsselwertteile in einer HBase-Tabelle im Avro-Format definiert werden können. Die genaue Verwendung entnehmen Sie bitte den Beispielen/Testfällen im Repo.

Prädikat-Pushdown

Der Konnektor ruft nur die erforderlichen Spalten vom Regionsserver ab, um den Netzwerkaufwand zu reduzieren und eine redundante Verarbeitung in der Spark Catalyst-Engine zu vermeiden. Vorhandene Standard-HBase-Filter werden verwendet, um Prädikat-Pushdown durchzuführen, ohne die Coprozessorfähigkeit zu nutzen. Da HBase den Datentyp mit Ausnahme von Byte-Array und die Inkonsistenz der Reihenfolge zwischen Java-Primitive-Typen und Byte-Array nicht kennt, müssen wir die Filterbedingung vorverarbeiten, bevor wir den Filter im Scan-Vorgang festlegen, um Datenverluste zu vermeiden. Innerhalb des Regionsservers werden Datensätze herausgefiltert, die nicht der Abfragebedingung entsprechen.

Partitionsbereinigung

Durch Extrahieren des Zeilenschlüssels aus den Prädikaten teilen wir Scan/BulkGet in mehrere nicht überlappende Bereiche auf, nur die Regionsserver, die über die angeforderten Daten verfügen, führen Scan/BulkGet aus. Derzeit wird die Partitionsbereinigung für die erste Dimension der Zeilenschlüssel durchgeführt. Wenn beispielsweise ein Zeilenschlüssel „Schlüssel1:Schlüssel2:Schlüssel3“ lautet, basiert die Partitionsbereinigung nur auf „Schlüssel1“. Beachten Sie, dass die WHERE-Bedingungen sorgfältig definiert werden müssen. Andernfalls wird die Partitionsbereinigung möglicherweise nicht wirksam. Zum Beispiel führt WHERE rowkey1> „abc“ OR column =„xyz“ (wobei rowkey1 die erste Dimension des rowkey und column eine reguläre hbase-Spalte ist) zu einem vollständigen Scan, da wir alle Bereiche abdecken müssen, weil des ODER Logik.

Datenlokalität

Wenn sich ein Spark-Executor zusammen mit HBase-Regionsservern befindet, wird die Datenlokalität erreicht, indem der Standort des Regionsservers identifiziert wird, und es wird nach besten Kräften versucht, die Aufgabe zusammen mit dem Regionsserver zu lokalisieren. Jeder Ausführende führt Scan/BulkGet auf dem Teil der Daten durch, die sich auf demselben Host befinden.

Scannen und BulkGet

Diese beiden Operatoren werden Benutzern durch die Angabe von WHERE CLAUSE angezeigt, z. B. WHERE Spalte> x und Spalte für scan und WHERE Spalte =x vergessen. Die Operationen werden in den Executoren durchgeführt, und der Treiber baut nur diese Operationen auf. Intern werden sie zum Scannen und/oder Abrufen konvertiert, und Iterator[Row] wird zur Verarbeitung der oberen Schicht an die Catalyst-Engine zurückgegeben.

Verwendung

Im Folgenden wird das grundlegende Verfahren zur Verwendung des Connectors veranschaulicht. Weitere Details und fortgeschrittene Anwendungsfälle, wie Avro und Unterstützung für zusammengesetzte Schlüssel, finden Sie in den Beispielen im Repository.

1) Definieren Sie den Katalog für das Schema-Mapping:

[code language="scala"]def catalog =s"""{        |"table":{"namespace":"default", "name":"table1"},        |"rowkey":"key" ,        |"columns":{          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},          |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"},          |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"},          |"col4":{"cf":"cf4", "col":" col4", "type":"int"},          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},          |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"},          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}        |}      |}""".stripMargin[/code] 

2) Bereiten Sie die Daten vor und füllen Sie die HBase-Tabelle:
Fallklasse HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

object HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s"String$i:$t",      i.toByte) }}

val data =(0 to 255).map { i =>  HBaseRecord(i, „extra“)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> „5“))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Laden Sie den DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( „org.apache.spark.sql.execution.datasources.hbase“)
 .load()
}

val df =withCatalog(Katalog)

4) Sprachintegrierte Abfrage:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
 $”col0″ ===“row005” ||
 $“col0″ ===„row020“ ||
 $“col0″ === “r20“ ||
 $“col0″ <=„row005“) &&
 ($“col4″ ===1 ||
 $“col4″ ===42))
 .select(“col0“, „col1“, „col4“)
s .show

5) SQL-Abfrage:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show

Spark-Paket konfigurieren

Benutzer können den Spark-on-HBase-Konnektor als standardmäßiges Spark-Paket verwenden. Um das Paket in Ihre Spark-Anwendung aufzunehmen, verwenden Sie:

spark-shell, pyspark oder spark-submit

> $SPARK_HOME/bin/spark-shell –Pakete zhzhan:shc:0.0.11-1.6.1-s_2.10

Benutzer können das Paket auch als Abhängigkeit in Ihre SBT-Datei aufnehmen. Das Format ist Spark-Paketname:Version

spDependencies +=„zhzhan/shc:0.0.11-1.6.1-s_2.10“

Ausführung im sicheren Cluster

Für die Ausführung in einem Kerberos-fähigen Cluster muss der Benutzer HBase-bezogene JAR-Dateien in den Klassenpfad aufnehmen, da das Abrufen und Erneuern des HBase-Tokens von Spark durchgeführt wird und unabhängig vom Konnektor ist. Mit anderen Worten, der Benutzer muss die Umgebung auf normale Weise initialisieren, entweder durch kinit oder durch Bereitstellen von Prinzipal/Keytab. Die folgenden Beispiele zeigen, wie in einem sicheren Cluster sowohl im Garn-Client- als auch im Garn-Cluster-Modus ausgeführt wird. Beachten Sie, dass SPARK_CLASSPATH für beide Modi festgelegt werden muss und das Beispiel-Jar nur ein Platzhalter für Spark ist.

export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Angenommen, hrt_qa ist ein Headless-Konto, der Benutzer kann den folgenden Befehl für kinit verwenden:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master-garn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master-garn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Alles zusammenfügen

Wir haben gerade einen kurzen Überblick darüber gegeben, wie HBase Spark auf DataFrame-Ebene unterstützt. Mit der DataFrame-API können Spark-Anwendungen mit in HBase-Tabellen gespeicherten Daten genauso einfach arbeiten wie mit Daten, die in anderen Datenquellen gespeichert sind. Mit dieser neuen Funktion können Daten in HBase-Tabellen problemlos von Spark-Anwendungen und anderen interaktiven Tools, z. Benutzer können eine komplexe SQL-Abfrage auf einer HBase-Tabelle in Spark ausführen, eine Tabellenverknüpfung mit Dataframe durchführen oder mit Spark Streaming integrieren, um ein komplizierteres System zu implementieren.

Was kommt als Nächstes?

Derzeit wird der Connector im Hortonworks-Repository gehostet und als Spark-Paket veröffentlicht. Es wird gerade zum Apache HBase-Trunk migriert. Während der Migration haben wir einige kritische Fehler im HBase-Trunk identifiziert, die zusammen mit der Zusammenführung behoben werden. Die Community-Arbeit wird von der Dach-HBase JIRA verfolgt:HBASE-14789, einschließlich HBASE-14795 und HBASE-14796, um die zugrunde liegende Computerarchitektur für Scan und BulkGet zu optimieren, HBASE-14801, um eine JSON-Benutzeroberfläche für eine einfache Verwendung bereitzustellen, HBASE-15336 für der DataFrame-Schreibpfad, HBASE-15334 für Avro-Unterstützung, HBASE-15333 zur Unterstützung von Java-Grundtypen wie short, int, long, float und double usw., HBASE-15335 zur Unterstützung von zusammengesetzten Zeilenschlüsseln und HBASE-15572 um eine optionale Zeitstempelsemantik hinzuzufügen. Wir freuen uns darauf, eine zukünftige Version des Connectors zu produzieren, die die Arbeit mit dem Connector noch einfacher macht.

Bestätigung

Wir möchten Hamel Kothari, Sudarshan Kadambi und dem Team von Bloomberg dafür danken, dass sie uns bei dieser Arbeit angeleitet und uns auch dabei geholfen haben, diese Arbeit zu validieren. Wir möchten uns auch bei der HBase-Community für ihr Feedback und die Verbesserung bedanken. Schließlich hat diese Arbeit die Lehren aus früheren Spark-HBase-Integrationen genutzt, und wir möchten ihren Entwicklern dafür danken, dass sie den Weg geebnet haben.

Referenz:

SHC:https://github.com/hortonworks/shc-release

Spark-Paket:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/