sql >> Database teknologi >  >> NoSQL >> MongoDB

Importer CSV ved hjælp af Mongoose Schema

Du kan gøre det med fast-csv ved at hente headers fra skemadefinitionen, som vil returnere de parsede linjer som "objekter". Du har faktisk nogle uoverensstemmelser, så jeg har markeret dem med rettelser:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Så længe skemaet rent faktisk stemmer overens med den medfølgende CSV, så er det okay. Dette er de rettelser, som jeg kan se, men hvis du har brug for, at de faktiske feltnavne skal justeres anderledes, skal du justere. Men der var dybest set et Number i den position, hvor der er en String og i det væsentlige et ekstra felt, som jeg formoder er det tomme felt i CSV'en.

De generelle ting er at hente rækken af ​​feltnavne fra skemaet og overføre det til mulighederne, når du laver csv-parser-forekomsten:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Når du rent faktisk gør det, får du et "Objekt" tilbage i stedet for et array:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Du skal ikke bekymre dig om "typerne", fordi Mongoose vil kaste værdierne i henhold til skemaet.

Resten sker i behandleren for data begivenhed. For maksimal effektivitet bruger vi insertMany() kun at skrive til databasen én gang for hver 10.000 linjer. Hvordan det rent faktisk går til serveren og processerne afhænger af MongoDB-versionen, men 10.000 burde være ret rimeligt baseret på det gennemsnitlige antal felter, du ville importere for en enkelt samling i form af "afvejningen" for hukommelsesbrug og skrivning af en rimelig netværksanmodning. Gør tallet mindre, hvis det er nødvendigt.

De vigtige dele er at markere disse opkald som async funktioner og await resultatet af insertMany() før du fortsætter. Vi skal også pause() streamen og resume() på hvert element ellers risikerer vi at overskrive buffer af dokumenter, der skal indsættes, før de rent faktisk sendes. pause() og resume() er nødvendige for at sætte "modtryk" på røret, ellers bliver der bare ved med at "komme ud" og affyre data begivenhed.

Naturligvis kræver kontrollen for de 10.000 poster, at vi kontrollerer det både ved hver iteration og ved streamafslutning for at tømme bufferen og sende eventuelle resterende dokumenter til serveren.

Det er virkelig, hvad du vil gøre, da du bestemt ikke ønsker at affyre en async-anmodning til serveren både på "hver" iteration gennem data begivenhed eller i det væsentlige uden at vente på, at hver anmodning er fuldført. Du slipper for ikke at tjekke det for "meget små filer", men for enhver belastning i den virkelige verden er du sikker på at overskride opkaldsstakken på grund af "in flight" asynkrone opkald, som endnu ikke er afsluttet.

FYI - en package.json Brugt. mz er valgfrit, da det blot er et moderniseret Promise aktiveret bibliotek af standard node "indbyggede" biblioteker, som jeg simpelthen er vant til at bruge. Koden er naturligvis fuldstændig udskiftelig med fs modul.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Faktisk med Node v8.9.x og nyere, så kan vi endda gøre dette meget enklere med en implementering af AsyncIterator gennem stream-to-iterator modul. Det er stadig i Iterator<Promise<T>> tilstand, men det burde gøre indtil Node v10.x bliver stabil LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Grundlæggende bliver al stream-"hændelse"-håndtering og pause og genoptagelse erstattet af en simpel for sløjfe:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Let! Dette bliver ryddet op i senere nodeimplementering med for..await..of når den bliver mere stabil. Men ovenstående kører fint på fra den angivne version og opefter.



  1. Hierarkiske forespørgsler med Mongo ved hjælp af $graphLookup

  2. Hvornår skal du deaktivere Transparent Huge Pages for redis

  3. Indstil Mongo Timeout i Spring Boot

  4. Kom godt i gang med Cloudera Data Platform Operational Database (COD)