Map-reduce er måske den mest alsidige af de aggregeringsoperationer, som MongoDB understøtter.
Map-Reduce er en populær programmeringsmodel, der stammer fra Google til at behandle og aggregere store mængder data parallelt. En detaljeret diskussion om Map-Reduce er uden for denne artikels omfang, men i det væsentlige er det en aggregeringsproces i flere trin. De vigtigste to trin er kortstadiet (behandle hvert dokument og udsende resultater) og reduceringsstadiet (samler resultater udsendt under kortfasen).
MongoDB understøtter tre slags aggregeringsoperationer:Map-Reduce, aggregeringspipeline og aggregeringskommandoer til enkelt formål. Du kan bruge dette MongoDB-sammenligningsdokument til at se, hvad der passer til dine behov.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/
I mit sidste indlæg så vi, med eksempler, hvordan man kører Aggregation pipelines på sekundære. I dette indlæg vil vi gennemgå at køre Map-Reduce-job på MongoDB sekundære replikaer.
MongoDB Map-Reduce
MongoDB understøtter at køre Map-Reduce-job på databaseserverne. Dette giver fleksibiliteten til at skrive komplekse aggregeringsopgaver, der ikke så let kan udføres via aggregeringspipelines. MongoDB lader dig skrive brugerdefineret kort og reducere funktioner i Javascript, der kan overføres til databasen via Mongo shell eller enhver anden klient. På store og konstant voksende datasæt kan man endda overveje at køre inkrementelle Map-Reduce-job for at undgå at behandle ældre data hver gang.
Historisk set plejede kortet og reduceringsmetoderne at blive udført i en enkelt-trådskontekst. Den begrænsning blev dog fjernet i version 2.4.
Hvorfor køre Map-Reduce-job på den sekundære?
Ligesom andre aggregeringsopgaver er Map-Reduce også et ressourcekrævende "batch"-job, så det passer godt til at køre på skrivebeskyttede replikaer. Forbeholdene ved at gøre det er:
1) Det burde være ok at bruge lidt forældede data. Eller du kan justere skriveproblemet for at sikre, at replikaer altid er synkroniserede med det primære. Denne anden mulighed forudsætter, at det er acceptabelt at få et slag på skriveydelsen.
2) Outputtet fra Map-Reduce-jobbet bør ikke skrives til en anden samling i databasen, men snarere returneres til applikationen (dvs. ingen skrivninger til databasen).
Lad os se på, hvordan man gør dette via eksempler, både fra mongo-skallen og Java-driveren.
Map-Reduce på replikasæt
Datasæt
Til illustration vil vi bruge et ret simpelt datasæt:Et dagligt transaktionsregistreringsdump fra en forhandler. En eksempelindgang ser sådan ud:
RS-replica-0:PRIMARY> use test switched to db test RS-replica-0:PRIMARY> show tables txns RS-replica-0:PRIMARY> db.txns.findOne() { "_id" : ObjectId("584a3b71cdc1cb061957289b"), "custid" : "cust_66", "txnval" : 100, "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...], ... }
I vores eksempler vil vi beregne det samlede forbrug for en given kunde på den pågældende dag. I betragtning af vores skema vil kort- og reduktionsmetoderne således se ud:
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid
Med vores skema etableret, lad os se på Map-Reduce i aktion.
MongoDB Shell
For at sikre, at et Map-Reduce-job udføres på det sekundære, skal læsepræferencen indstilles til sekundær . Som vi sagde ovenfor, for at en Map-Reduce kan køre på en sekundær, skal resultatet af resultatet være inline (Faktisk er det den eneste udeværdi, der er tilladt på sekundære). Lad os se, hvordan det virker.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 MongoDB shell version: 3.2.10 connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test 2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017 2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting RS-replica-0:PRIMARY> db.setSlaveOk() RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary') RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); } RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); } RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}) { "results" : [ { "_id" : "cust_0", "value" : 72734 }, { "_id" : "cust_1", "value" : 67737 }, ... ] "timeMillis" : 215, "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 }, "ok" : 1 }
Et kig på logfilerne på den sekundære bekræfter, at jobbet faktisk kørte på den sekundære.
... 2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns 2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms ...
Java
Lad os nu prøve at køre et Map-Reduce-job på læsereplikaerne fra en Java-applikation. På MongoDB Java-driveren gør det tricket at indstille læsepræferencen. Outputtet er som standard inline, så der skal ikke videregives yderligere parametre. Her er et eksempel, der bruger driverversion 3.2.2:
public class MapReduceExample { private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0"; private static final String COL_NAME = "txns"; private static final String DEF_DB = "test"; public MapReduceExample() { } public static void main(String[] args) { MapReduceExample writer = new MapReduceExample(); writer.mapReduce(); } public static final String mapfunction = "function() { emit(this.custid, this.txnval); }"; public static final String reducefunction = "function(key, values) { return Array.sum(values); }"; private void mapReduce() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); MongoDatabase database = client.getDatabase(DEF_DB); MongoCollection collection = database.getCollection(COL_NAME); MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default MongoCursor cursor = iterable.iterator(); while (cursor.hasNext()) { Document result = cursor.next(); printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value")); } printer("Done..."); } ... }
Som det fremgår af logfilerne, kørte jobbet på den sekundære:
... 2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns 2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms ...
MongoDB Map-Reduce på Sharded-klynger
MongoDB understøtter Map-Reduce på sharded-klynger, både når en sharded-samling er input, og når det er output fra et Map-Reduce-job. MengoDB understøtter i øjeblikket ikke at køre kort-reducerende job på sekundære dele af en sharded klynge. Så selvom ud-muligheden er indstillet til inline , Map-Reduce-job vil altid køre på de primære data fra en sharded klynge. Dette problem spores gennem denne JIRA-fejl.
Syntaksen for at udføre et Map-Reduce-job på en sharded klynge er den samme som på et replikasæt. Så eksemplerne i ovenstående afsnit holder. Hvis ovenstående Java-eksempel køres på en fragmenteret klynge, vises logmeddelelser på de primære programmer, der indikerer, at kommandoen kørte der.
... 2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu ireCount: { r: 1 } } } protocol:op_command 115ms 2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0 ...
Besøg vores MongoDB-produktside for at finde ud af om vores omfattende liste over funktioner.