I Spark er funktionerne på RDD
s (som map
her) serialiseres og sendes til eksekutorerne til behandling. Dette indebærer, at alle elementer indeholdt i disse operationer skal kunne serialiseres.
Redis-forbindelsen her kan ikke serialiseres, da den åbner TCP-forbindelser til mål-DB, der er bundet til den maskine, hvor den er oprettet.
Løsningen er at skabe disse forbindelser på udførerne, i den lokale udførelseskontekst. Der er få måder at gøre det på. To, du tænker på, er:
rdd.mapPartitions
:lader dig behandle en hel partition på én gang og derfor amortisere omkostningerne ved at oprette forbindelser)- Singleton-forbindelsesadministratorer:Opret forbindelsen én gang pr. udfører
mapPartitions
er nemmere, da alt det kræver er en lille ændring af programstrukturen:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
En singleton forbindelsesmanager kan modelleres med et objekt, der har en doven reference til en forbindelse (bemærk:en mutbar ref vil også fungere).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Dette objekt kan derefter bruges til at instansiere 1 forbindelse pr. arbejder-JVM og bruges som en Serializable
genstand i en operationslukning.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Fordelen ved at bruge singleton-objektet er mindre overhead, da forbindelser kun oprettes én gang af JVM (i modsætning til 1 pr. RDD-partition)
Der er også nogle ulemper:
- oprydning af forbindelser er besværlig (shutdown hook/timere)
- man skal sikre trådsikkerhed for delte ressourcer
(*) kode angivet til illustrationsformål. Ikke kompileret eller testet.