Das Problem liegt in deinem Code. Da Sie eine Tabelle überschreiben, aus der Sie lesen möchten, löschen Sie effektiv alle Daten, bevor Spark tatsächlich darauf zugreifen kann.
Denken Sie daran, dass Spark faul ist. Wenn Sie einen Dataset erstellen Spark ruft erforderliche Metadaten ab, lädt die Daten jedoch nicht. Es gibt also keinen magischen Cache, der den ursprünglichen Inhalt bewahrt. Daten werden dann geladen, wenn sie tatsächlich benötigt werden. Hier ist es, wenn Sie write ausführen Aktion und wenn Sie mit dem Schreiben beginnen, müssen keine Daten mehr abgerufen werden.
Was Sie brauchen, ist so etwas:
- Erstellen Sie einen
Dataset. -
Wenden Sie die erforderlichen Transformationen an und schreiben Sie Daten in eine MySQL-Zwischentabelle.
-
TRUNCATEdie ursprüngliche Eingabe undINSERT INTO ... SELECTaus der Zwischentabelle oderDROPdie ursprüngliche Tabelle undRENAMEZwischentabelle.
Alternativer, aber weniger günstiger Ansatz wäre:
- Erstellen Sie einen
Dataset. - Wenden Sie erforderliche Transformationen an und schreiben Sie Daten in eine persistente Spark-Tabelle (
df.write.saveAsTable(...)). oder gleichwertig) TRUNCATEdie ursprüngliche Eingabe.- Daten zurücklesen und speichern (
spark.table(...).write.jdbc(...)) - Drop-Spark-Tabelle.
Wir können nicht genug betonen, dass Spark cache verwendet / persist ist nicht der richtige Weg. Sogar mit dem konservativen StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2 ) zwischengespeicherte Daten können verloren gehen (Knotenausfälle), was zu stillen Korrektheitsfehlern führt.