PostgreSQL
 sql >> Datenbank >  >> RDS >> PostgreSQL

Primärschlüssel mit Apache Spark

Skala :

Wenn Sie nur eindeutige Nummern benötigen, können Sie zipWithUniqueId verwenden und erstellen Sie DataFrame neu. Zuerst einige Importe und Dummy-Daten:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Schema zur weiteren Verwendung extrahieren:

val schema = df.schema

ID-Feld hinzufügen:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

DataFrame erstellen:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Das Gleiche in Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Wenn Sie eine fortlaufende Nummer bevorzugen, können Sie zipWithUniqueId ersetzen mit zipWithIndex aber es ist ein bisschen teurer.

Direkt mit DataFrame API :

(universal Scala, Python, Java, R mit so ziemlich der gleichen Syntax)

Bisher habe ich monotonicallyIncreasingId verpasst Funktion, die gut funktionieren sollte, solange Sie keine fortlaufenden Nummern benötigen:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Während nützlich monotonicallyIncreasingId ist nicht deterministisch. Nicht nur IDs können sich von Ausführung zu Ausführung unterscheiden, sondern können ohne zusätzliche Tricks nicht verwendet werden, um Zeilen zu identifizieren, wenn nachfolgende Operationen Filter enthalten.

Hinweis :

Es ist auch möglich, rowNumber zu verwenden Fensterfunktion:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Leider:

WARN Window:Keine Partition für Windows-Betrieb definiert! Wenn Sie alle Daten auf eine einzelne Partition verschieben, kann dies zu ernsthaften Leistungseinbußen führen.

Es sei denn, Sie haben eine natürliche Möglichkeit, Ihre Daten zu partitionieren und sicherzustellen, dass die Eindeutigkeit in diesem Moment nicht besonders nützlich ist.