Das Problem liegt in deinem Code. Da Sie eine Tabelle überschreiben, aus der Sie lesen möchten, löschen Sie effektiv alle Daten, bevor Spark tatsächlich darauf zugreifen kann.
Denken Sie daran, dass Spark faul ist. Wenn Sie einen Dataset
erstellen Spark ruft erforderliche Metadaten ab, lädt die Daten jedoch nicht. Es gibt also keinen magischen Cache, der den ursprünglichen Inhalt bewahrt. Daten werden dann geladen, wenn sie tatsächlich benötigt werden. Hier ist es, wenn Sie write
ausführen Aktion und wenn Sie mit dem Schreiben beginnen, müssen keine Daten mehr abgerufen werden.
Was Sie brauchen, ist so etwas:
- Erstellen Sie einen
Dataset
. -
Wenden Sie die erforderlichen Transformationen an und schreiben Sie Daten in eine MySQL-Zwischentabelle.
-
TRUNCATE
die ursprüngliche Eingabe undINSERT INTO ... SELECT
aus der Zwischentabelle oderDROP
die ursprüngliche Tabelle undRENAME
Zwischentabelle.
Alternativer, aber weniger günstiger Ansatz wäre:
- Erstellen Sie einen
Dataset
. - Wenden Sie erforderliche Transformationen an und schreiben Sie Daten in eine persistente Spark-Tabelle (
df.write.saveAsTable(...)
). oder gleichwertig) TRUNCATE
die ursprüngliche Eingabe.- Daten zurücklesen und speichern (
spark.table(...).write.jdbc(...)
) - Drop-Spark-Tabelle.
Wir können nicht genug betonen, dass Spark cache
verwendet / persist
ist nicht der richtige Weg. Sogar mit dem konservativen StorageLevel
(MEMORY_AND_DISK_2
/ MEMORY_AND_DISK_SER_2
) zwischengespeicherte Daten können verloren gehen (Knotenausfälle), was zu stillen Korrektheitsfehlern führt.