MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

Importieren Sie CSV mit dem Mongoose-Schema

Sie können dies mit Fast-CSV tun, indem Sie die headers abrufen aus der Schemadefinition, die die geparsten Zeilen als "Objekte" zurückgibt. Sie haben tatsächlich einige Diskrepanzen, also habe ich sie mit Korrekturen markiert:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Solange das Schema tatsächlich mit der bereitgestellten CSV übereinstimmt, ist es in Ordnung. Dies sind die Korrekturen, die ich sehen kann, aber wenn Sie die tatsächlichen Feldnamen anders ausrichten müssen, müssen Sie sie anpassen. Aber es gab im Grunde eine Number an der Stelle, an der ein String steht und im Wesentlichen ein zusätzliches Feld, von dem ich annehme, dass es das leere Feld in der CSV ist.

Die allgemeinen Dinge sind das Abrufen des Arrays von Feldnamen aus dem Schema und das Übergeben an die Optionen beim Erstellen der CSV-Parser-Instanz:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Sobald Sie das tatsächlich getan haben, erhalten Sie anstelle eines Arrays ein "Objekt" zurück:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Machen Sie sich keine Sorgen um die "Typen", da Mongoose die Werte gemäß dem Schema umwandelt.

Der Rest passiert innerhalb des Handlers für die data Veranstaltung. Für maximale Effizienz verwenden wir insertMany() nur einmal alle 10.000 Zeilen in die Datenbank zu schreiben. Wie das tatsächlich zum Server und zu den Prozessen geht, hängt von der MongoDB-Version ab, aber 10.000 sollten ziemlich vernünftig sein, basierend auf der durchschnittlichen Anzahl von Feldern, die Sie für eine einzelne Sammlung importieren würden, in Bezug auf den „Kompromiss“ für die Speichernutzung und das Schreiben von a angemessene Netzwerkanforderung. Verkleinern Sie die Zahl gegebenenfalls.

Die wichtigen Teile sind, diese Aufrufe als async zu markieren Funktionen und await das Ergebnis von insertMany() vor dem Fortfahren. Außerdem müssen wir pause() machen den Stream und resume() bei jedem Item sonst laufen wir Gefahr den buffer zu überschreiben von Dokumenten, die vor dem eigentlichen Versand eingefügt werden müssen. Die pause() und resume() sind notwendig, um "Gegendruck" auf das Rohr auszuüben, andernfalls kommen Gegenstände einfach immer wieder "heraus" und feuern die data ab Veranstaltung.

Natürlich erfordert die Kontrolle für die 10.000 Einträge, dass wir dies sowohl bei jeder Iteration als auch bei der Beendigung des Streams überprüfen, um den Puffer zu leeren und alle verbleibenden Dokumente an den Server zu senden.

Das ist wirklich das, was Sie tun möchten, da Sie sicherlich nicht bei "jeder" Iteration durch die data eine asynchrone Anfrage an den Server senden möchten Ereignis oder im Wesentlichen ohne auf den Abschluss jeder Anforderung zu warten. Bei „sehr kleinen Dateien“ kommen Sie damit davon, dies nicht zu überprüfen, aber bei jeder Belastung in der realen Welt werden Sie mit Sicherheit die Aufrufliste überschreiten, da asynchrone „In-Flight“-Aufrufe noch nicht abgeschlossen sind.

FYI - eine package.json Gebraucht. Das mz ist optional, da es nur ein modernisiertes Promise ist aktivierte Bibliothek von "eingebauten" Standardknotenbibliotheken, an deren Verwendung ich einfach gewöhnt bin. Der Code ist natürlich komplett austauschbar mit dem fs Modul.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Tatsächlich können wir dies mit Node v8.9.x und höher mit einer Implementierung von AsyncIterator sogar noch viel einfacher machen durch den stream-to-iterator Modul. Es befindet sich immer noch in Iterator<Promise<T>> Modus, aber es sollte reichen, bis Node v10.x stabil wird LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Im Grunde wird die gesamte Verarbeitung von Stream-"Ereignissen" sowie das Anhalten und Fortsetzen durch ein einfaches for ersetzt Schleife:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Leicht! Dies wird in einer späteren Node-Implementierung mit for..await..of bereinigt wenn es stabiler wird. Aber das obige läuft gut auf der angegebenen Version und höher.