Problemet ligger i din kode. Fordi du overskriver en tabel, hvorfra du forsøger at læse, sletter du effektivt alle data, før Spark rent faktisk kan få adgang til det.
Husk at Spark er doven. Når du opretter et Dataset Spark henter nødvendige metadata, men indlæser ikke dataene. Så der er ingen magisk cache, som vil bevare det originale indhold. Data vil blive indlæst, når det faktisk er påkrævet. Her er det, når du udfører write handling, og når du begynder at skrive, er der ikke flere data at hente.
Det du har brug for er noget som dette:
- Opret et
Dataset. -
Anvend nødvendige transformationer og skriv data til en mellemliggende MySQL-tabel.
-
TRUNCATEdet originale input ogINSERT INTO ... SELECTfra den mellemliggende tabel ellerDROPden originale tabel ogRENAMEmellemtabel.
Alternativ, men mindre gunstig tilgang, ville være:
- Opret et
Dataset. - Anvend nødvendige transformationer og skriv data til en vedvarende Spark-tabel (
df.write.saveAsTable(...)eller tilsvarende) TRUNCATEdet originale input.- Læs data tilbage og gem (
spark.table(...).write.jdbc(...)) - Drop Spark-bord.
Vi kan ikke understrege nok, at brugen af Spark cache / persist er ikke vejen at gå. Selv med det konservative StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2 ) cachelagrede data kan gå tabt (nodefejl), hvilket fører til tavse korrekthedsfejl.