使用消息传递在线程之间传输数据

一个越来越受欢迎的保证安全并发的方法是消息传递,其中线程或参与者通过发送包含数据的消息来通信。这里有一个来自Go语言文档的口号: “不要通过共享内存来通信;相反,通过通信来共享内存。”

为了实现消息发送的并发,Rust 的标准库提供了一个 通道 的实现。通道是一种通用的编程概念,通过它数据可以从一个线程发送到另一个线程。

你可以想象编程中的通道就像是一条有方向的水流,比如小溪或河流。如果你把像橡皮鸭这样的东西放入河中,它会顺流而下到达水道的尽头。

一个通道有两个部分:一个发送端和一个接收端。发送端是你将橡皮鸭放入河中的上游位置,而接收端是橡皮鸭最终到达的下游位置。代码的一部分调用发送端的方法来发送你想要发送的数据,而另一部分则检查接收端是否有到达的消息。如果发送端或接收端的任一半被丢弃,通道就被认为是关闭的。

在这里,我们将编写一个程序,其中一个线程生成值并通过通道发送,另一个线程接收这些值并打印出来。我们将使用通道在线程之间发送简单的值来说明这一特性。一旦你熟悉了这种技术,你就可以使用通道来实现任何需要相互通信的线程,例如聊天系统或多个线程执行计算的一部分并将这些部分发送到一个汇总结果的线程的系统。

首先,在清单 16-6 中,我们将创建一个通道但不对其进行任何操作。 请注意,这还不能编译,因为 Rust 无法确定我们希望通过通道发送什么类型的值。

文件名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

列表 16-6:创建一个通道并将两个半部分分配给txrx

我们使用 mpsc::channel 函数创建一个新的通道;mpsc 代表 多个生产者,单个消费者。简而言之,Rust 标准库实现通道的方式意味着一个通道可以有多个 发送 端来生成值,但只有一个 接收 端来消费这些值。想象多条溪流汇入一条大河:通过任何一条溪流发送的所有东西最终都会汇入同一条河。我们先从一个生产者开始,但当这个示例运行起来后,我们将添加多个生产者。

mpsc::channel 函数返回一个元组,其中第一个元素是发送端——发射器——第二个元素是接收端——接收器。在许多领域中,txrx 传统上分别用于表示 发射器接收器,因此我们这样命名我们的变量以指示每一端。我们使用一个带有模式的 let 语句来解构元组;我们将在第 19 章讨论 let 语句中的模式使用和解构。目前,要知道以这种方式使用 let 语句是一种方便的方法,可以提取 mpsc::channel 返回的元组的各个部分。

让我们将发送端移到一个新创建的线程中,并让它发送一个字符串,这样新创建的线程就可以与主线程通信,如清单 16-7 所示。这就像在河的上游放一个橡皮鸭,或者从一个线程向另一个线程发送聊天消息。

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
Listing 16-7: Moving tx to a spawned thread and sending “hi”

再次,我们使用 thread::spawn 创建一个新线程,然后使用 movetx 移动到闭包中,使新线程拥有 tx。新线程需要拥有发送方才能通过通道发送消息。发送方有一个 send 方法,该方法接受我们想要发送的值。send 方法返回一个 Result<T, E> 类型,因此如果接收方已经被释放且没有地方可以发送值,发送操作将返回一个错误。在这个例子中,我们调用 unwrap 在发生错误时引发恐慌。但在实际应用中,我们会正确处理它:返回第 9 章复习正确的错误处理策略。

在清单 16-8 中,我们将在主线程中从接收者获取值。这就像从河的尽头水中取出橡皮鸭或接收聊天消息。

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-8: Receiving the value “hi” in the main thread and printing it

接收者有两个有用的方法:recvtry_recv。我们使用的是 recv,即 接收 的缩写,它会阻塞主线程的执行并等待通道发送值。一旦值被发送,recv 将在 Result<T, E> 中返回它。当发送者关闭时,recv 将返回一个错误,以表示不会再有值发送。

try_recv 方法不会阻塞,而是会立即返回一个 Result<T, E>:如果消息可用,则返回一个包含消息的 Ok 值;如果没有消息,则返回一个 Err 值。使用 try_recv 很有用,如果此线程在等待消息时有其他工作要做:我们可以编写一个循环,每隔一段时间调用一次 try_recv,如果有消息则处理消息,否则在再次检查之前稍作其他工作。

我们在本例中使用了recv以保持简单;主线程除了等待消息外没有其他工作要做,因此阻塞主线程是合适的。

当我们运行列表 16-8 中的代码时,我们将看到从主线程打印出的值:

Got: hi

完美!

通道和所有权转移

所有权规则在消息传递中起着至关重要的作用,因为它们帮助你编写安全的并发代码。防止并发编程中的错误是贯穿整个 Rust 程序思考所有权的优势。让我们做一个实验来展示通道和所有权如何协同工作以防止问题:我们将尝试在通过通道发送 val之后 在派生的线程中使用它。尝试编译清单 16-9 中的代码,看看为什么这段代码是不允许的:

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-9: Attempting to use val after we’ve sent it down the channel

在这里,我们尝试在通过 tx.sendval 发送到通道后打印它。 允许这样做是一个坏主意:一旦值被发送到另一个线程,该线程可能在我们再次尝试使用该值之前对其进行修改或丢弃。潜在地,其他线程的修改可能会由于不一致或不存在的数据导致错误或意外结果。然而,如果我们尝试编译清单 16-9 中的代码,Rust 会给我们一个错误:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

我们的并发错误导致了一个编译时错误。send 函数获取其参数的所有权,当值被移动时,接收者获得了它的所有权。这阻止了我们在发送值后再次意外使用该值;所有权系统会检查一切是否正常。

发送多个值并观察接收者等待

列表 16-8 中的代码编译并运行了,但它没有清楚地显示两个独立的线程是通过通道相互通信的。在列表 16-10 中,我们进行了一些修改,以证明列表 16-8 中的代码是并发运行的:现在派生的线程将发送多条消息,并在每条消息之间暂停一秒。

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
Listing 16-10: Sending multiple messages and pausing between each

这次,生成的线程有一个字符串向量,我们希望将其发送到主线程。我们遍历这些字符串,逐个发送,并通过调用带有1秒Duration值的thread::sleep函数在每次发送之间暂停。

在主线程中,我们不再显式调用recv函数: 而是将rx视为一个迭代器。对于接收到的每个值,我们都会 打印它。当通道关闭时,迭代将结束。

当运行清单 16-10 中的代码时,您应该看到以下输出,每行之间有 1 秒的暂停:

Got: hi
Got: from
Got: the
Got: thread

因为我们在主线程的for循环中没有任何暂停或延迟的代码,我们可以判断主线程正在等待从派生线程接收值。

通过克隆发送者创建多个生产者

Earlier we mentioned that mpsc was an acronym for 多个生产者,单个消费者. Let’s put mpsc to use and expand the code in Listing 16-10 to create multiple threads that all send values to the same receiver. We can do so by cloning the transmitter, as shown in Listing 16-11:

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}
Listing 16-11: Sending multiple messages from multiple producers

这次,在我们创建第一个派生线程之前,我们对发送者调用clone。这将给我们一个新的发送者,我们可以将其传递给第一个派生线程。我们将原始的发送者传递给第二个派生线程。这使我们有两个线程,每个线程向一个接收者发送不同的消息。

当你运行代码时,你的输出应该看起来像这样:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

你可能会看到不同的顺序,这取决于你的系统。这正是使并发既有趣又困难的原因。如果你尝试使用thread::sleep,在不同的线程中给它不同的值,每次运行都会更加不确定,并且每次都会产生不同的输出。

现在我们已经了解了通道的工作原理,让我们来看看另一种并发方法。