Dies ist noch nicht veröffentlicht, aber im Master-Zweig von Alpakka, MongoSource.apply
nimmt einen Typparameter:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Daher können Sie mit der kommenden Version 0.18 von Alpakka Folgendes tun:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Beachten Sie, dass source hier wird davon ausgegangen, dass todoCollection.find() gibt ein Observable[TodoMongo] zurück; Passen Sie die Typen nach Bedarf an.
In der Zwischenzeit könnten Sie den obigen Code einfach manuell hinzufügen. Zum Beispiel:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Beachten Sie, dass MyMongoSource ist so definiert, dass es sich in akka.stream.alpakka.mongodb.scaladsl befindet Paket (wie MongoSource ), weil ObservableToPublisher
ist ein Paket-Privatunterricht. Sie würden MyMongoSource verwenden genauso wie Sie MongoSource verwenden würden :
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())