MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

Echtzeit-Datenstreaming mit MongoDB Change Streams

Vor kurzem hat MongoDB eine neue Funktion ab Version 3.6 veröffentlicht, Change Streams. Dies gibt Ihnen den sofortigen Zugriff auf Ihre Daten, was Ihnen hilft, mit Ihren Datenänderungen auf dem Laufenden zu bleiben. In der heutigen Welt möchte jeder sofortige Benachrichtigungen, anstatt sie nach einigen Stunden oder Minuten zu erhalten. Für einige Anwendungen ist es wichtig, Echtzeitbenachrichtigungen für alle abonnierten Benutzer für jedes einzelne Update zu senden. MongoDB hat diesen Prozess durch die Einführung dieser Funktion wirklich einfach gemacht. In diesem Artikel erfahren wir anhand einiger Beispiele etwas über den MongoDB-Änderungsstrom und seine Anwendungen.

Änderungsströme definieren

Änderungsströme sind nichts anderes als der Echtzeitstrom aller Änderungen, die in der Datenbank oder Sammlung oder sogar in Bereitstellungen auftreten. Beispielsweise löst MongoDB jedes Mal, wenn eine Aktualisierung (Einfügen, Aktualisieren oder Löschen) in einer bestimmten Sammlung auftritt, ein Änderungsereignis mit allen geänderten Daten aus.

Sie können Änderungsströme für jede Sammlung wie alle anderen normalen Aggregationsoperatoren mit dem $changeStream-Operator und der watch()-Methode definieren. Sie können den Änderungsstrom auch mit der Methode MongoCollection.watch() definieren.

Beispiel

db.myCollection.watch()

Streams-Funktionen ändern

  • Filtern von Änderungen

    Sie können die Änderungen filtern, um Ereignisbenachrichtigungen nur für bestimmte Zieldaten zu erhalten.

    Beispiel:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Dieser Code stellt sicher, dass Sie nur Aktualisierungen für Datensätze erhalten, deren Name gleich Bob ist. Auf diese Weise können Sie beliebige Pipelines schreiben, um die Änderungsströme zu filtern.

  • Fortsetzen von Änderungsströmen

    Diese Funktion stellt sicher, dass bei Ausfällen kein Datenverlust auftritt. Jede Antwort im Stream enthält das Resume-Token, mit dem der Stream ab einem bestimmten Punkt neu gestartet werden kann. Bei einigen häufigen Netzwerkausfällen versucht der mongodb-Treiber, die Verbindung mit den Abonnenten mithilfe des neuesten Wiederaufnahme-Tokens wiederherzustellen. Im Falle eines vollständigen Anwendungsausfalls sollte das Wiederaufnahme-Token jedoch von den Clients beibehalten werden, um den Stream fortzusetzen.

  • Geordnete Änderungsströme

    MongoDB verwendet eine globale logische Uhr, um alle Change Stream-Ereignisse über alle Replikate und Shards eines beliebigen Clusters hinweg zu ordnen, sodass der Empfänger die Benachrichtigungen immer in derselben Reihenfolge erhält, in der die Befehle auf die Datenbank angewendet wurden.

  • Veranstaltungen mit vollständigen Dokumenten

    MongoDB gibt standardmäßig den Teil der übereinstimmenden Dokumente zurück. Sie können die Change Stream-Konfiguration jedoch ändern, um ein vollständiges Dokument zu erhalten. Übergeben Sie dazu { fullDocument:„updateLookup“} an die Watch-Methode.
    Beispiel:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Haltbarkeit

    Change Streams benachrichtigen nur für die Daten, die für die Mehrheit der Replikate festgeschrieben sind. Dadurch wird sichergestellt, dass Ereignisse durch Mehrheitspersistenzdaten generiert werden, die die Dauerhaftigkeit der Nachricht gewährleisten.

  • Sicherheit/Zugriffskontrolle

    Change Streams sind sehr sicher. Benutzer können Änderungsströme nur für die Sammlungen erstellen, für die sie Leseberechtigungen haben. Sie können Änderungsströme basierend auf Benutzerrollen erstellen.

Multiplenines Become a MongoDB DBA – Bringing MongoDB to ProductionErfahren Sie, was Sie wissen müssen, um MongoDBDownload for Free bereitzustellen, zu überwachen, zu verwalten und zu skalieren

Beispiel für Änderungsströme

In diesem Beispiel erstellen wir Änderungsströme für die Stock-Sammlung, um benachrichtigt zu werden, wenn ein Aktienkurs über einen Schwellenwert steigt.

  • Cluster einrichten

    Um Change Streams verwenden zu können, müssen wir zuerst einen Replikatsatz erstellen. Führen Sie den folgenden Befehl aus, um einen Einzelknoten-Replikatsatz zu erstellen.

    mongod --dbpath ./data --replSet “rs”
  • Einige Datensätze in die Stocks-Sammlung einfügen

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Knotenumgebung einrichten und Abhängigkeiten installieren

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Änderungen abonnieren

    Erstellen Sie eine index.js-Datei und fügen Sie den folgenden Code ein.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Führen Sie nun diese Datei aus:

    node index.js
  • Fügen Sie einen neuen Datensatz in db ein, um eine Aktualisierung zu erhalten

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Überprüfen Sie jetzt Ihre Konsole, Sie erhalten ein Update von MongoDB.
    Beispielantwort:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Hier können Sie den Wert des operationType-Parameters mit den folgenden Operationen ändern, um auf verschiedene Arten von Änderungen in einer Sammlung zu lauschen:

  • Einfügen
  • Ersetzen (außer eindeutige ID)
  • Aktualisieren
  • Löschen
  • Ungültig machen (immer wenn Mongo einen ungültigen Cursor zurückgibt)

Andere Modi von Änderungsströmen

Sie können Änderungsströme für eine Datenbank und Bereitstellung auf die gleiche Weise wie für die Sammlung starten. Diese Funktion wurde von MongoDB Version 4.0 veröffentlicht. Hier sind die Befehle zum Öffnen eines Änderungsstroms für Datenbanken und Bereitstellungen.

Against DB: db.watch()
Against deployment: Mongo.watch()

Schlussfolgerung

MongoDB Change Streams vereinfacht die Integration zwischen Frontend und Backend in Echtzeit und nahtlos. Diese Funktion kann Ihnen dabei helfen, MongoDB für das Pubsub-Modell zu verwenden, sodass Sie Kafka- oder RabbitMQ-Bereitstellungen nicht mehr verwalten müssen. Wenn Ihre Anwendung Echtzeitinformationen benötigt, müssen Sie sich diese Funktion von MongoDB ansehen. Ich hoffe, dieser Beitrag hilft Ihnen beim Einstieg in MongoDB-Change-Streams.