sql >> Database teknologi >  >> NoSQL >> MongoDB

Realtidsdatastreaming med MongoDB Change Streams

For nylig udgav MongoDB en ny funktion fra version 3.6, Change Streams. Dette giver dig øjeblikkelig adgang til dine data, som hjælper dig med at holde dig opdateret med dine dataændringer. I dagens verden vil alle have øjeblikkelige meddelelser i stedet for at få det efter nogle timer eller minutter. For nogle applikationer er det afgørende at skubbe notifikationer i realtid til alle abonnerede brugere for hver eneste opdatering. MongoDB gjorde denne proces virkelig nem ved at introducere denne funktion. I denne artikel vil vi lære om MongoDB change stream og dens applikationer med nogle eksempler.

Definition af ændringsstrømme

Ændringsstrømme er intet andet end realtidsstrømmen af ​​ændringer, der forekommer i databasen eller samlingen eller endda i implementeringer. For eksempel, når en opdatering (Indsæt, Opdater eller Slet) forekommer i en specifik samling, udløser MongoDB en ændringshændelse med alle de data, der er blevet ændret.

Du kan definere ændringsstrømme på enhver samling ligesom alle andre normale aggregeringsoperatører ved at bruge $changeStream-operatoren og watch()-metoden. Du kan også definere ændringsstrøm ved hjælp af MongoCollection.watch() metoden.

Eksempel

db.myCollection.watch()

Skift streams-funktioner

  • Filtrering af ændringer

    Du kan filtrere ændringerne for kun at få hændelsesmeddelelser for nogle målrettede data.

    Eksempel:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Denne kode sørger for, at du kun får opdateringer for poster, der har navnet svarende til Bob. På denne måde kan du skrive alle pipelines for at filtrere ændringsstrømmene.

  • Genoptagelse af ændringsstrømme

    Denne funktion sikrer, at der ikke er datatab i tilfælde af fejl. Hvert svar i streamen indeholder CV-tokenet, som kan bruges til at genstarte streamen fra et bestemt punkt. For nogle hyppige netværksfejl vil mongodb-driveren forsøge at genetablere forbindelsen med abonnenterne ved hjælp af det seneste CV-token. Selvom, i tilfælde af fuldstændig applikationsfejl, skal CV-token vedligeholdes af klienterne for at genoptage streamen.

  • Bestilte ændringsstrømme

    MongoDB bruger et globalt logisk ur til at ordne alle ændringsstream-hændelser på tværs af alle replikaer og shards af enhver klynge, så modtageren vil altid modtage meddelelserne i samme rækkefølge, som kommandoerne blev anvendt på databasen.

  • Begivenheder med fuldstændige dokumenter

    MongoDB returnerer som standard den del af de matchende dokumenter. Men du kan ændre ændringsstrømkonfigurationen for at modtage et komplet dokument. For at gøre det skal du sende { fullDocument:“updateLookup”} til overvågningsmetoden.
    Eksempel:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Holdbarhed

    Ændringsstrømme vil kun give besked for de data, der er forpligtet til størstedelen af ​​replikaerne. Dette vil sikre, at hændelser genereres af de fleste persistensdata, hvilket sikrer meddelelsens holdbarhed.

  • Sikkerhed/adgangskontrol

    Skift streams er meget sikre. Brugere kan kun oprette ændringsstrømme på de samlinger, som de har læsetilladelser til. Du kan oprette ændringsstrømme baseret på brugerroller.

Severalnines Bliv en MongoDB DBA - Bring MongoDB to ProductionFå flere oplysninger om, hvad du skal vide for at implementere, overvåge, administrere og skalere MongoDBDownload gratis

Eksempel på Change Streams

I dette eksempel vil vi oprette ændringsstrømme på aktiesamlingen for at få besked, når en aktiekurs går over en tærskel.

  • Konfigurer klyngen

    For at bruge ændringsstrømme skal vi først oprette replikasæt. Kør følgende kommando for at oprette enkelt node replikasæt.

    mongod --dbpath ./data --replSet “rs”
  • Indsæt nogle poster i Aktiesamlingen

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Opsæt nodemiljø og installationsafhængigheder

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Abonner på ændringerne

    Opret én index.js-fil og indsæt følgende kode i den.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Kør nu denne fil:

    node index.js
  • Indsæt en ny post i db for at modtage en opdatering

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Tjek nu din konsol, du vil modtage en opdatering fra MongoDB.
    Eksempel svar:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Her kan du ændre værdien af ​​operationType parameter med følgende operationer for at lytte efter forskellige typer ændringer i en samling:

  • Indsæt
  • Erstat (undtagen unikt id)
  • Opdater
  • Slet
  • Ugyldig (når Mongo returnerer ugyldig markør)

Andre ændringstilstande streams

Du kan starte ændringsstrømme mod en database og implementering på samme måde som mod indsamling. Denne funktion er blevet frigivet fra MongoDB version 4.0. Her er kommandoerne til at åbne en ændringsstrøm mod database og implementeringer.

Against DB: db.watch()
Against deployment: Mongo.watch()

Konklusion

MongoDB Change Streams forenkler integrationen mellem frontend og backend på en realtid og problemfri måde. Denne funktion kan hjælpe dig med at bruge MongoDB til pubsub-modellen, så du ikke længere behøver at administrere Kafka- eller RabbitMQ-implementeringer. Hvis din applikation kræver information i realtid, skal du tjekke denne funktion i MongoDB. Jeg håber, at dette indlæg vil få dig i gang med MongoDB change streams.


  1. MongoDB - Slet et dokument

  2. HDFS-datablok – Lær det interne i Big Data Hadoop

  3. mongodb-forespørgsel uden feltnavn

  4. Hvordan opretter man en lokal Windows-baseret servicebus uden for Azure, der ligner Redis med automatisk fail-over?