Ich bin kein Experte für Mongodb, aber basierend auf den Beispielen, die ich gesehen habe, ist dies ein Muster, das ich ausprobieren würde.
Ich habe andere Ereignisse als Daten weggelassen, da die Drosselung dieses Ereignisses das Hauptanliegen zu sein scheint.
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
Ich versuche, einen Test dieses Rx-Flows ohne Mongodb zusammenzustellen, in der Zwischenzeit könnte Ihnen dies einige Ideen geben.