HBase
 sql >> Datenbank >  >> NoSQL >> HBase

Erstellen einer Anwendung für maschinelles Lernen mit Cloudera Data Science Workbench und Betriebsdatenbank, Teil 1:Einrichtung und Grundlagen

Einführung

Python wird häufig von Dateningenieuren und Datenwissenschaftlern verwendet, um alle möglichen Probleme zu lösen, von ETL/ELT-Pipelines bis hin zum Erstellen von Modellen für maschinelles Lernen. Apache HBase ist ein effektives Datenspeichersystem für viele Arbeitsabläufe, aber der Zugriff auf diese Daten speziell über Python kann schwierig sein. Für Datenprofis, die in HBase gespeicherte Daten nutzen möchten, kann das aktuelle Upstream-Projekt „hbase-connectors“ mit PySpark für grundlegende Vorgänge verwendet werden.

In dieser Blogserie erklären wir, wie PySpark und HBase zusammen für die grundlegende Nutzung von Spark sowie für Jobs konfiguriert werden, die in CDSW verwaltet werden. Für diejenigen, die mit CDSW nicht vertraut sind:Es ist eine sichere Self-Service-Data-Science-Plattform für Unternehmen, mit der Data Scientists ihre eigenen Analyse-Pipelines verwalten und so Machine-Learning-Projekte von der Exploration bis zur Produktion beschleunigen können. Weitere Informationen zu CDSW finden Sie auf der Produktseite von Cloudera Data Science Workbench.

In diesem Beitrag werden mehrere Operationen erklärt und zusammen mit Beispielausgaben demonstriert. Für den Kontext werden alle Beispieloperationen in diesem speziellen Blogbeitrag mit einer CDSW-Bereitstellung ausgeführt.

Voraussetzungen:

  1. Haben Sie einen CDP-Cluster mit HBase und Spark
  2. Wenn Sie Beispielen über CDSW folgen möchten, muss es installiert sein – Cloudera Data Science Workbench installieren
  3. Python 3 wird auf jedem Knoten unter demselben Pfad installiert

Konfiguration:

Zunächst müssen HBase und Spark zusammen konfiguriert werden, damit Spark-SQL-Abfragen ordnungsgemäß funktionieren. Dazu sind zwei Teile erforderlich:Konfigurieren Sie zunächst die HBase-Regionsserver über Cloudera Manager; Stellen Sie zweitens sicher, dass die Spark-Laufzeit über HBase-Bindungen verfügt. Beachten Sie jedoch, dass Cloudera Manager bereits einige Konfigurations- und Umgebungsvariablen einrichtet, um Spark automatisch für Sie auf HBase zu verweisen. Nichtsdestotrotz ist der erste Schritt der Konfiguration von Spark-SQL-Abfragen bei allen Bereitstellungstypen auf CDP-Clustern gleich, der zweite unterscheidet sich jedoch je nach Bereitstellungstyp geringfügig.

Konfigurieren von HBase-Regionsservern

  1. Gehen Sie zu Cloudera Manager und wählen Sie den HBase-Dienst aus.
  2. Suchen Sie nach „Regionserver-Umgebung“

  1. Fügen Sie eine neue Umgebungsvariable hinzu, indem Sie das RegionServer Environment Advanced Configuration Snippet verwenden (Sicherheitsventil):
    • Schlüssel:HBASE_CLASSPATH
    • Wert:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
      Stellen Sie sicher, dass Sie die richtigen Versionsnummern verwenden.
  2. Regionsserver neu starten.

Nachdem Sie die obigen Schritte ausgeführt haben, führen Sie die folgenden Schritte aus, je nachdem, ob Sie eine CDSW- oder eine Nicht-CDSW-Bereitstellung wünschen.

Hinzufügen von HBase-Bindungen zur Spark-Laufzeit in Nicht-CDSW-Bereitstellungen

Um die Shell bereitzustellen oder Spark-Submit korrekt zu verwenden, verwenden Sie die folgenden Befehle, um sicherzustellen, dass Spark über die richtigen HBase-Bindungen verfügt.

pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. Glas

spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- shaded.jar

Hinzufügen von HBase-Bindungen zur Spark-Laufzeit in CDSW-Bereitstellungen

Um CDSW mit HBase und PySpark zu konfigurieren, müssen Sie einige Schritte ausführen.

1) Stellen Sie sicher, dass Python 3 auf jedem Cluster-Knoten installiert ist, und notieren Sie sich den Pfad dazu

2) Erstellen Sie ein neues Projekt in CDSW und verwenden Sie eine PySpark-Vorlage

3) Öffnen Sie das Projekt, gehen Sie zu Einstellungen -> Engine -> Umgebungsvariablen.

4) Stellen Sie PYSPARK3_DRIVER_PYTHON ein und PYSPARK3_PYTHON in den Pfad, in dem Python auf Ihren Cluster-Knoten installiert ist (Pfad in Schritt 1 notiert).

Unten ist ein Beispiel, wie es aussehen sollte.

5) Gehen Sie in Ihrem Projekt zu Files -> spark-defaults.conf und öffnen Sie es in der Workbench

6) Kopieren Sie die folgende Zeile, fügen Sie sie in diese Datei ein und stellen Sie sicher, dass sie gespeichert ist, bevor Sie eine neue Sitzung starten.

spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar

An diesem Punkt ist CDSW jetzt so konfiguriert, dass PySpark-Jobs auf HBase ausgeführt werden! Der Rest dieses Blogbeitrags bezieht sich auf einige Beispielvorgänge bei einer CDSW-Bereitstellung.

Beispielvorgänge 

Put-Operationen

Es gibt zwei Möglichkeiten, Zeilen in HBase einzufügen und zu aktualisieren. Die erste und am meisten empfohlene Methode besteht darin, einen Katalog zu erstellen, bei dem es sich um ein Schema handelt, das die Spalten einer HBase-Tabelle einem PySpark-Datenrahmen zuordnet und dabei den Tabellennamen und den Namespace angibt. Das Erstellen dieses benutzerdefinierten JSON-Formats ist die bevorzugte Methode, da es auch mit anderen Vorgängen verwendet werden kann. Weitere Informationen zu Katalogen finden Sie in dieser Dokumentation http://hbase.apache.org/book.html#_define_catalog. Die zweite Methode verwendet einen spezifischen Mapping-Parameter namens „hbase.columns.mapping“, der nur eine Zeichenfolge von Schlüssel-Wert-Paaren akzeptiert.

  • Kataloge verwenden
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("SampleApplication")\
  .getOrCreate()

tableCatalog = ''.join("""{
               "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
               "rowkey":"key",
               "columns":{
                 "key":{"cf":"rowkey", "col":"key", "type":"int"},
                 "empId":{"cf":"personal","col":"empId","type":"string"},
                 "empName":{"cf":"personal", "col":"empName", "type":"string"},
                 "empState":{"cf":"personal", "col":"empWeight", "type":"string"}
               }
             }""".split())

employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3]))
employeeDF = spark.createDataFrame(employeeMap)

employeeDF.write.format("org.apache.hadoop.hbase.spark") \
  .options(catalog=tableCatalog, newTable=5) \
  .option("hbase.spark.use.hbasecontext", False) \
  .save()
# newTable refers to the NumberOfRegions which has to be > 3

Überprüfen Sie, ob eine neue Tabelle mit dem Namen „tblEmployee“ in HBase erstellt wurde, indem Sie einfach die HBase-Shell öffnen und den folgenden Befehl ausführen:

Scannen Sie „tblEmployee“, {‘LIMIT‘ => 2}

Die Verwendung von Katalogen kann Ihnen auch das einfache Laden von HBase-Tabellen ermöglichen. Dies wird in einer zukünftigen Folge besprochen.

  • Mit hbase.columns.mapping

Beim Schreiben des PySpark-Datenrahmens kann eine Option namens „hbase.columns.mapping“ hinzugefügt werden, um eine Zeichenfolge einzuschließen, die die Spalten korrekt zuordnet. Mit dieser Option können Sie nur Zeilen in vorhandene Tabellen einfügen.

Lassen Sie uns zunächst in der HBase-Shell eine Tabelle erstellen, die „tblEmployee2“, „personal“

erstellt

Lassen Sie uns nun in PySpark zwei Zeilen mit „hbase.columns.mapping“

einfügen
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("SampleApplication")\
  .getOrCreate()

employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)]
employeeRDD = spark.sparkContext.parallelize(employee)
employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3])))
employeeDF = spark.createDataFrame(employeeMap)


employeeDF.write.format("org.apache.hadoop.hbase.spark") \
       .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \
       .option("hbase.table", "tblEmployee2") \
       .option("hbase.spark.use.hbasecontext", False) \
       .save()

Überprüfen Sie erneut, ob eine neue Tabelle mit dem Namen „tblEmployee2“ diese neuen Zeilen enthält.

Scannen Sie ‘tblEmployee2’, {‘LIMIT’ => 2}

Damit sind unsere Beispiele zum Einfügen von Zeilen über PySpark in HBase-Tabellen abgeschlossen. In der nächsten Ausgabe werde ich Get and Scan Operations, PySpark SQL und einige Fehlerbehebungen besprechen. Bis dahin sollten Sie sich einen CDP-Cluster besorgen und sich durch diese Beispiele durcharbeiten.