Dem Fehler zufolge haben Sie bereits eine Zeichenfolge (Sie haben bereits df.selectExpr("CAST(value AS STRING)")
ausgeführt ), also sollten Sie versuchen, das Row-Ereignis als String
abzurufen , und kein Array[Byte]
Beginnen Sie mit der Änderung von
val valueStr = new String(record.getAs[Array[Byte]]("value"))
zu
val valueStr = record.getAs[String]("value")
Ich verstehe, dass Sie möglicherweise bereits einen Cluster zum Ausführen von Spark-Code haben, aber ich würde vorschlagen, sich dennoch mit zu befassen Kafka Connect Mongo Sink-Anschluss sodass Sie Ihren eigenen Mongo-Writer nicht in Spark-Code schreiben und verwalten müssen.
Oder Sie können Spark-Datensätze auch direkt in Mongo schreiben