Aggregeringsoperationer i MongoDB giver dig mulighed for at behandle dataposter, gruppere dem og returnere deres beregnede resultater. MongoDB understøtter tre slags aggregeringsoperationer:
- Aggregeringskommandoer til enkelt formål
- Map-Reduce
- Aggregation pipeline
Du kan bruge dette MongoDB sammenligningsdokument til at se, hvad der passer til dine behov.
Aggregation Pipeline
Aggregation-pipelinen er en MongoDB-ramme, der sørger for dataaggregering via en databehandlingspipeline. Det vil sige, at dokumenter sendes gennem en flertrinspipeline, der filtrerer, grupperer og på anden måde transformerer dokumenterne ved hvert trin. Det giver SQL "GROUP BY ...." type konstruktioner til MongoDB, der kører på selve databasen. Aggregeringsdokumentation giver nyttige eksempler på oprettelse af sådanne pipelines.
Hvorfor køre aggregeringer på den sekundære?
Aggregationspipelines er ressourcekrævende operationer – det giver mening at overføre aggregeringsjob til sekundære af et MongoDB-replikasæt, når det er ok at operere på lidt forældede data. Dette gælder typisk for "batch"-operationer, da de ikke forventer at køre på de seneste data. Hvis outputtet skal skrives til en samling, kører aggregeringsjobbene kun på det primære, da kun det primære er skrivbart i MongoDB.
I dette indlæg vil vi vise dig, hvordan du sikrer, at aggregeringspipelines udføres på den sekundære både fra mongo-skallen og Java.
Udfør aggregationsrørledninger på den sekundære fra Mongo Shell og Java i MongoDBCklik for at tweeteBemærk:Vi bruger prøvedatasættet leveret af MongoDB i deres postnummersammenlægningseksempel for at fremvise vores eksempler. Du kan downloade det som anvist i eksemplet.
Aggregation Pipeline på replikasæt
MongoDB shell
Indstilling af læsepræference til sekundær gør tricket, når du kører et aggregeringsjob fra mongo-skallen. Lad os prøve at hente alle stater med en befolkning på over 10 millioner (første aggregering i postnummereksemplet). Både shell og server kører MongoDB version 3.2.10.
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 RS-repl0-0:PRIMARY> use test switched to db test RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-repl0-0:PRIMARY> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 } { "_id" : "TX", "totalPop" : 16984601 }
Et kig i MongoDB-logfilerne (med logning aktiveret for kommandoer) på den sekundære viser, at aggregering faktisk kørte på den sekundære:
... 2016-12-05T06:20:14.783+0000 I COMMAND [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { $match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms ...
Java
Fra MongoDB Java-driveren gør det igen at indstille læsepræferencen. Her er et eksempel, der bruger driverversion 3.2.2:
public class AggregationChecker { /* * Data and code inspired from: * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million */ private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0"; private static final String COL_NAME = "zips"; private static final String DEF_DB = "test"; public AggregationChecker() { } public static void main(String[] args) { AggregationChecker writer = new AggregationChecker(); writer.aggregationJob(); } private void aggregationJob() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); try { final DB db = client.getDB(DEF_DB); final DBCollection coll = db.getCollection(COL_NAME); // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state Iterable iterable = coll.aggregate( Arrays.asList( new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop", new BasicDBObject("$sum", "$pop"))), new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results(); for (DBObject entry : iterable) { printer(entry.toString()); } } finally { client.close(); } printer("Done..."); } ... }
Log på den sekundære:
... 2016-12-01T10:54:18.667+0000 I COMMAND [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms ...
Der blev ikke registreret nogen operation på den primære.
Aggregationsrørledning på splittede klynger
Aggregationsrørledninger understøttes på opdelte klynger. Detaljeret adfærd er forklaret i dokumentationen. Implementeringsmæssigt er der lille forskel mellem replikasæt og sharded cluster, når du bruger en aggregeringspipeline.
Sådan opsætter du en aggregationspipeline på splittede klynger i MongoDBClik for at tweeteMongoDB shell
Før du importerer data til den shardede klynge, skal du aktivere sharding på samlingen.
mongos> sh.enableSharding("test") mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )
Derefter er operationerne de samme som replikasættet:
mongos> db.setSlaveOk() mongos> db.getMongo().setReadPref('secondary') mongos> db.getMongo().getReadPrefMode() secondary mongos> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "TX", "totalPop" : 16984601 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 }
Logfiler fra en af sekundærerne:
... 2016-12-02T05:46:24.627+0000 I COMMAND [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:46:24.641+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms ...
Java
Samme kode som gældende i replikasæt fungerer fint med en sønderdelt klynge. Bare udskift replikasættets forbindelsesstreng med den fra den sønderdelte klynge. Logfiler fra en sekundær indikerer, at jobbet faktisk blev kørt på de sekundære:
... 2016-12-02T05:39:12.339+0000 I COMMAND [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:39:12.371+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms ...
Var dette indhold nyttigt? Fortæl os det ved at tweete til os @scaledgridio, og som altid, hvis du har spørgsmål, så lad os det vide i kommentarerne nedenfor. Åh og! Glem ikke at tjekke vores MongoDB-hostingprodukter, der kan spare op til 40 % på langsigtede MongoDB®-hostingomkostninger.