sql >> Database teknologi >  >> NoSQL >> Redis

Hvordan implementerer man en strøm af futures til et blokerende opkald ved hjælp af futures.rs og Redis PubSub?

Tung advarsel Jeg har aldrig brugt dette bibliotek før, og mit kendskab på lavt niveau til nogle af begreberne er lidt... mangler. For det meste læser jeg tutorialen igennem. Jeg er ret sikker på, at alle, der har udført async-arbejde, vil læse dette og grine, men det kan være et nyttigt udgangspunkt for andre mennesker. Advarsel emptor!

Lad os starte med noget lidt enklere, og demonstrere hvordan en Stream arbejder. Vi kan konvertere en iterator af Result s i en strøm:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Dette viser os en måde at forbruge streamen på. Vi bruger and_then at gøre noget ved hver nyttelast (her kun udskrive det) og derefter for_each for at konvertere Stream tilbage til en Future . Vi kan derefter køre fremtiden ved at kalde den mærkeligt navngivne forget metode.

Det næste er at binde Redis-biblioteket ind i blandingen og håndtere kun én besked. Siden get_message() metoden blokerer, skal vi introducere nogle tråde i blandingen. Det er ikke en god idé at udføre store mængder arbejde i denne type asynkrone systemer, da alt andet vil blive blokeret. For eksempel:

Medmindre det er arrangeret på anden måde, bør det sikres, at implementeringer af denne funktion afsluttes meget hurtigt .

I en ideel verden ville redis-kassen blive bygget oven på et bibliotek som futures og afsløre alt dette naturligt.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Min forståelse bliver mere uklar her. I en separat tråd blokerer vi for beskeden og skubber den ind i kanalen, når vi får den. Hvad jeg ikke forstår er, hvorfor vi skal holde fast i trådens håndtag. Jeg ville forvente, at foo.forget ville blokere sig selv og vente, indtil strømmen er tom.

I en telnet-forbindelse til Redis-serveren, send dette:

publish rust awesome

Og du vil se det virker. Tilføjelse af udskriftsudsagn viser, at (for mig) foo.forget sætningen køres, før tråden opstår.

Flere beskeder er vanskeligere. Sender forbruger sig selv for at forhindre, at den genererende side kommer for langt foran den forbrugende side. Dette opnås ved at returnere en anden fremtid fra send ! Vi er nødt til at shuttle det tilbage derfra for at genbruge det til næste iteration af løkken:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Jeg er sikker på, at der vil være flere økosystemer for denne type interoperation, som tiden går. For eksempel kunne futures-cpupool-kassen sandsynligvis udvides til at understøtte en lignende usecase til dette.




  1. Mongoose - RangeError:Maksimal opkaldsstabelstørrelse overskredet

  2. Java/MongoDB-forespørgsel efter dato

  3. Hvad er den bedste måde at bruge Redis i et multi-threaded Rails-miljø? (Puma / Sidekiq)

  4. Hvordan sletter man mange mongodb-samlinger på én gang?