Scala :
Hvis alt du behøver er unikke numre, kan du bruge zipWithUniqueId
og genskabe DataFrame. Først nogle importer og dummy-data:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Udpak skema til yderligere brug:
val schema = df.schema
Tilføj id-felt:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Opret DataFrame:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Det samme i Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Hvis du foretrækker fortløbende nummer, kan du erstatte zipWithUniqueId
med zipWithIndex
men det er lidt dyrere.
Direkte med DataFrame
API :
(universal Scala, Python, Java, R med stort set samme syntaks)
Tidligere har jeg savnet monotonicallyIncreasingId
funktion, som burde fungere fint, så længe du ikke kræver fortløbende tal:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Selvom det er nyttigt monotonicallyIncreasingId
er ikke-deterministisk. Ikke kun id'er kan være forskellige fra udførelse til udførelse, men uden yderligere tricks kan ikke bruges til at identificere rækker, når efterfølgende operationer indeholder filtre.
Bemærk :
Det er også muligt at bruge rowNumber
vinduesfunktion:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Desværre:
ADVARSEL Vindue:Ingen partition defineret til vinduesdrift! Hvis alle data flyttes til en enkelt partition, kan dette forårsage alvorlig forringelse af ydeevnen.
Så medmindre du har en naturlig måde at opdele dine data på og sikre, at unikhed ikke er særlig nyttig i dette øjeblik.