Ich habe eine Lösung gefunden. Da ich den richtigen Mongo-Treiber für strukturiertes Streaming nicht finden konnte, habe ich an einer anderen Lösung gearbeitet. Jetzt verwende ich die direkte Verbindung zu mongoDb und verwende "foreach(...)" anstelle von foreachbatch(. ..). Mein Code sieht in der Datei testSpark.py folgendermaßen aus:
....
import pymongo
from pymongo import MongoClient
local_url = "mongodb://localhost:27017"
def write_machine_df_mongo(target_df):
cluster = MongoClient(local_url)
db = cluster["test_db"]
collection = db.test1
post = {
"machine_id": target_df.machine_id,
"proc_type": target_df.proc_type,
"sensor1_id": target_df.sensor1_id,
"sensor2_id": target_df.sensor2_id,
"time": target_df.time,
"sensor1_val": target_df.sensor1_val,
"sensor2_val": target_df.sensor2_val,
}
collection.insert_one(post)
machine_df.writeStream\
.outputMode("append")\
.foreach(write_machine_df_mongo)\
.start()