Message Passing

Um Daten zwischen Threads zu teilen gibt es verschiedene Möglichkeiten. Das mehr und mehr populäre Message Passing ist der einzige Weg, den wir in diesem Kurs besprechen werden. Dabei werden nicht Zugriffsschutzmechanismen auf geteilten Speicherbereich verwendet, sondern Daten werden zwischen Threads verschickt. Rust verwendet zu diesem Zweck Einweg-Kanäle, die nur das Verschicken von Daten von einem oder mehreren Versendern zu einem Empfänger erlauben. Dementsprechend lebt die Funktion zum erzeugen von Kanälen channel im Modul std::sync::mpsc, denn mpsc steht für multiple producer, single consumer. Die Funktion channel gibt einen Versender vom Typ std::sync::mpsc::Sender<T> und einen Empfänger vom Typ std::sync::mpsc::Receiver<T> zurück. Der Empfänger kann geklont werden, der Empfänger kann nur verschoben werden. Der generische Typ T ist der Typ der Instanz, die versendet wird.

use std::thread;
use std::sync::{mpsc::{self, SendError, RecvError}};
fn main() -> Result<(), SendError<i32>> {
    let n = 5;
    let (send, recv) = mpsc::channel();
    tx.send(n)?;
    let print_n = move || -> Result<(), RecvError> {
        let n = recv.recv()?;
        println!("{n}");
        Ok(())
    };
    thread::spawn(print_n).join().unwrap();
    Ok(())
}

Im obigen Schnipsel haben wir eine Nachricht an den separat gestarteten Thread geschickt, der auf die Nachricht wartet. Die Methode recv blockiert den aktuellen Thread, bis eine Nachricht angekommen ist. Wenn man überprüfen möchte, ob eine Nachricht da ist, ohne den aktuellen Thread zu blockieren, bietet sich die Verwendung von try_recv an. Die send- und recv-Calls schlagen fehl und geben Result::Err zurück, wenn das entpsrechende Gegenstück nicht mehr verfügbar ist. Andersrum ist es ebenfalls möglich, aus einem oder mehreren Threads Nachrichten an den Hauptthread zu schicken. Dazu iterieren wir über den Empfängernachrichten mit recv.iter(). Das ist ein blockierender Aufruf, der über alle gesendeten Nachrichtenpakete iteriert und auf neue Nachrichten wartet, solange noch ein Sender existiert, der in Zukunft Pakete senden könnte. Daher verwenden wir im folgenden Schnipsel die Funktion drop, die nichts tut und ihre Parameter per Move bekommt. Dadurch gibt es keine aktiven Sender mehr, wenn wir über Receiver<T>::iter() iterieren. Ansonsten würden wir eine Endlosschleife produzieren.

use std::thread;
use std::sync::{mpsc::{self, SendError, RecvError}};

fn main() -> Result<(), SendError<i32>> {
    let (send, recv) = mpsc::channel();
    for i in 0..3 {
        let send_i = send.clone();  // clone for thread i
        let print_i = move || -> Result<(), SendError<i32>> {
            send_i.send(i)?;
            Ok(())
        };
        thread::spawn(print_i);
    }
    drop(send);
    for x in recv.iter() {
        println!("{x}");
    }
    Ok(())
}

Analog zu try_recv gibt es auch die nicht-blockierende Variante Receiver<T>::iter() namens Receiver<T>::try_iter.