MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

Senke Kafka Stream mit PySpark Structured Streaming in MongoDB

Ich habe eine Lösung gefunden. Da ich den richtigen Mongo-Treiber für strukturiertes Streaming nicht finden konnte, habe ich an einer anderen Lösung gearbeitet. Jetzt verwende ich die direkte Verbindung zu mongoDb und verwende "foreach(...)" anstelle von foreachbatch(. ..). Mein Code sieht in der Datei testSpark.py folgendermaßen aus:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()