Jeg er ikke ekspert i mongodb, men baseret på de eksempler, jeg har set, er dette et mønster, jeg ville prøve.
Jeg har udeladt andre hændelser end data, da det ser ud til at være den største bekymring at begrænse den.
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
Jeg forsøger at sammensætte en test af dette Rx-flow uden mongodb, i mellemtiden kan dette måske give dig nogle ideer.