MongoDB
 sql >> Datenbank >  >> NoSQL >> MongoDB

MongoDB-Leistung:Ausführen von MongoDB-Map-Reduce-Vorgängen auf sekundären Servern

Map-Reduce ist vielleicht die vielseitigste Aggregationsoperation, die MongoDB unterstützt.

Map-Reduce ist ein beliebtes Programmiermodell, das seinen Ursprung bei Google hat, um große Datenmengen parallel zu verarbeiten und zu aggregieren. Eine detaillierte Erörterung von Map-Reduce würde den Rahmen dieses Artikels sprengen, aber im Wesentlichen handelt es sich um einen mehrstufigen Aggregationsprozess. Die zwei wichtigsten Schritte sind die Map-Phase (verarbeitet jedes Dokument und gibt Ergebnisse aus) und die Reduce-Phase (vergleicht die während der Map-Phase ausgegebenen Ergebnisse).

MongoDB unterstützt drei Arten von Aggregationsvorgängen:Map-Reduce, Aggregationspipeline und Einzelzweck-Aggregationsbefehle. Sie können dieses MongoDB-Vergleichsdokument verwenden, um zu sehen, welches Ihren Anforderungen entspricht. https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/

In meinem letzten Beitrag haben wir anhand von Beispielen gesehen, wie Aggregationspipelines auf sekundären Servern ausgeführt werden. In diesem Beitrag gehen wir durch die Ausführung von Map-Reduce-Jobs auf den sekundären Replikaten von MongoDB.

MongoDB Map-Reduce

MongoDB unterstützt das Ausführen von Map-Reduce-Jobs auf den Datenbankservern. Dies bietet die Flexibilität, komplexe Aggregationsaufgaben zu schreiben, die nicht so einfach über Aggregationspipelines zu erledigen sind. Mit MongoDB können Sie benutzerdefinierte Karten schreiben und Funktionen in Javascript reduzieren, die über die Mongo-Shell oder einen anderen Client an die Datenbank übergeben werden können. Bei großen und ständig wachsenden Datensätzen kann man sogar erwägen, inkrementelle Map-Reduce-Jobs auszuführen, um zu vermeiden, dass jedes Mal ältere Daten verarbeitet werden.

Früher wurden die Map- und die Reduce-Methode in einem Singlethread-Kontext ausgeführt. Diese Einschränkung wurde jedoch in Version 2.4 entfernt.

Warum Map-Reduce-Jobs auf der Sekundärseite ausführen?

Wie andere Aggregationsjobs ist auch Map-Reduce ein ressourcenintensiver „Batch“-Job und eignet sich daher gut für die Ausführung auf schreibgeschützten Replikaten. Die Einschränkungen dabei sind:

1) Es sollte in Ordnung sein, leicht veraltete Daten zu verwenden. Oder Sie können den Schreibschutz optimieren, um sicherzustellen, dass Replikate immer mit dem Primärserver synchron sind. Diese zweite Option geht davon aus, dass es akzeptabel ist, die Schreibleistung zu beeinträchtigen.

2) Die Ausgabe des Map-Reduce-Jobs sollte nicht in eine andere Sammlung innerhalb der Datenbank geschrieben werden, sondern an die Anwendung zurückgegeben werden (d. h. keine Schreibvorgänge in die Datenbank).

Sehen wir uns anhand von Beispielen an, wie das geht, sowohl aus der Mongo-Shell als auch aus dem Java-Treiber.

Kartenreduzierung bei Replikat-Sets

Datensatz

Zur Veranschaulichung verwenden wir einen ziemlich einfachen Datensatz:Ein täglicher Transaktionsaufzeichnungs-Dump von einem Einzelhändler. Ein Beispieleintrag sieht so aus:

RS-replica-0:PRIMARY> use test
switched to db test
RS-replica-0:PRIMARY> show tables
txns
RS-replica-0:PRIMARY> db.txns.findOne()
{
    "_id" : ObjectId("584a3b71cdc1cb061957289b"),
    "custid" : "cust_66",
    "txnval" : 100,
    "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...],
...
}

In unseren Beispielen berechnen wir die Gesamtausgaben eines bestimmten Kunden an diesem Tag. Bei unserem Schema sehen die map- und Reduce-Methoden also so aus:

var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record
var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid

Nachdem unser Schema eingerichtet ist, sehen wir uns Map-Reduce in Aktion an.

MongoDB-Shell

Um sicherzustellen, dass ein Map-Reduce-Job auf dem Secondary ausgeführt wird, sollte die Lesepräferenz auf Secondary gesetzt werden . Wie wir oben gesagt haben, muss die Ausgabe des Ergebnisses inline sein, damit ein Map-Reduce auf einem Secondary ausgeführt werden kann (Tatsächlich ist dies der einzige out-Wert, der auf Secondaries erlaubt ist). Mal sehen, wie es funktioniert.

$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
MongoDB shell version: 3.2.10
connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test
2016-12-09T08:15:19.347+0000 I NETWORK  [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017
2016-12-09T08:15:19.349+0000 I NETWORK  [ReplicaSetMonitorWatcher] starting
RS-replica-0:PRIMARY> db.setSlaveOk()
RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary')
RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); }
RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); }
RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }})
{
    "results" : [
        {
            "_id" : "cust_0",
            "value" : 72734
        },
        {
            "_id" : "cust_1",
            "value" : 67737
        },
...
    ]
    "timeMillis" : 215,
    "counts" : {
        "input" : 10000,
        "emit" : 10000,
        "reduce" : 909,
        "output" : 101
    },
    "ok" : 1

}

Ein Blick in die Protokolle auf dem Sekundärserver bestätigt, dass der Job tatsächlich auf dem Sekundärserver ausgeführt wurde.

...
2016-12-09T08:17:24.842+0000 D COMMAND  [conn344] mr ns: test.txns
2016-12-09T08:17:24.843+0000 I COMMAND  [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:17:24.865+0000 I COMMAND  [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:17:25.063+0000 I COMMAND  [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms
...

Java

Lassen Sie uns nun versuchen, einen Map-Reduce-Job auf den Read Replicas von einer Java-Anwendung auszuführen. Beim MongoDB-Java-Treiber reicht das Festlegen der Leseeinstellung aus. Die Ausgabe erfolgt standardmäßig inline, sodass keine zusätzlichen Parameter übergeben werden müssen. Hier ist ein Beispiel mit Treiberversion 3.2.2:

public class MapReduceExample {

    private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0";
    private static final String COL_NAME = "txns";
    private static final String DEF_DB = "test";

    public MapReduceExample() { }

    public static void main(String[] args) {
        MapReduceExample writer = new MapReduceExample();
        writer.mapReduce();
    }

    public static final String mapfunction = "function() { emit(this.custid, this.txnval); }";
    public static final String reducefunction = "function(key, values) { return Array.sum(values); }";

    private void mapReduce() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase database = client.getDatabase(DEF_DB);
        MongoCollection collection = database.getCollection(COL_NAME);
        MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default
        MongoCursor cursor = iterable.iterator();
        while (cursor.hasNext()) {
           Document result = cursor.next();
           printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value"));
        }
        printer("Done...");
    }
...
}

Wie aus den Protokollen hervorgeht, lief der Job auf dem sekundären:

...
2016-12-09T08:32:31.419+0000 D COMMAND  [conn371] mr ns: test.txns
2016-12-09T08:32:31.420+0000 I COMMAND  [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:32:31.444+0000 I COMMAND  [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:32:31.890+0000 I COMMAND  [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms
...

MongoDB Map-Reduce auf Sharded-Clustern

MongoDB unterstützt Map-Reduce auf Sharding-Clustern, sowohl wenn eine Sharding-Sammlung die Eingabe als auch die Ausgabe eines Map-Reduce-Jobs ist. MongoDB unterstützt jedoch derzeit keine Ausführung von Map-Reduce-Jobs auf Secondaries eines Sharding-Clusters. Also auch wenn die out option auf inline gesetzt ist , Map-Reduce-Jobs werden immer auf den primären Knoten eines Sharding-Clusters ausgeführt. Dieses Problem wird über diesen JIRA-Bug verfolgt.

Die Syntax zum Ausführen eines Map-Reduce-Jobs auf einem Sharding-Cluster ist dieselbe wie auf einem Replikatsatz. Die Beispiele im obigen Abschnitt gelten also. Wenn das obige Java-Beispiel auf einem Sharding-Cluster ausgeführt wird, erscheinen Protokollmeldungen auf den primären Clustern, die darauf hinweisen, dass der Befehl dort ausgeführt wurde.

...
2016-11-24T08:46:30.828+0000 I COMMAND  [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in
line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS
tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing:
 { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha
rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu
t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu
ireCount: { r: 1 } } } protocol:op_command 115ms
2016-11-24T08:46:30.830+0000 I COMMAND  [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0
...

Bitte besuchen Sie unsere MongoDB-Produktseite, um mehr über unsere umfangreiche Funktionsliste zu erfahren.