Redis
 sql >> Datenbank >  >> NoSQL >> Redis

Wie implementiert man einen Futures-Stream für einen blockierenden Aufruf mit futures.rs und Redis PubSub?

Schwerer Vorbehalt Ich habe diese Bibliothek noch nie zuvor verwendet, und mein Wissen auf niedrigem Niveau über einige der Konzepte ist ein bisschen … mangelhaft. Meistens lese ich das Tutorial durch. Ich bin mir ziemlich sicher, dass jeder, der asynchrone Arbeit geleistet hat, dies lesen und lachen wird, aber es kann ein nützlicher Ausgangspunkt für andere Leute sein. Vorsichtsmaßnahme!

Beginnen wir mit etwas Einfacherem und demonstrieren, wie ein Stream funktioniert funktioniert. Wir können einen Iterator von Result konvertieren s in einen Stream:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Dies zeigt uns eine Möglichkeit, den Stream zu konsumieren. Wir verwenden and_then um etwas mit jeder Nutzlast zu tun (hier einfach ausdrucken) und dann for_each um den Stream umzuwandeln zurück in eine Future . Wir können dann die Zukunft ausführen, indem wir den seltsam benannten forget aufrufen Methode.

Als Nächstes binden Sie die Redis-Bibliothek in den Mix ein und verarbeiten nur eine Nachricht. Da die get_message() -Methode blockiert, müssen wir einige Threads in die Mischung einführen. Es ist keine gute Idee, große Mengen an Arbeit in dieser Art von asynchronem System durchzuführen, da alles andere blockiert wird. Zum Beispiel:

Sofern nicht anders vereinbart, sollte sichergestellt werden, dass Implementierungen dieser Funktion sehr schnell abgeschlossen werden .

In einer idealen Welt würde die Redis-Kiste auf einer Bibliothek wie Futures aufgebaut und all dies nativ offenlegen.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Mein Verständnis wird hier schwammiger. In einem separaten Thread blockieren wir die Nachricht und schieben sie in den Kanal, wenn wir sie erhalten. Was ich nicht verstehe, ist, warum wir den Griff des Threads festhalten müssen. Ich würde erwarten, dass foo.forget würde sich selbst blockieren und warten, bis der Stream leer ist.

Senden Sie in einer Telnet-Verbindung zum Redis-Server Folgendes:

publish rust awesome

Und Sie werden sehen, es funktioniert. Das Hinzufügen von print-Anweisungen zeigt, dass (für mich) das foo.forget -Anweisung wird ausgeführt, bevor der Thread erzeugt wird.

Mehrere Nachrichten sind schwieriger. Der Sender verbraucht sich selbst, um zu verhindern, dass die erzeugende Seite der verbrauchenden Seite zu weit voraus ist. Dies wird erreicht, indem ein weiteres Future von send zurückgegeben wird ! Wir müssen es dort wieder herausholen, um es für die nächste Iteration der Schleife wiederzuverwenden:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Ich bin mir sicher, dass es im Laufe der Zeit mehr Ökosysteme für diese Art der Zusammenarbeit geben wird. Zum Beispiel könnte die Futures-Cpupool-Kiste wahrscheinlich erweitert werden, um einen ähnlichen Anwendungsfall wie diesen zu unterstützen.