Vor kurzem hat MongoDB eine neue Funktion ab Version 3.6 veröffentlicht, Change Streams. Dies gibt Ihnen den sofortigen Zugriff auf Ihre Daten, was Ihnen hilft, mit Ihren Datenänderungen auf dem Laufenden zu bleiben. In der heutigen Welt möchte jeder sofortige Benachrichtigungen, anstatt sie nach einigen Stunden oder Minuten zu erhalten. Für einige Anwendungen ist es wichtig, Echtzeitbenachrichtigungen für alle abonnierten Benutzer für jedes einzelne Update zu senden. MongoDB hat diesen Prozess durch die Einführung dieser Funktion wirklich einfach gemacht. In diesem Artikel erfahren wir anhand einiger Beispiele etwas über den MongoDB-Änderungsstrom und seine Anwendungen.
Änderungsströme definieren
Änderungsströme sind nichts anderes als der Echtzeitstrom aller Änderungen, die in der Datenbank oder Sammlung oder sogar in Bereitstellungen auftreten. Beispielsweise löst MongoDB jedes Mal, wenn eine Aktualisierung (Einfügen, Aktualisieren oder Löschen) in einer bestimmten Sammlung auftritt, ein Änderungsereignis mit allen geänderten Daten aus.
Sie können Änderungsströme für jede Sammlung wie alle anderen normalen Aggregationsoperatoren mit dem $changeStream-Operator und der watch()-Methode definieren. Sie können den Änderungsstrom auch mit der Methode MongoCollection.watch() definieren.
Beispiel
db.myCollection.watch()
Streams-Funktionen ändern
-
Filtern von Änderungen
Sie können die Änderungen filtern, um Ereignisbenachrichtigungen nur für bestimmte Zieldaten zu erhalten.
Beispiel:
pipeline = [ { $match: { name: "Bob" } } ]; changeStream = collection.watch(pipeline);
Dieser Code stellt sicher, dass Sie nur Aktualisierungen für Datensätze erhalten, deren Name gleich Bob ist. Auf diese Weise können Sie beliebige Pipelines schreiben, um die Änderungsströme zu filtern.
-
Fortsetzen von Änderungsströmen
Diese Funktion stellt sicher, dass bei Ausfällen kein Datenverlust auftritt. Jede Antwort im Stream enthält das Resume-Token, mit dem der Stream ab einem bestimmten Punkt neu gestartet werden kann. Bei einigen häufigen Netzwerkausfällen versucht der mongodb-Treiber, die Verbindung mit den Abonnenten mithilfe des neuesten Wiederaufnahme-Tokens wiederherzustellen. Im Falle eines vollständigen Anwendungsausfalls sollte das Wiederaufnahme-Token jedoch von den Clients beibehalten werden, um den Stream fortzusetzen.
-
Geordnete Änderungsströme
MongoDB verwendet eine globale logische Uhr, um alle Change Stream-Ereignisse über alle Replikate und Shards eines beliebigen Clusters hinweg zu ordnen, sodass der Empfänger die Benachrichtigungen immer in derselben Reihenfolge erhält, in der die Befehle auf die Datenbank angewendet wurden.
-
Veranstaltungen mit vollständigen Dokumenten
MongoDB gibt standardmäßig den Teil der übereinstimmenden Dokumente zurück. Sie können die Change Stream-Konfiguration jedoch ändern, um ein vollständiges Dokument zu erhalten. Übergeben Sie dazu { fullDocument:„updateLookup“} an die Watch-Methode.
Beispiel:collection = db.collection("myColl") changeStream = collection.watch({ fullDocument: “updateLookup”})
-
Haltbarkeit
Change Streams benachrichtigen nur für die Daten, die für die Mehrheit der Replikate festgeschrieben sind. Dadurch wird sichergestellt, dass Ereignisse durch Mehrheitspersistenzdaten generiert werden, die die Dauerhaftigkeit der Nachricht gewährleisten.
-
Sicherheit/Zugriffskontrolle
Change Streams sind sehr sicher. Benutzer können Änderungsströme nur für die Sammlungen erstellen, für die sie Leseberechtigungen haben. Sie können Änderungsströme basierend auf Benutzerrollen erstellen.
Beispiel für Änderungsströme
In diesem Beispiel erstellen wir Änderungsströme für die Stock-Sammlung, um benachrichtigt zu werden, wenn ein Aktienkurs über einen Schwellenwert steigt.
-
Cluster einrichten
Um Change Streams verwenden zu können, müssen wir zuerst einen Replikatsatz erstellen. Führen Sie den folgenden Befehl aus, um einen Einzelknoten-Replikatsatz zu erstellen.
mongod --dbpath ./data --replSet “rs”
-
Einige Datensätze in die Stocks-Sammlung einfügen
var docs = [ { ticker: "AAPL", price: 210 }, { ticker: "AAPL", price: 260 }, { ticker: "AAPL", price: 245 }, { ticker: "AAPL", price: 255 }, { ticker: "AAPL", price: 270 } ]; db.Stocks.insert(docs)
-
Knotenumgebung einrichten und Abhängigkeiten installieren
mkdir mongo-proj && cd mongo-proj npm init -y npm install mongodb --save
-
Änderungen abonnieren
Erstellen Sie eine index.js-Datei und fügen Sie den folgenden Code ein.
const mongo = require("mongodb").MongoClient; mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => { console.log("Connected to MongoDB server"); // Select DB and Collection const db = client.db("mydb"); const collection = db.collection("Stocks"); pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ]; // Define change stream const changeStream = collection.watch(pipeline); // start listen to changes changeStream.on("change", function(event) { console.log(JSON.stringify(event)); }); });
Führen Sie nun diese Datei aus:
node index.js
-
Fügen Sie einen neuen Datensatz in db ein, um eine Aktualisierung zu erhalten
db.Stocks.insert({ ticker: “AAPL”, price: 280 })
Überprüfen Sie jetzt Ihre Konsole, Sie erhalten ein Update von MongoDB.
Beispielantwort:{ "_id":{ "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"}, "operationType":"insert", "clusterTime":"6655565945622233089", "fullDocument":{ "_id":"5c5d51f73aca83479b48de6e", "ticker":"AAPL", "Price":300 }, "ns":{"db":"mydb","coll":"Stocks"}, "documentKey":{"_id":"5c5d51f73aca83479b48de6e"} }
Hier können Sie den Wert des operationType-Parameters mit den folgenden Operationen ändern, um auf verschiedene Arten von Änderungen in einer Sammlung zu lauschen:
- Einfügen
- Ersetzen (außer eindeutige ID)
- Aktualisieren
- Löschen
- Ungültig machen (immer wenn Mongo einen ungültigen Cursor zurückgibt)
Andere Modi von Änderungsströmen
Sie können Änderungsströme für eine Datenbank und Bereitstellung auf die gleiche Weise wie für die Sammlung starten. Diese Funktion wurde von MongoDB Version 4.0 veröffentlicht. Hier sind die Befehle zum Öffnen eines Änderungsstroms für Datenbanken und Bereitstellungen.
Against DB: db.watch()
Against deployment: Mongo.watch()
Schlussfolgerung
MongoDB Change Streams vereinfacht die Integration zwischen Frontend und Backend in Echtzeit und nahtlos. Diese Funktion kann Ihnen dabei helfen, MongoDB für das Pubsub-Modell zu verwenden, sodass Sie Kafka- oder RabbitMQ-Bereitstellungen nicht mehr verwalten müssen. Wenn Ihre Anwendung Echtzeitinformationen benötigt, müssen Sie sich diese Funktion von MongoDB ansehen. Ich hoffe, dieser Beitrag hilft Ihnen beim Einstieg in MongoDB-Change-Streams.