MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

MongoDB-Leistung:Ausführen von MongoDB-Aggregationen auf sekundären Servern

Mit Aggregationsvorgängen in MongoDB können Sie Datensätze verarbeiten, gruppieren und ihre berechneten Ergebnisse zurückgeben. MongoDB unterstützt drei Arten von Aggregationsvorgängen:

  1. Einzelzweck-Aggregationsbefehle
  2. Karte reduzieren
  3. Aggregationspipeline

Sie können dieses MongoDB-Vergleichsdokument verwenden, um zu sehen, welches Ihren Anforderungen entspricht.

Aggregationspipeline

Die Aggregationspipeline ist ein MongoDB-Framework, das die Datenaggregation über eine Datenverarbeitungspipeline bereitstellt. Das heißt, Dokumente werden durch eine mehrstufige Pipeline gesendet, wobei die Dokumente bei jedem Schritt gefiltert, gruppiert und anderweitig transformiert werden. Es stellt SQL „GROUP BY …“ zur Verfügung. Art von Konstrukten für MongoDB, die auf der Datenbank selbst ausgeführt werden. Die Aggregationsdokumentation enthält nützliche Beispiele für die Erstellung solcher Pipelines.

Warum Aggregationen auf der Sekundärseite ausführen?

Aggregationspipelines sind ressourcenintensive Vorgänge. Es ist sinnvoll, Aggregationsjobs auf Sekundärdatenbanken eines MongoDB-Replikatsatzes auszulagern, wenn es in Ordnung ist, mit leicht veralteten Daten zu arbeiten. Dies gilt typischerweise für „Batch“-Vorgänge, da sie nicht erwarten, mit den neuesten Daten ausgeführt zu werden. Wenn die Ausgabe in eine Sammlung geschrieben werden muss, werden die Aggregationsjobs nur auf dem Primärserver ausgeführt, da nur der Primärserver in MongoDB beschreibbar ist.

In diesem Beitrag zeigen wir Ihnen, wie Sie sicherstellen können, dass Aggregationspipelines sowohl von der Mongo-Shell als auch von Java auf der Sekundärseite ausgeführt werden.

Führen Sie Aggregationspipelines auf der Sekundärseite von Mongo Shell und Java in MongoDBClick To Tweet aus

Hinweis:Wir verwenden den von MongoDB bereitgestellten Beispieldatensatz in ihrem Beispiel für die Aggregation von Postleitzahlen, um unsere Beispiele zu präsentieren. Sie können es wie im Beispiel beschrieben herunterladen.

Aggregationspipeline für Replikatsätze

MongoDB-Shell

Setzen der Lesepräferenz auf sekundär macht den Trick, wenn ein Aggregationsjob von der Mongo-Shell ausgeführt wird. Versuchen wir, alle Staaten mit mehr als 10 Millionen Einwohnern abzurufen (erste Aggregation im Beispiel mit Postleitzahlen). Sowohl die Shell als auch der Server führen MongoDB Version 3.2.10 aus.

mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
RS-repl0-0:PRIMARY> use test
switched to db test
RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave
RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref
RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-repl0-0:PRIMARY> db.zips.aggregate( [
...    { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
...    { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }
{ "_id" : "TX", "totalPop" : 16984601 }

Ein Blick in die MongoDB-Protokolle (mit aktivierter Protokollierung für Befehle) auf dem sekundären zeigt, dass die Aggregation tatsächlich auf dem sekundären ausgeführt wurde:

...
2016-12-05T06:20:14.783+0000 I COMMAND  [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { 
$match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire
Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms
...

Java

Aus dem MongoDB-Java-Treiber reicht es aus, erneut die Leseeinstellung festzulegen. Hier ist ein Beispiel mit Treiberversion 3.2.2:

public class AggregationChecker {

    /*
     * Data and code inspired from:
     * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million
     */
    private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0";

    private static final String COL_NAME = "zips";
    private static final String DEF_DB = "test";

    public AggregationChecker() {
    }

    public static void main(String[] args) {
        AggregationChecker writer = new AggregationChecker();
        writer.aggregationJob();
    }

    private void aggregationJob() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        try {
            final DB db = client.getDB(DEF_DB);
            final DBCollection coll = db.getCollection(COL_NAME);
            // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state
            Iterable iterable = coll.aggregate(
                    Arrays.asList(
                            new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop",
                                    new BasicDBObject("$sum", "$pop"))),
                                    new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results();

            for (DBObject entry : iterable) {
                printer(entry.toString());
            }
        } finally {
            client.close();
        }
        printer("Done...");
    }
...
}

Meldet den sekundären an:

...
2016-12-01T10:54:18.667+0000 I COMMAND  [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms
...

Auf der Primärseite wurde kein Vorgang aufgezeichnet.

Aggregationspipeline auf Sharded-Clustern

Aggregationspipelines werden auf fragmentierten Clustern unterstützt. Das detaillierte Verhalten wird in der Dokumentation erläutert. In Bezug auf die Implementierung gibt es bei Verwendung einer Aggregationspipeline kaum einen Unterschied zwischen Replikatsatz und Sharding-Cluster.

So richten Sie eine Aggregationspipeline auf Sharded-Clustern in MongoDBClick To Tweet ein

MongoDB-Shell

Aktivieren Sie vor dem Importieren von Daten in den Sharding-Cluster das Sharding für die Sammlung.

mongos> sh.enableSharding("test")
mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )

Danach sind die Operationen dieselben wie beim Replikatsatz:

mongos> db.setSlaveOk()
mongos> db.getMongo().setReadPref('secondary')
mongos> db.getMongo().getReadPrefMode()
secondary
mongos> db.zips.aggregate( [
...    { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
...    { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "TX", "totalPop" : 16984601 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }

Protokolle von einem der Secondaries:

...
2016-12-02T05:46:24.627+0000 I COMMAND  [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:46:24.641+0000 I COMMAND  [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms
...

Java

Derselbe Code wie im Replikatsatz funktioniert gut mit einem Sharding-Cluster. Ersetzen Sie einfach die Verbindungszeichenfolge des Replikatsatzes durch die des Sharding-Clusters. Protokolle von einem sekundären Server weisen darauf hin, dass der Job tatsächlich auf den sekundären Servern ausgeführt wurde:

...
2016-12-02T05:39:12.339+0000 I COMMAND  [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:39:12.371+0000 I COMMAND  [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms
...

War dieser Inhalt hilfreich? Lass es uns wissen, indem du an uns @scaledgridio twitterst und wie immer, wenn du irgendwelche Fragen hast, lass es uns in den Kommentaren unten wissen. Oh und! Vergessen Sie nicht, sich unsere MongoDB-Hosting-Produkte anzusehen, mit denen Sie bis zu 40 % der langfristigen MongoDB®-Hosting-Kosten einsparen können.