MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

Speichern Sie mit Mongoose eine sehr große CSV-Datei in MongoDB

Willkommen beim Streamen. Was Sie wirklich wollen, ist ein "evented stream", der Ihre Eingabe "blockweise" verarbeitet, und natürlich idealerweise durch ein gemeinsames Trennzeichen wie das "newline"-Zeichen, das Sie gerade verwenden.

Für wirklich effiziente Sachen können Sie die Verwendung von MongoDB "Bulk API" fügt ein, um das Laden so schnell wie möglich zu machen, ohne den gesamten Maschinenspeicher oder die CPU-Zyklen zu verbrauchen.

Ich befürworte das nicht, da verschiedene Lösungen verfügbar sind, aber hier ist eine Auflistung, die die line- input-stream-Paket um den "Zeilenabschluss"-Teil einfach zu machen.

Schemadefinitionen nur nach "Beispiel":

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false })

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

Im Allgemeinen zerlegt die "Stream" -Schnittstelle dort also die Eingabe, um "eine Zeile nach der anderen" zu verarbeiten. Das hindert Sie daran, alles auf einmal zu laden.

Die Hauptbestandteile sind die "Bulk Operations API" von MongoDB. Auf diese Weise können Sie viele Vorgänge gleichzeitig "in die Warteschlange stellen", bevor Sie sie tatsächlich an den Server senden. In diesem Fall werden also bei Verwendung eines "Modulo" Schreibvorgänge nur pro 1000 verarbeiteten Einträgen gesendet. Sie können wirklich alles bis zum 16-MB-BSON-Limit tun, aber halten Sie es überschaubar.

Zusätzlich zu den Vorgängen, die in großen Mengen verarbeitet werden, gibt es einen zusätzlichen „Begrenzer“ von async Bibliothek. Es ist nicht wirklich erforderlich, aber es stellt sicher, dass zu keinem Zeitpunkt im Wesentlichen mehr als die „Modulo-Grenze“ an Dokumenten in Bearbeitung ist. Die allgemeinen Batch-"Einfügungen" verursachen keine anderen IO-Kosten als Speicher, aber die "Execute"-Aufrufe bedeuten, dass IO verarbeitet wird. Also warten wir, anstatt weitere Dinge in die Warteschlange zu stellen.

Es gibt sicherlich bessere Lösungen, die Sie für die "Stream-Verarbeitung" von CSV-Daten finden können, die dies zu sein scheinen. Aber im Allgemeinen gibt Ihnen dies die Konzepte, wie Sie dies auf speichereffiziente Weise tun können, ohne auch CPU-Zyklen zu verbrauchen.