sql >> Database teknologi >  >> RDS >> PostgreSQL

Primære nøgler med Apache Spark

Scala :

Hvis alt du behøver er unikke numre, kan du bruge zipWithUniqueId og genskabe DataFrame. Først nogle importer og dummy-data:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Udpak skema til yderligere brug:

val schema = df.schema

Tilføj id-felt:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Opret DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Det samme i Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Hvis du foretrækker fortløbende nummer, kan du erstatte zipWithUniqueId med zipWithIndex men det er lidt dyrere.

Direkte med DataFrame API :

(universal Scala, Python, Java, R med stort set samme syntaks)

Tidligere har jeg savnet monotonicallyIncreasingId funktion, som burde fungere fint, så længe du ikke kræver fortløbende tal:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Selvom det er nyttigt monotonicallyIncreasingId er ikke-deterministisk. Ikke kun id'er kan være forskellige fra udførelse til udførelse, men uden yderligere tricks kan ikke bruges til at identificere rækker, når efterfølgende operationer indeholder filtre.

Bemærk :

Det er også muligt at bruge rowNumber vinduesfunktion:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Desværre:

ADVARSEL Vindue:Ingen partition defineret til vinduesdrift! Hvis alle data flyttes til en enkelt partition, kan dette forårsage alvorlig forringelse af ydeevnen.

Så medmindre du har en naturlig måde at opdele dine data på og sikre, at unikhed ikke er særlig nyttig i dette øjeblik.



  1. Benchmarking Managed PostgreSQL Cloud Solutions:Anden del - Amazon RDS

  2. SCD Type 6

  3. MySQL 1062 - Dublet indtastning '0' for nøglen 'PRIMÆR'

  4. SQLite underforespørgsel