MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

Knoten fügen große Daten mit Mungo ein

Das Problem dabei ist, dass die Schleife, die Sie ausführen, nicht darauf wartet, dass jede Operation abgeschlossen wird. Tatsächlich reihen Sie also nur Tausende von .save() in die Warteschlange Anforderungen und versuchen, sie gleichzeitig auszuführen. Das ist innerhalb angemessener Grenzen nicht möglich, daher erhalten Sie die Fehlermeldung.

Der Async -Modul verfügt über verschiedene Methoden zum Iterieren, während ein Callback für diesen Iterator verarbeitet wird, wobei das wahrscheinlich einfachste direkte for while ist . Mongoose übernimmt auch die Verbindungsverwaltung für Sie, ohne dass eine Einbettung in den Rückruf erforderlich ist, da die Modelle verbindungsbewusst sind:

var tempColSchema = new Schema({
    cid: {
        type: Number,
        required: true
    },
    loc:[]
});

var TempCol = mongoose.model( "TempCol", tempColSchema );

mongoose.connect( 'mongodb://localhost/mean-dev' );

var i = 0;
async.whilst(
    function() { return i < 10000000; },
    function(callback) {
        i++;
        var c = i;
        console.log(c);
        var lon = parseInt(c/100000);
        var lat = c%100000;
        new Tempcol({cid: Math.random(), loc: [lon, lat]}).save(function(err){
            callback(err);
        });            
    },
    function(err) {
       // When the loop is complete or on error
    }
);

Nicht der beste Weg, es zu tun, es ist immer noch ein Schreibvorgang nach dem anderen, und Sie könnten andere Methoden verwenden, um die gleichzeitigen Operationen zu "beherrschen", aber dies wird zumindest nicht den Call-Stack sprengen.

Ab MongoDB 2.6 können Sie die Bulk Operations API verwenden um mehr als einen Schreibvorgang gleichzeitig auf dem Server zu verarbeiten. Der Prozess ist also ähnlich, aber dieses Mal können Sie 1000 auf einmal in einem einzigen Schreib- und Antwortvorgang an den Server senden, was viel schneller ist:

var tempColSchema = new Schema({
    cid: {
        type: Number,
        required: true
    },
    loc:[]
});

var TempCol = mongoose.model( "TempCol", tempColSchema );

mongoose.connect( 'mongodb://localhost/mean-dev' );

mongoose.on("open",function(err,conn) {

    var i = 0;
    var bulk = TempCol.collection.initializeOrderedBulkOp();

    async.whilst(
      function() { return i < 10000000; },
      function(callback) {
        i++;
        var c = i;
        console.log(c);
        var lon = parseInt(c/100000);
        var lat = c%100000;

        bulk.insert({ "cid": Math.random(), "loc": [ lon, lat ] });

        if ( i % 1000 == 0 ) {
            bulk.execute(function(err,result) {
                bulk = TempCol.collection.initializeOrderedBulkOp();
                callback(err);
            });
        } else {
            process.nextTick(callback);
        }
      },
      function(err) {
        // When the loop is complete or on error

        // If you had a number not plainly divisible by 1000
        if ( i % 1000 != 0 )
            bulk.execute(function(err,result) {
                // possibly check for errors here
            });
      }
    );

});

Dabei werden tatsächlich die nativen Treibermethoden verwendet, die noch nicht in Mongoose verfügbar sind, sodass zusätzliche Sorgfalt darauf verwendet wird, sicherzustellen, dass die Verbindung verfügbar ist. Das ist ein Beispiel, aber nicht der einzige Weg, aber der Hauptpunkt ist, dass die Mungo-"Magie" für Verbindungen hier nicht eingebaut ist, also sollten Sie sicher sein, dass sie eingerichtet ist.

Sie müssen eine runde Anzahl von Elementen verarbeiten, aber wo dies nicht der Fall ist, sollten Sie bulk.execute() aufrufen in diesem letzten Block sowie gezeigt, aber es hängt von der Zahl ab, die auf das Modulo reagiert.

Der Hauptpunkt besteht darin, einen Stapel von Operationen nicht auf eine unangemessene Größe anwachsen zu lassen und die Verarbeitung begrenzt zu halten. Die Flusssteuerung ermöglicht hier Vorgänge, die einige Zeit in Anspruch nehmen, bevor sie mit der nächsten Iteration fortfahren. Also entweder die Stapelaktualisierungen oder einige zusätzliche parallele Warteschlangen sind das, was Sie für die beste Leistung wünschen.

Es gibt auch die .initializeUnorderedBulkOp() Formular dafür, wenn Sie nicht möchten, dass Schreibfehler schwerwiegend sind, sondern diese stattdessen anders behandeln. Sehen Sie sich meistens die offizielle Dokumentation zu Bulk API und Antworten an, um zu erfahren, wie die gegebene Antwort zu interpretieren ist.