MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

MongoDB:Ist es möglich, TTL-Ereignisse mit Change Stream zu erfassen, um einen Scheduler (Cronjob) zu emulieren?

Ich konnte Change Streams und TTL verwenden, um einen Cronjob zu emulieren. Ich habe einen Beitrag veröffentlicht, in dem ich ausführlich erkläre, was ich getan habe, und unter:https://www. patreon.com/posts/17697287

Aber grundsätzlich erstelle ich jedes Mal, wenn ich ein „Ereignis“ für ein Dokument planen muss, parallel dazu ein Ereignisdokument, wenn ich das Dokument erstelle. Dieses Ereignisdokument hat als _id dieselbe ID wie das erste Dokument.

Außerdem werde ich für dieses Ereignisdokument eine TTL festlegen.

Wenn die TTL abläuft, werde ich ihre „Lösch“-Änderung mit Change Streams erfassen. Und dann verwende ich den documentKey der Änderung (da es dieselbe ID wie das Dokument ist, das ich auslösen möchte), um das Zieldokument in der ersten Sammlung zu finden, und mache mit dem Dokument alles, was ich will.

Ich verwende Node.js mit Express und Mongoose, um auf MongoDB zuzugreifen. Hier ist der relevante Teil, der in App.js hinzugefügt werden muss:

const { ReplSet } = require('mongodb-topology-manager');

run().catch(error => console.error(error));

async function run() {
    console.log(new Date(), 'start');
    const bind_ip = 'localhost';
    // Starts a 3-node replica set on ports 31000, 31001, 31002, replica set
    // name is "rs0".
    const replSet = new ReplSet('mongod', [
        { options: { port: 31000, dbpath: `${__dirname}/data/db/31000`, bind_ip } },
        { options: { port: 31001, dbpath: `${__dirname}/data/db/31001`, bind_ip } },
        { options: { port: 31002, dbpath: `${__dirname}/data/db/31002`, bind_ip } }
    ], { replSet: 'rs0' });

    // Initialize the replica set
    await replSet.purge();
    await replSet.start();
    console.log(new Date(), 'Replica set started...');

    // Connect to the replica set
    const uri = 'mongodb://localhost:31000,localhost:31001,localhost:31002/' + 'test?replicaSet=rs0';
    await mongoose.connect(uri);
    var db = mongoose.connection;
    db.on('error', console.error.bind(console, 'connection error:'));
    db.once('open', function () {
        console.log("Connected correctly to server");
    });

    // To work around "MongoError: cannot open $changeStream for non-existent database: test" for this example
    await mongoose.connection.createCollection('test');

    // *** we will add our scheduler here *** //

    var Item = require('./models/item');
    var ItemExpiredEvent = require('./models/scheduledWithin');

    let deleteOps = {
      $match: {
          operationType: "delete" 
      }
    };

    ItemExpiredEvent.watch([deleteOps]).
        on('change', data => {
            // *** treat the event here *** //
            console.log(new Date(), data.documentKey);
            Item.findById(data.documentKey, function(err, item) {
                console.log(item);
            });
        });

    // The TTL set in ItemExpiredEvent will trigger the change stream handler above
    console.log(new Date(), 'Inserting item');
    Item.create({foo:"foo", bar: "bar"}, function(err, cupom) {
        ItemExpiredEvent.create({_id : item._id}, function(err, event) {
            if (err) console.log("error: " + err);
            console.log('event inserted');
        });
    });

}

Und hier ist der Code für model/ScheduledWithin:

var mongoose = require('mongoose');
var Schema = mongoose.Schema;

var ScheduledWithin = new Schema({
    _id: mongoose.Schema.Types.ObjectId,
}, {timestamps: true}); 
// timestamps: true will automatically create a "createdAt" Date field

ScheduledWithin.index({createdAt: 1}, {expireAfterSeconds: 90});

module.exports = mongoose.model('ScheduledWithin', ScheduledWithin);