MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

MongoDBObject wird nicht innerhalb eines rrd-Foreach-Loop-Casbah-Scala-Apache-Sparks hinzugefügt

Die Berechnungen auf RDDs werden über den Cluster verteilt. Sie können eine Variable, die außerhalb des Abschlusses des RDD-Vorgangs erstellt wurde, nicht innerhalb des RDD aktualisieren. Sie befinden sich grundsätzlich an zwei verschiedenen Stellen:Die Variable wird im Spark-Treiber erstellt und in den Workern aufgerufen und sollte als schreibgeschützt behandelt werden.

Spark unterstützt verteilte Kumulatoren, die in diesem Fall verwendet werden könnten:Spark-Kumulatoren

Eine andere Option (die ich bevorzugen würde) besteht darin, den RDD-Stream in das gewünschte Datenformat umzuwandeln und den foreachRDD zu verwenden Methode, um es im Sekundärspeicher zu persistieren. Dies wäre eine funktionalere Herangehensweise an das Problem. Das würde ungefähr so ​​aussehen:

  val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
  val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
  filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)