Velkommen til streaming. Det, du virkelig ønsker, er en "begiven strøm", der behandler dit input "én del af gangen", og selvfølgelig ideelt set ved hjælp af en fælles afgrænsning, såsom det "nylinje"-tegn, du bruger i øjeblikket.
For virkelig effektive ting kan du tilføje brug af MongoDB "Bulk API" indsatser for at gøre din indlæsning så hurtig som muligt uden at optage al maskinens hukommelse eller CPU-cyklusser.
Fortaler ikke, da der er forskellige tilgængelige løsninger, men her er en liste, der bruger line- input-stream-pakke for at gøre "linieterminator"-delen enkel.
Skemadefinitioner kun efter "eksempel":
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
async = require("async"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("\n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
stream.pause();
bulk.execute(function(err,result) {
if (err) callback(err);
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
stream.on("end",function() {
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
});
});
Så generelt nedbryder "stream"-grænsefladen der "inputtet" for at behandle "én linje ad gangen". Det forhindrer dig i at indlæse alt på én gang.
Hoveddelene er "Bulk Operations API" fra MongoDB. Dette giver dig mulighed for at "opstille" mange operationer ad gangen, før du rent faktisk sender til serveren. Så i dette tilfælde med brugen af en "modulo", bliver skrivninger kun sendt pr. 1000 behandlede poster. Du kan virkelig gøre alt op til 16 MB BSON-grænsen, men hold det håndterbart.
Ud over at operationerne behandles i bulk, er der en ekstra "begrænser" på plads fra async bibliotek. Det er egentlig ikke påkrævet, men dette sikrer, at der i det væsentlige ikke er mere end "modulo-grænsen" af dokumenter i gang på noget tidspunkt. De generelle batch-"inserts" kommer uden andre IO-omkostninger end hukommelse, men "execute"-kaldene betyder, at IO behandler. Så vi venter i stedet for at stille flere ting i kø.
Der er helt sikkert bedre løsninger, du kan finde til "stream-behandling" af CSV-type data, hvilket dette ser ud til at være. Men generelt giver dette dig koncepterne til, hvordan du gør dette på en hukommelseseffektiv måde uden også at spise CPU-cyklusser.