将所有内容整合在一起:Future、任务和线程
正如我们在第16章中看到的,线程提供了一种实现并发的方法。在本章中,我们看到了另一种方法:使用异步与未来和流。如果你在想何时选择一种方法而不是另一种,答案是:这取决于具体情境!而且在许多情况下,选择不是线程或异步,而是线程和异步。
许多操作系统几十年来一直提供基于线程的并发模型,许多编程语言也因此支持它们。然而,这些模型并非没有权衡。在许多操作系统中,每个线程都会占用相当多的内存,并且启动和关闭时也会带来一些开销。线程也仅在操作系统和硬件支持的情况下才是一种选择。与主流的桌面和移动计算机不同,一些嵌入式系统根本没有操作系统,因此也没有线程。
异步模型提供了一组不同且最终互补的权衡。在异步模型中,并发操作不需要自己的线程。相反,它们可以在任务上运行,就像我们在流部分中使用trpl::spawn_task
从同步函数启动工作一样。任务类似于线程,但不是由操作系统管理,而是由库级别的代码:运行时管理。
在上一节中,我们看到可以使用异步通道和异步任务来构建一个流,这些任务可以从同步代码中调用。我们也可以用线程做完全相同的事情。在清单 17-40 中,我们使用了 trpl::spawn_task
和 trpl::sleep
。在清单 17-41 中,我们用标准库中的 thread::spawn
和 thread::sleep
API 替换了它们,在 get_intervals
函数中。
extern crate trpl; // required for mdbook test use std::{pin::pin, thread, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); // This is *not* `trpl::spawn` but `std::thread::spawn`! thread::spawn(move || { let mut count = 0; loop { // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`! thread::sleep(Duration::from_millis(1)); count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
std::thread
APIs instead of the async trpl
APIs for the get_intervals
function如果您运行此代码,输出与清单 17-40 完全相同。并且请注意,从调用代码的角度来看,这里的变化是多么小。更重要的是,即使我们的一个函数在运行时启动了一个异步任务,而另一个函数启动了一个操作系统线程,最终生成的流也没有受到这些差异的影响。
尽管它们有相似之处,但这两种方法的行为却大不相同, 尽管在这个非常简单的例子中我们可能很难测量出来。我们 可以在任何现代个人计算机上生成数百万个异步任务。如果我们尝试 用线程来做这件事,我们实际上会耗尽内存!
然而,这些 API 如此相似是有原因的。线程作为一组同步操作的边界;并发可以在线程之间发生。任务作为一组异步操作的边界;并发既可以在任务之间也可以在任务内部发生,因为任务可以在其主体中切换不同的未来。最后,未来是 Rust 中最细粒度的并发单元,每个未来可能代表其他未来的树。运行时——具体来说,其执行器——管理任务,而任务管理未来。在这方面,任务类似于轻量级的、运行时管理的线程,具有来自运行时管理的额外功能,而不是由操作系统管理。
这并不意味着异步任务总是优于线程(反之亦然)。在某些方面,使用线程的并发比使用async
的并发更简单。这可以是优点也可以是缺点。线程 somewhat “fire and forget”;它们没有未来(future)的原生等效物,因此除了被操作系统本身中断外,它们会一直运行到完成。也就是说,它们没有像未来(future)那样的内置任务内并发支持。Rust 中的线程也没有取消机制——我们在本章中没有明确讨论这个主题,但通过每次结束一个未来(future)时其状态都能正确清理这一事实可以推断出来。
这些限制也使得线程比未来更难组合。例如,使用线程构建如我们在本章前面构建的timeout
和throttle
方法这样的辅助工具要困难得多。由于未来是更丰富的数据结构,这意味着它们可以更自然地组合在一起,正如我们所见。
任务,因此,为我们提供了对未来的额外控制,使我们能够选择如何以及在哪里对它们进行分组。事实证明,线程和任务通常配合得非常好,因为任务(至少在某些运行时中)可以在不同线程之间移动。实际上,在底层,我们一直在使用的运行时——包括spawn_blocking
和spawn_task
函数——默认是多线程的!许多运行时使用一种称为工作窃取的方法,根据线程当前的使用情况透明地在不同线程之间移动任务,以提高系统的整体性能。这种方法实际上需要线程和任务,因此也需要未来的概念。
在考虑何时使用哪种方法时,请考虑以下经验法则:
- 如果工作是非常并行的,比如处理可以分别处理的数据集,线程是更好的选择。
- 如果工作是非常并发的,例如处理来自许多不同来源的消息,这些消息可能以不同的间隔或不同的速率到达,那么异步是一个更好的选择。
而且,如果您需要并行性和并发性,您不必在线程和异步之间做出选择。您可以自由地将它们一起使用,让每一种发挥其最擅长的作用。例如,列表 17-42 展示了这种混合在实际 Rust 代码中相当常见的一种情况。
extern crate trpl; // for mdbook test use std::{thread, time::Duration}; fn main() { let (tx, mut rx) = trpl::channel(); thread::spawn(move || { for i in 1..11 { tx.send(i).unwrap(); thread::sleep(Duration::from_secs(1)); } }); trpl::run(async { while let Some(message) = rx.recv().await { println!("{message}"); } }); }
我们首先创建一个异步通道,然后启动一个线程,该线程获取通道的发送端的所有权。在该线程中,我们发送数字1到10,每次发送之间暂停一秒。最后,我们运行一个使用异步块传递给trpl::run
创建的未来,就像我们在本章中所做的那样。在那个未来中,我们等待那些消息,就像在我们见过的其他消息传递示例中一样。
回到我们在本章开头讨论的场景,想象一下使用一个专用线程运行一组视频编码任务(因为视频编码是计算密集型的),但通过异步通道通知UI这些操作已完成。在实际应用中,这类组合的例子数不胜数。
摘要
这不会是你在这本书中最后一次看到并发。在第 21 章的项目中,将会在一个比这里讨论的更现实的情况下应用这些概念,并更直接地比较使用线程与任务解决问题的方法。
无论你选择哪种方法,Rust 都为你提供了编写安全、快速、并发代码所需的工具——无论是用于高吞吐量的 Web 服务器还是嵌入式操作系统。
接下来,我们将讨论随着您的 Rust 程序变得更大,如何以惯用的方式建模问题和结构化解决方案。此外,我们还将讨论 Rust 的惯用法与您可能熟悉的面向对象编程的惯用法之间的关系。