sql >> Database teknologi >  >> NoSQL >> MongoDB

MongoDB-ydelse:Kørsel af MongoDB-aggregeringer på sekundære

Aggregeringsoperationer i MongoDB giver dig mulighed for at behandle dataposter, gruppere dem og returnere deres beregnede resultater. MongoDB understøtter tre slags aggregeringsoperationer:

  1. Aggregeringskommandoer til enkelt formål
  2. Map-Reduce
  3. Aggregation pipeline

Du kan bruge dette MongoDB sammenligningsdokument til at se, hvad der passer til dine behov.

Aggregation Pipeline

 Aggregation-pipelinen er en MongoDB-ramme, der sørger for dataaggregering via en databehandlingspipeline. Det vil sige, at dokumenter sendes gennem en flertrinspipeline, der filtrerer, grupperer og på anden måde transformerer dokumenterne ved hvert trin. Det giver SQL "GROUP BY ...." type konstruktioner til MongoDB, der kører på selve databasen. Aggregeringsdokumentation giver nyttige eksempler på oprettelse af sådanne pipelines.

Hvorfor køre aggregeringer på den sekundære?

Aggregationspipelines er ressourcekrævende operationer – det giver mening at overføre aggregeringsjob til sekundære af et MongoDB-replikasæt, når det er ok at operere på lidt forældede data. Dette gælder typisk for "batch"-operationer, da de ikke forventer at køre på de seneste data. Hvis outputtet skal skrives til en samling, kører aggregeringsjobbene kun på det primære, da kun det primære er skrivbart i MongoDB.

I dette indlæg vil vi vise dig, hvordan du sikrer, at aggregeringspipelines udføres på den sekundære både fra mongo-skallen og Java.

Udfør aggregationsrørledninger på den sekundære fra Mongo Shell og Java i MongoDBCklik for at tweete

Bemærk:Vi bruger prøvedatasættet leveret af MongoDB i deres postnummersammenlægningseksempel for at fremvise vores eksempler. Du kan downloade det som anvist i eksemplet.

Aggregation Pipeline på replikasæt

MongoDB shell

Indstilling af læsepræference til sekundær gør tricket, når du kører et aggregeringsjob fra mongo-skallen. Lad os prøve at hente alle stater med en befolkning på over 10 millioner (første aggregering i postnummereksemplet). Både shell og server kører MongoDB version 3.2.10.

mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
RS-repl0-0:PRIMARY> use test
switched to db test
RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave
RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref
RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-repl0-0:PRIMARY> db.zips.aggregate( [
...    { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
...    { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }
{ "_id" : "TX", "totalPop" : 16984601 }

Et kig i MongoDB-logfilerne (med logning aktiveret for kommandoer) på den sekundære viser, at aggregering faktisk kørte på den sekundære:

...
2016-12-05T06:20:14.783+0000 I COMMAND  [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { 
$match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire
Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms
...

Java

Fra MongoDB Java-driveren gør det igen at indstille læsepræferencen. Her er et eksempel, der bruger driverversion 3.2.2:

public class AggregationChecker {

    /*
     * Data and code inspired from:
     * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million
     */
    private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0";

    private static final String COL_NAME = "zips";
    private static final String DEF_DB = "test";

    public AggregationChecker() {
    }

    public static void main(String[] args) {
        AggregationChecker writer = new AggregationChecker();
        writer.aggregationJob();
    }

    private void aggregationJob() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        try {
            final DB db = client.getDB(DEF_DB);
            final DBCollection coll = db.getCollection(COL_NAME);
            // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state
            Iterable iterable = coll.aggregate(
                    Arrays.asList(
                            new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop",
                                    new BasicDBObject("$sum", "$pop"))),
                                    new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results();

            for (DBObject entry : iterable) {
                printer(entry.toString());
            }
        } finally {
            client.close();
        }
        printer("Done...");
    }
...
}

Log på den sekundære:

...
2016-12-01T10:54:18.667+0000 I COMMAND  [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms
...

Der blev ikke registreret nogen operation på den primære.

Aggregationsrørledning på splittede klynger

Aggregationsrørledninger understøttes på opdelte klynger. Detaljeret adfærd er forklaret i dokumentationen. Implementeringsmæssigt er der lille forskel mellem replikasæt og sharded cluster, når du bruger en aggregeringspipeline.

Sådan opsætter du en aggregationspipeline på splittede klynger i MongoDBClik for at tweete

MongoDB shell

Før du importerer data til den shardede klynge, skal du aktivere sharding på samlingen.

mongos> sh.enableSharding("test")
mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )

Derefter er operationerne de samme som replikasættet:

mongos> db.setSlaveOk()
mongos> db.getMongo().setReadPref('secondary')
mongos> db.getMongo().getReadPrefMode()
secondary
mongos> db.zips.aggregate( [
...    { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
...    { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "TX", "totalPop" : 16984601 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }

Logfiler fra en af ​​sekundærerne:

...
2016-12-02T05:46:24.627+0000 I COMMAND  [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:46:24.641+0000 I COMMAND  [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms
...

Java

Samme kode som gældende i replikasæt fungerer fint med en sønderdelt klynge. Bare udskift replikasættets forbindelsesstreng med den fra den sønderdelte klynge. Logfiler fra en sekundær indikerer, at jobbet faktisk blev kørt på de sekundære:

...
2016-12-02T05:39:12.339+0000 I COMMAND  [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:39:12.371+0000 I COMMAND  [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms
...

Var dette indhold nyttigt? Fortæl os det ved at tweete til os @scaledgridio, og som altid, hvis du har spørgsmål, så lad os det vide i kommentarerne nedenfor. Åh og! Glem ikke at tjekke vores MongoDB-hostingprodukter, der kan spare op til 40 % på langsigtede MongoDB®-hostingomkostninger.


  1. Sådan skriver du fagforeningsforespørgsler i mongoDB

  2. Bruger Redis et brugernavn til godkendelse?

  3. Føj data til eksisterende gridfs-fil

  4. 3 måder at konvertere en dato til en streng i MongoDB