Skala :
Wenn Sie nur eindeutige Nummern benötigen, können Sie zipWithUniqueId
verwenden und erstellen Sie DataFrame neu. Zuerst einige Importe und Dummy-Daten:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Schema zur weiteren Verwendung extrahieren:
val schema = df.schema
ID-Feld hinzufügen:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
DataFrame erstellen:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Das Gleiche in Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Wenn Sie eine fortlaufende Nummer bevorzugen, können Sie zipWithUniqueId
ersetzen mit zipWithIndex
aber es ist ein bisschen teurer.
Direkt mit DataFrame
API :
(universal Scala, Python, Java, R mit so ziemlich der gleichen Syntax)
Bisher habe ich monotonicallyIncreasingId
verpasst Funktion, die gut funktionieren sollte, solange Sie keine fortlaufenden Nummern benötigen:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Während nützlich monotonicallyIncreasingId
ist nicht deterministisch. Nicht nur IDs können sich von Ausführung zu Ausführung unterscheiden, sondern können ohne zusätzliche Tricks nicht verwendet werden, um Zeilen zu identifizieren, wenn nachfolgende Operationen Filter enthalten.
Hinweis :
Es ist auch möglich, rowNumber
zu verwenden Fensterfunktion:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Leider:
WARN Window:Keine Partition für Windows-Betrieb definiert! Wenn Sie alle Daten auf eine einzelne Partition verschieben, kann dies zu ernsthaften Leistungseinbußen führen.
Es sei denn, Sie haben eine natürliche Möglichkeit, Ihre Daten zu partitionieren und sicherzustellen, dass die Eindeutigkeit in diesem Moment nicht besonders nützlich ist.