Dieser Blogbeitrag wurde vor der Fusion mit Cloudera auf Hortonworks.com veröffentlicht. Einige Links, Ressourcen oder Referenzen sind möglicherweise nicht mehr korrekt.
2016 haben wir die zweite Version v1.0.1 von Spark HBase Connector (SHC) veröffentlicht. In diesem Blog gehen wir auf die wichtigsten Funktionen ein, die wir dieses Jahr implementiert haben.
Phoenix-Coder unterstützen
SHC kann verwendet werden, um Daten zur weiteren Downstream-Verarbeitung in den HBase-Cluster zu schreiben. Es unterstützt die Avro-Serialisierung für Ein- und Ausgabedaten und verwendet standardmäßig eine benutzerdefinierte Serialisierung mit einem einfachen nativen Codierungsmechanismus. Beim Lesen von Eingabedaten überträgt SHC Filter auf HBase, um Daten effizient zu scannen. Angesichts der Popularität von Phoenix-Daten in HBase scheint es naheliegend, neben Avro-Daten auch Phoenix-Daten als Eingabe für HBase zu unterstützen. Außerdem scheint die standardmäßige Verwendung der einfachen nativen Binärcodierung anfällig für zukünftige Änderungen zu sein und stellt ein Risiko für Benutzer dar, die Daten von SHC in HBase schreiben. Beispielsweise muss mit SHC die Abwärtskompatibilität ordnungsgemäß gehandhabt werden. Daher muss die Standardeinstellung SHC in ein standardisierteres und gut getestetes Format wie Phoenix geändert werden.
Für die Unterstützung zusammengesetzter Schlüssel musste vor dieser Funktion die Wertlänge jeder Dimension festgelegt werden – mit Ausnahme der letzten Dimension des zusammengesetzten Schlüssels. Diese Einschränkung wurde von Phoenix Coder entfernt. Wenn Benutzer derzeit Phoenix als Datencodierer auswählen, müssen sie nicht die Länge jedes Teils des zusammengesetzten Schlüssels im Katalog angeben.
Da Phoenix der Standardcodierer ist, besteht die einzige Änderung für die Benutzer darin, dass sie, wenn sie PrimitiveType als Datencodierer verwenden möchten, „tableCoder“:„PrimitiveType“ in ihren Katalogen angeben müssen, um SHC mitzuteilen, dass sie stattdessen PrimitiveType verwenden möchten von Phoenix als „tableCoder“.
def catalog =s“““{
|“table“:{“namespace“:“default“, „name“:“table1″, „tableCoder“:“PrimitiveType“},
|“rowkey ”:”key”,
|”columns”:{
|”col0″:{“cf”:”rowkey”, “col”:”key”, “type”:”string”} ,
|“col1″:{“cf“:“cf1″, „col“:“col1″, „type“:“boolean“},
|“col2″:{“cf“:„cf2″, „col“:“col2″, „type“:„double“},
|“col3″:{“cf“:“cf3″, „col“:“col3″, „type“ :“float“},
|“col4″:{“cf“:“cf4″, „col“:“col4″, „type“:“int“},
|“col5″:{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6 ″, „type“:“smallint“},
|“col7″:{“cf“:“cf7″, „col“:“col7″, „type“:“string“},
|“col8″:{“cf“:“cf8″, „col“:“col8″, „type“:“tinyint“}
|}
|}“““.stripMargin
Spark HBase-Verbindungen zwischenspeichern
SHC hat zuvor keine Verbindungsobjekte zu HBase zwischengespeichert. Insbesondere wurde der Aufruf von „ConnectionFactory.createConnection“ jedes Mal ausgeführt, wenn SHC HBase-Tabellen und -Regionen besuchen musste. Benutzer konnten dies einfach sehen, indem sie sich die Executor-Protokolle ansahen und beobachteten, wie Zookeeper-Verbindungen für jede Anfrage hergestellt wurden. In der Dokumentation der Schnittstelle Connection heißt es, dass die Verbindungserstellung ein schwergewichtiger Vorgang ist und Verbindungsimplementierungen Thread-sicher sind. Daher wäre es für langlebige Prozesse sehr nützlich, wenn SHC eine Verbindung im Cache halten würde. Mit dieser Funktion verringert SHC die Anzahl der erstellten Verbindungen drastisch und verbessert dabei erheblich die Leistung.
Duplizierte Spaltenfamilien unterstützen
SHC hat Unterstützung für duplizierte Spaltenfamilien unterstützt. Jetzt können Benutzer ihre Kataloge wie folgt definieren:
def catalog =s“““{
|“table“:{“namespace“:“default“, „name“:“table1″, „tableCoder“:“PrimitiveType“},
|“rowkey ”:”key”,
|”columns”:{
|”col0″:{“cf”:”rowkey”, “col”:”key”, “type”:”string”} ,
|“col1″:{“cf“:“cf1″, „col“:“col1″, „type“:“boolean“},
|“col2″:{“cf“:"cf1", "col":"col2", "type":"double"},
|"col3":{"cf":"cf1", "col":"col3", "type" :“float“},
|“col4″:{“cf“:“cf2″, „col“:“col4″, „type“:“int“},
|“col5″:{"cf":"cf2", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf3", "col":"col6 ″, „type“:“smallint“},
|“col7″:{“cf“:“cf3″, „col“:“col7″, „type“:“string“},
|“col8″:{“cf“:“cf3″, „col“:“col8″, „type“:“tinyint“}
|}
|}“““.stripMargin
In der obigen Katalogdefinition haben die Spalten „col0“, „col1“ und „col2“ dieselbe Spaltenfamilie „cf1“.
Spark UnhandledFilters-API verwenden
SHC hat auch die Spark-API unhandledFilters implementiert, was eine effektive Optimierung darstellt. Diese API informiert Spark über Filter, die SHC nicht implementiert, anstatt alle Filter zurückzugeben. Das vorherige Verhalten bestand in diesem Fall darin, alle Filter erneut anzuwenden, sobald Daten in Spark abgerufen wurden. Dies sollte idempotent sein, ändert also keine Daten, kann aber teuer werden, wenn die Filter kompliziert sind.
SHC-Community
Die SHC-Community ist größer und einflussreicher als noch vor einem Jahr. 2016 haben wir beim Hadoop Summit und beim HBase/Spark-Treffen Vorträge gehalten und ausführliche Blogs geschrieben. Mit der steigenden Anzahl von SHC-Benutzern erhalten wir eine höhere Anzahl von Benutzerfragen. Wir freuen uns sehr über die zunehmende Akzeptanz von SHC und wenn Sie Ideen zur weiteren Verbesserung haben, geben Sie uns bitte Feedback über Hortonworks Community Connection.
ANERKENNUNG
Wir möchten dem Team von Bloomberg dafür danken, dass es uns bei dieser Arbeit angeleitet und uns auch dabei geholfen hat, diese Arbeit zu validieren. Wir möchten uns auch bei der HBase-Community für ihr Feedback und die Verbesserung bedanken. Schließlich hat diese Arbeit die Lehren aus früheren Spark-HBase-Integrationen genutzt, und wir möchten ihren Entwicklern dafür danken, dass sie den Weg geebnet haben.
REFERENZ:
SHC: https://github.com/hortonworks-spark/shc
Apache HBase: https://hbase.apache.org/
Apache Spark: http://spark.apache.org/
Apache Phoenix: https://phoenix.apache.org/