Futures, 任务和线程

正如我们在上一章中看到的,线程提供了一种实现并发的方法。 在本章中,我们看到了另一种使用 async 与 futures 和 streams 实现并发的方法。你可能会想知道为什么要在其中选择一种。答案是:这取决于具体的情况!在许多情况下,选择不是线程 异步,而是线程 异步。

许多操作系统多年来一直提供基于线程的并发模型,许多编程语言也因此支持它们。然而,它们并非没有权衡。在许多操作系统中,每个线程都会占用相当多的内存,并且启动和关闭时会带来一些开销。线程也仅在你的操作系统和硬件支持时才是一种选择!与主流的桌面和移动计算机不同,一些嵌入式系统根本没有操作系统,因此它们也没有线程!

异步模型提供了一组不同且最终互补的权衡。在异步模型中,并发操作不需要自己的线程。相反,它们可以在任务上运行,就像我们在流部分中使用trpl::spawn_task从同步函数启动工作一样。任务类似于线程,但不是由操作系统管理,而是由库级别的代码:运行时管理。

在上一节中,我们看到可以通过使用异步通道和生成一个可以从同步代码调用的异步任务来构建一个Stream。我们也可以用线程来做同样的事情!在清单 17-40 中,我们使用了trpl::spawn_tasktrpl::sleep。在清单 17-41 中,我们用标准库中的thread::spawnthread::sleepAPI 替换了这些,在get_intervals函数中。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, thread, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // This is *not* `trpl::spawn` but `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-41: Using the std::thread APIs instead of the async trpl APIs for the get_intervals function

如果你运行这个,输出是相同的。并且注意,从调用代码的角度来看,这里的变化是多么小!更重要的是,即使我们的一个函数在运行时启动了一个异步任务,而另一个启动了一个操作系统线程,最终的流也没有受到这些差异的影响。

尽管有这些相似之处,但这两种方法的行为却大不相同,尽管在这样一个非常简单的例子中我们可能很难测量出来。我们可以在任何现代个人计算机上生成数百万个异步任务。如果我们尝试用线程来做这件事,我们实际上会耗尽内存!

然而,这些 API 如此相似是有原因的。线程作为一组同步操作的边界;并发可以在线程之间发生。任务作为一组异步操作的边界;并发既可以在任务之间也可以在任务内部发生,因为任务可以在其主体中切换不同的未来。最后,未来是 Rust 中最细粒度的并发单元,每个未来可能代表其他未来的树。运行时——具体来说,其执行器——管理任务,而任务管理未来。在这方面,任务类似于轻量级的、运行时管理的线程,具有来自运行时管理的额外功能,而不是由操作系统管理。

这并不意味着异步任务总是优于线程,就像线程并不总是优于任务一样。

并发使用线程在某些方面比使用async的并发模型更简单。这可以是优点也可以是缺点。线程 somewhat “fire and forget”,它们没有类似未来的原生等效物,因此它们只是运行到完成,除非被操作系统本身中断,否则不会中断。也就是说,它们没有像未来那样对任务内并发的内置支持。Rust 中的线程也没有取消机制——这个主题我们在本章中没有深入讨论,但隐含在我们每次结束一个未来时,其状态都会正确清理的事实中。

这些限制也使得线程比未来(futures)更难以组合。例如,使用线程构建如我们在“构建我们自己的异步抽象”中构建的timeout,或在“组合流”中使用的throttle方法要困难得多。由于未来(futures)是更丰富的数据结构,这意味着它们可以更自然地组合在一起,正如我们所见。

任务然后提供对未来的额外控制,允许你选择在哪里以及如何分组未来的任务。事实证明,线程和任务通常可以很好地一起工作,因为任务(至少在某些运行时中)可以在线程之间移动。直到现在我们还没有提到,但我们在使用的Runtime,包括spawn_blockingspawn_task函数,默认情况下是多线程的!许多运行时使用一种称为工作窃取的方法,根据线程的当前利用率透明地在各线程之间移动任务,以提高系统的整体性能。构建这一点实际上需要线程任务,因此需要未来的任务。

作为默认的思维方式,关于何时使用哪个:

  • 如果工作是非常并行的,比如处理可以分别处理的数据集,线程是更好的选择。
  • 如果工作是非常并发的,例如处理来自许多不同来源的消息,这些消息可能以不同的间隔或不同的速率到达,那么异步是一个更好的选择。

而且,如果你需要一些并行性和并发性的混合,你不必在线程和异步之间做出选择。你可以自由地将它们一起使用,让每个部分发挥其最擅长的作用。例如,列表 17-42 展示了这种混合在实际的 Rust 代码中一个相当常见的例子。

Filename: src/main.rs
extern crate trpl; // for mdbook test

use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    thread::spawn(move || {
        for i in 1..11 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    trpl::run(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}
Listing 17-42: Sending messages with blocking code in a thread and awaiting the messages in an async block

我们首先创建一个异步通道。然后我们启动一个线程,该线程获取通道的发送端的所有权。在线程内,我们发送数字1到10,并在每次发送之间休眠一秒钟。最后,我们运行一个使用异步块传递给trpl::run创建的未来,就像我们在本章中所做的那样。在那个未来中,我们等待那些消息,就像在我们见过的其他消息传递示例中一样。

回到我们在本章开头的例子:你可以想象使用一个专用线程来运行一系列视频编码任务,因为视频编码是计算密集型的,但使用异步通道通知UI这些操作已完成。这种混合的例子比比皆是!

摘要

这不是你在本书中最后一次看到并发:第 21 章的项目将使用本章中的概念,在比这里讨论的较小示例更现实的情况下——并更直接地比较使用线程与使用任务和未来的解决这些问题的样子。

无论是使用线程、使用未来和任务,还是将它们全部结合使用,Rust 都为您提供编写安全、快速、并发代码所需的工具——无论是用于高吞吐量的 Web 服务器还是嵌入式操作系统。

接下来,我们将讨论随着您的 Rust 程序变得更大,如何以惯用的方式建模问题和结构化解决方案。此外,我们还将讨论 Rust 的惯用法与您可能熟悉的面向对象编程的惯用法之间的关系。