For nylig udgav MongoDB en ny funktion fra version 3.6, Change Streams. Dette giver dig øjeblikkelig adgang til dine data, som hjælper dig med at holde dig opdateret med dine dataændringer. I dagens verden vil alle have øjeblikkelige meddelelser i stedet for at få det efter nogle timer eller minutter. For nogle applikationer er det afgørende at skubbe notifikationer i realtid til alle abonnerede brugere for hver eneste opdatering. MongoDB gjorde denne proces virkelig nem ved at introducere denne funktion. I denne artikel vil vi lære om MongoDB change stream og dens applikationer med nogle eksempler.
Definition af ændringsstrømme
Ændringsstrømme er intet andet end realtidsstrømmen af ændringer, der forekommer i databasen eller samlingen eller endda i implementeringer. For eksempel, når en opdatering (Indsæt, Opdater eller Slet) forekommer i en specifik samling, udløser MongoDB en ændringshændelse med alle de data, der er blevet ændret.
Du kan definere ændringsstrømme på enhver samling ligesom alle andre normale aggregeringsoperatører ved at bruge $changeStream-operatoren og watch()-metoden. Du kan også definere ændringsstrøm ved hjælp af MongoCollection.watch() metoden.
Eksempel
db.myCollection.watch()
Skift streams-funktioner
-
Filtrering af ændringer
Du kan filtrere ændringerne for kun at få hændelsesmeddelelser for nogle målrettede data.
Eksempel:
pipeline = [ { $match: { name: "Bob" } } ]; changeStream = collection.watch(pipeline);
Denne kode sørger for, at du kun får opdateringer for poster, der har navnet svarende til Bob. På denne måde kan du skrive alle pipelines for at filtrere ændringsstrømmene.
-
Genoptagelse af ændringsstrømme
Denne funktion sikrer, at der ikke er datatab i tilfælde af fejl. Hvert svar i streamen indeholder CV-tokenet, som kan bruges til at genstarte streamen fra et bestemt punkt. For nogle hyppige netværksfejl vil mongodb-driveren forsøge at genetablere forbindelsen med abonnenterne ved hjælp af det seneste CV-token. Selvom, i tilfælde af fuldstændig applikationsfejl, skal CV-token vedligeholdes af klienterne for at genoptage streamen.
-
Bestilte ændringsstrømme
MongoDB bruger et globalt logisk ur til at ordne alle ændringsstream-hændelser på tværs af alle replikaer og shards af enhver klynge, så modtageren vil altid modtage meddelelserne i samme rækkefølge, som kommandoerne blev anvendt på databasen.
-
Begivenheder med fuldstændige dokumenter
MongoDB returnerer som standard den del af de matchende dokumenter. Men du kan ændre ændringsstrømkonfigurationen for at modtage et komplet dokument. For at gøre det skal du sende { fullDocument:“updateLookup”} til overvågningsmetoden.
Eksempel:collection = db.collection("myColl") changeStream = collection.watch({ fullDocument: “updateLookup”})
-
Holdbarhed
Ændringsstrømme vil kun give besked for de data, der er forpligtet til størstedelen af replikaerne. Dette vil sikre, at hændelser genereres af de fleste persistensdata, hvilket sikrer meddelelsens holdbarhed.
-
Sikkerhed/adgangskontrol
Skift streams er meget sikre. Brugere kan kun oprette ændringsstrømme på de samlinger, som de har læsetilladelser til. Du kan oprette ændringsstrømme baseret på brugerroller.
Eksempel på Change Streams
I dette eksempel vil vi oprette ændringsstrømme på aktiesamlingen for at få besked, når en aktiekurs går over en tærskel.
-
Konfigurer klyngen
For at bruge ændringsstrømme skal vi først oprette replikasæt. Kør følgende kommando for at oprette enkelt node replikasæt.
mongod --dbpath ./data --replSet “rs”
-
Indsæt nogle poster i Aktiesamlingen
var docs = [ { ticker: "AAPL", price: 210 }, { ticker: "AAPL", price: 260 }, { ticker: "AAPL", price: 245 }, { ticker: "AAPL", price: 255 }, { ticker: "AAPL", price: 270 } ]; db.Stocks.insert(docs)
-
Opsæt nodemiljø og installationsafhængigheder
mkdir mongo-proj && cd mongo-proj npm init -y npm install mongodb --save
-
Abonner på ændringerne
Opret én index.js-fil og indsæt følgende kode i den.
const mongo = require("mongodb").MongoClient; mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => { console.log("Connected to MongoDB server"); // Select DB and Collection const db = client.db("mydb"); const collection = db.collection("Stocks"); pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ]; // Define change stream const changeStream = collection.watch(pipeline); // start listen to changes changeStream.on("change", function(event) { console.log(JSON.stringify(event)); }); });
Kør nu denne fil:
node index.js
-
Indsæt en ny post i db for at modtage en opdatering
db.Stocks.insert({ ticker: “AAPL”, price: 280 })
Tjek nu din konsol, du vil modtage en opdatering fra MongoDB.
Eksempel svar:{ "_id":{ "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"}, "operationType":"insert", "clusterTime":"6655565945622233089", "fullDocument":{ "_id":"5c5d51f73aca83479b48de6e", "ticker":"AAPL", "Price":300 }, "ns":{"db":"mydb","coll":"Stocks"}, "documentKey":{"_id":"5c5d51f73aca83479b48de6e"} }
Her kan du ændre værdien af operationType parameter med følgende operationer for at lytte efter forskellige typer ændringer i en samling:
- Indsæt
- Erstat (undtagen unikt id)
- Opdater
- Slet
- Ugyldig (når Mongo returnerer ugyldig markør)
Andre ændringstilstande streams
Du kan starte ændringsstrømme mod en database og implementering på samme måde som mod indsamling. Denne funktion er blevet frigivet fra MongoDB version 4.0. Her er kommandoerne til at åbne en ændringsstrøm mod database og implementeringer.
Against DB: db.watch()
Against deployment: Mongo.watch()
Konklusion
MongoDB Change Streams forenkler integrationen mellem frontend og backend på en realtid og problemfri måde. Denne funktion kan hjælpe dig med at bruge MongoDB til pubsub-modellen, så du ikke længere behøver at administrere Kafka- eller RabbitMQ-implementeringer. Hvis din applikation kræver information i realtid, skal du tjekke denne funktion i MongoDB. Jeg håber, at dette indlæg vil få dig i gang med MongoDB change streams.