Dies ist noch nicht veröffentlicht, aber im Master-Zweig von Alpakka, MongoSource.apply
nimmt einen Typparameter:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Daher können Sie mit der kommenden Version 0.18 von Alpakka Folgendes tun:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Beachten Sie, dass source
hier wird davon ausgegangen, dass todoCollection.find()
gibt ein Observable[TodoMongo]
zurück; Passen Sie die Typen nach Bedarf an.
In der Zwischenzeit könnten Sie den obigen Code einfach manuell hinzufügen. Zum Beispiel:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Beachten Sie, dass MyMongoSource
ist so definiert, dass es sich in akka.stream.alpakka.mongodb.scaladsl
befindet Paket (wie MongoSource
), weil ObservableToPublisher
ist ein Paket-Privatunterricht. Sie würden MyMongoSource
verwenden genauso wie Sie MongoSource
verwenden würden :
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())