深入探讨异步特性
在本章中,我们以各种方式使用了Future、Stream和StreamExt特质。到目前为止,我们避免深入探讨它们的工作原理或它们如何协同工作,这在大多数日常的Rust工作中是完全可以的。然而,有时你会遇到需要更多了解这些特质的细节,以及Pin类型和Unpin特质的情况。在本节中,我们将深入探讨足够的内容以帮助解决这些情况,但仍将真正深入的探讨留给其他文档。
《Future 特性》
让我们先仔细看看 Future 特性是如何工作的。以下是 Rust 对其的定义:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }
那个特征定义包括了一堆新的类型还有一些我们之前没见过的语法,所以让我们一步步地解析这个定义。
首先,Future 的关联类型 Output 表示未来将解析为何种值。
这类似于 Iterator 特性中的 Item 关联类型。
其次,Future 拥有 poll 方法,该方法接受一个特殊的 Pin 引用
作为其 self 参数,并接受一个可变的 Context 类型引用,返回一个 Poll<Self::Output>。我们稍后会详细讨论 Pin 和 Context。现在,让我们先关注方法的返回值,即 Poll 类型:
#![allow(unused)] fn main() { pub enum Poll<T> { Ready(T), Pending, } }
这个 Poll 类型类似于 Option。它有一个包含值的变体 Ready(T),和一个不包含值的变体 Pending。不过,Poll 与 Option 的含义相当不同!Pending 变体表示未来还有工作要做,因此调用者需要稍后再次检查。Ready 变体表示 Future 已完成其工作,并且 T 值可用。
注意:很少需要直接调用 poll,但如果您确实需要调用,请记住,对于大多数未来对象,调用者在将来对象返回 Ready 后不应再次调用 poll。许多未来对象在变为就绪后再次被轮询时会引发恐慌。可以在文档中明确说明可以再次轮询的未来对象。这类似于 Iterator::next 的行为。
当你看到使用 await 的代码时,Rust 会在底层将其编译为调用 poll 的代码。如果你回顾一下清单 17-4,我们在其中打印出单个 URL 解析后的页面标题,Rust 会将其编译成类似这样的代码(虽然不完全相同):
match page_title(url).poll() {
Ready(page_title) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// But what goes here?
}
}
当我们遇到未来状态仍为Pending时,我们应该怎么办?我们需要某种方法尝试
一次又一次,直到未来最终准备就绪。换句话说,
我们需要一个循环:
let mut page_title_fut = page_title(url);
loop {
match page_title_fut.poll() {
Ready(value) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// continue
}
}
}
如果 Rust 将其编译为完全相同的代码,那么每个 await 都会是阻塞的——这与我们的目标完全相反!相反,Rust 确保循环可以将控制权交给某个可以暂停处理此未来的任务,转而处理其他未来的任务,然后稍后再检查此任务。正如我们所见,这个东西是一个异步运行时,而调度和协调工作正是它的主要职责之一。
在“使用消息传递在两个任务之间发送数据”部分,我们描述了等待rx.recv。recv调用返回一个未来对象,等待该未来对象会轮询它。我们提到,当通道关闭时,运行时会暂停未来对象,直到它准备好返回Some(message)或None。通过我们对Future特质的更深入理解,特别是Future::poll,我们可以看到它是如何工作的。当返回Poll::Pending时,运行时知道未来对象尚未准备好。相反,当poll返回Poll::Ready(Some(message))或Poll::Ready(None)时,运行时知道未来对象已经准备好并推进它。
具体的运行时如何做到这一点的细节超出了本书的范围, 但关键是了解未来的的基本机制:运行时会轮询它负责的每个未来, 当未来尚未准备好时,将其重新置于休眠状态。
《Pin 类型和 Unpin 特性》
在清单 17-13 中,我们使用了 trpl::join! 宏来等待三个 future。然而,通常我们有一个集合,比如一个向量,其中包含一些在运行时才确定数量的 future。让我们将清单 17-13 更改为清单 17-23 中的代码,将三个 future 放入一个向量中,并调用 trpl::join_all 函数,这还不能编译。
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
// --snip--
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
我们将每个 future 放入一个 Box 中,使它们成为 trait 对象,就像我们在第 12 章“从 run 返回错误”部分所做的那样。(我们将在第 18 章详细讨论 trait 对象。)使用 trait 对象让我们可以将这些类型产生的每个匿名 future 视为同一类型,因为它们都实现了 Future trait。
这可能会让你感到惊讶。毕竟,这些异步块都没有返回任何内容,
所以每个都生成一个 Future<Output = ()>。请记住,Future 是一个
特质,编译器为每个异步块创建一个唯一的枚举,即使它们的输出类型相同。就像你不能将两个
不同的手写结构体放入一个 Vec 一样,你也不能混合编译器生成的枚举。
然后我们将未来的集合传递给 trpl::join_all 函数并等待结果。然而,这无法编译;以下是错误消息的相关部分。
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
此错误消息中的注释告诉我们应该使用 pin! 宏来 固定 值,这意味着将它们放在 Pin 类型中,该类型保证值不会在内存中移动。错误消息说需要固定是因为 dyn Future<Output = ()> 需要实现 Unpin 特性,而它目前尚未实现。
trpl::join_all 函数返回一个名为 JoinAll 的结构体。该结构体
泛型于一个类型 F,该类型被约束为实现 Future 特性。
直接使用 await 等待一个未来会隐式地固定该未来。这就是为什么
我们不需要在每个想要等待未来的地点使用 pin!。
然而,我们在这里并不是直接等待一个未来。相反,我们通过将一个未来集合传递给 join_all 函数来构建一个新的未来,JoinAll。join_all 的签名要求集合中的项目类型都实现 Future 特性,而 Box<T> 仅在其包装的 T 是一个实现了 Unpin 特性的未来时才实现 Future。
这有很多需要吸收的内容!为了真正理解它,让我们进一步探讨 Future 特性实际上是如何工作的,特别是关于固定。再次看看 Future 特性的定义:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; // Required method fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }
cx 参数及其 Context 类型是运行时实际知道何时检查给定的未来同时仍然保持惰性化的关键。再次,这些细节超出了本章的范围,通常只有在编写自定义 Future 实现时才需要考虑。相反,我们将重点放在 self 的类型上,因为这是第一次看到 self 有类型注解。self 的类型注解与其他函数参数的类型注解类似,但有两个关键区别:
- 它告诉 Rust
self必须是什么类型才能调用该方法。 - 它不能是任意类型。它被限制为实现该方法的类型、该类型的引用或智能指针,或者是一个包裹该类型引用的
Pin。
我们将在第 18 章中看到更多关于这种语法的内容。目前,
知道如果我们想要轮询一个未来以检查它是Pending还是Ready(Output),我们需要一个Pin包装的可变引用就足够了。
Pin 是一个包装器,用于像 &、&mut、Box 和 Rc 这样的指针类型。(技术上,Pin 适用于实现了 Deref 或 DerefMut 特性的类型,但这实际上等同于仅与引用和智能指针一起工作。)Pin 本身不是一个指针,也没有像 Rc 和 Arc 那样的引用计数行为;它纯粹是编译器可以用来强制执行指针使用约束的工具。
回忆 await 是通过调用 poll 来实现的,这开始解释了我们之前看到的错误信息,但那是在 Unpin 的背景下,而不是 Pin。那么 Pin 到底是如何与 Unpin 相关的,为什么 Future 需要将 self 放在 Pin 类型中才能调用 poll?
记住本章前面提到的,未来(future)中的一系列 await 点会被编译成一个状态机,编译器确保该状态机遵循 Rust 的所有正常安全规则,包括借用和所有权。为了使这一点生效,Rust 会查看从一个 await 点到下一个 await 点或异步块结束之间所需的数据。然后在编译后的状态机中创建一个相应的变体。每个变体都会根据需要获取该部分源代码中将要使用的数据的访问权限,无论是通过获取该数据的所有权,还是通过获取其可变或不可变引用。
到目前为止,一切顺利:如果我们对给定异步块中的所有权或引用有任何错误,借用检查器会告诉我们。当我们想要移动与该块对应 的未来对象——比如将其移动到 Vec 中以传递给 join_all——事情就变得更复杂了。
当我们移动一个未来——无论是将其推入数据结构以用作 join_all 的迭代器,还是从函数返回它——这实际上意味着移动 Rust 为我们创建的状态机。与 Rust 中的大多数其他类型不同,Rust 为异步块创建的未来可以在任何给定变体的字段中最终包含对自身的引用,如图 17-4 中的简化图所示。
默认情况下,任何具有指向自身引用的对象在移动时都是不安全的,因为引用总是指向它们所引用内容的实际内存地址(见图17-5)。如果你移动数据结构本身,这些内部引用将仍然指向旧位置。然而,该内存位置现在是无效的。一方面,当你对数据结构进行更改时,其值不会被更新。另一方面——更重要的是——计算机现在可以自由地将该内存用于其他目的!你可能会在稍后读取完全不相关的数据。
理论上,Rust 编译器可以尝试在对象移动时更新每个引用,但这可能会增加大量的性能开销,特别是如果需要更新一整网的引用时。如果我们能够确保相关数据结构 不会在内存中移动,那么我们就不需要更新任何引用。这正是 Rust 的借用检查器的作用:在安全代码中,它防止你移动任何有活动引用的项。
Pin 建立在这一点上,为我们提供了所需的精确保证。当我们通过将指向该值的指针包装在 Pin 中来 固定 一个值时,该值将不能再移动。因此,如果你有 Pin<Box<SomeType>>,你实际上固定的是 SomeType 值,而不是 Box 指针。图 17-6 说明了这一过程。
事实上,Box 指针仍然可以自由移动。记住:我们关心的是确保最终被引用的数据保持在原位。如果指针移动,但其指向的数据 保持在同一位置,如图 17-7 所示,就没有潜在的问题。(作为一个独立的练习,查看类型以及 std::pin 模块的文档,尝试弄清楚如何使用 Pin 包装一个 Box。)关键在于,自引用类型本身不能移动,因为它仍然是固定的。
然而,大多数类型即使在 Pin 指针后面也是完全安全的。只有当项目具有内部引用时,我们才需要考虑固定。像数字和布尔值这样的原始值是安全的,因为它们显然没有任何内部引用。大多数你在 Rust 中通常使用的类型也是如此。例如,你可以随意移动一个 Vec,而无需担心。根据我们迄今为止所见,如果你有一个 Pin<Vec<String>>,即使 Vec<String> 在没有其他引用的情况下总是可以安全移动的,你也必须通过 Pin 提供的安全但受限的 API 来做所有事情。我们需要一种方法来告诉编译器,在这种情况下移动项目是安全的——这就是 Unpin 发挥作用的地方。
Unpin 是一个标记特征,类似于我们在第 16 章中看到的 Send 和 Sync 特征,因此没有自己的功能。标记特征仅存在于告诉编译器在特定上下文中使用实现给定特征的类型是安全的。Unpin 告诉编译器给定类型不需要维持任何关于该值是否可以安全移动的保证。
就像 Send 和 Sync 一样,编译器会自动为所有可以证明安全的类型实现 Unpin。一个特殊情况,再次类似于 Send 和 Sync,是 Unpin 没有 为某个类型实现。这种表示方法是 impl !Unpin for SomeType,其中 SomeType 是一个类型的名字,该类型在使用指向该类型的指针时 确实 需要保持这些保证以确保安全。
换句话说,关于 Pin 和 Unpin 之间的关系有两点需要注意。首先,Unpin 是“正常”情况,而 !Unpin 是特殊情况。其次,一个类型是否实现了 Unpin 或 !Unpin 仅在 使用指向该类型的固定指针(如 Pin<&mut SomeType>)时才重要。
为了具体说明这一点,考虑一个 String:它有一个长度和组成它的 Unicode 字符。我们可以在 Pin 中包装一个 String,如图 17-8 所示。然而,String 自动实现了 Unpin,Rust 中的大多数其他类型也是如此。
因此,我们可以做一些如果 String 实现了 !Unpin 就会非法的事情,例如在内存中的确切相同位置用另一个字符串替换一个字符串,如图 17-9 所示。这不会违反 Pin 合约,因为 String 没有内部引用使其移动不安全。这正是它实现 Unpin 而不是 !Unpin 的原因。
现在我们已经了解了足够的知识,可以理解在清单 17-23 中报告的 join_all 调用的错误。我们最初尝试将由异步块产生的 future 移动到 Vec<Box<dyn Future<Output = ()>>> 中,但正如我们所见,这些 future 可能有内部引用,因此它们不会自动实现 Unpin。一旦我们固定了它们,就可以将生成的 Pin 类型传递给 Vec,确信 future 中的底层数据 不会 被移动。清单 17-24 显示了如何通过在定义每个 future 时调用 pin! 宏并调整 trait 对象类型来修复代码。
extern crate trpl; // required for mdbook test use std::pin::{Pin, pin}; // --snip-- use std::time::Duration; fn main() { trpl::block_on(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --snip-- let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --snip-- while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { // --snip-- let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
这个示例现在可以编译和运行,我们可以在运行时向向量中添加或删除 future 并将它们全部 join。
Pin 和 Unpin 主要对于构建底层库或构建运行时本身非常重要,而不是用于日常的 Rust 代码。当你在错误消息中看到这些特征时,现在你将更好地了解如何修复你的代码!
注意:这种 Pin 和 Unpin 的组合使得在 Rust 中安全实现一类复杂的类型成为可能,这些类型由于自引用而通常会变得具有挑战性。需要 Pin 的类型在今天的异步 Rust 中最为常见,但偶尔你也会在其他上下文中看到它们。
Pin 和 Unpin 的具体工作方式以及它们需要遵守的规则,在 std::pin 的 API 文档中有详细的介绍,所以如果你有兴趣深入了解,那是一个很好的起点。
如果你想更详细地了解底层的工作原理,请参阅第2章和第4章的Rust中的异步编程。
流特质
现在你对 Future、Pin 和 Unpin 特性有了更深的了解,我们可以将注意力转向 Stream 特性。正如你在本章前面所学,流类似于异步迭代器。然而,与 Iterator 和 Future 不同,Stream 在编写本文时标准库中没有定义,但 futures crate 中有一个非常常见的定义,被整个生态系统广泛使用。
让我们在查看 Stream 特性如何将它们合并之前,先回顾一下 Iterator 和 Future 特性的定义。从 Iterator,我们有序列的概念:其 next 方法提供一个 Option<Self::Item>。从 Future,我们有随时间变化的准备状态的概念:其 poll 方法提供一个 Poll<Self::Output>。为了表示随时间变得可用的项目序列,我们定义了一个 Stream 特性,将这些特性结合在一起:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; } }
Stream 特性定义了一个关联类型 Item,用于表示流生成的项的类型。这类似于 Iterator,其中可能有零到多个项,而不像 Future,其中总是有一个单一的 Output,即使它是单元类型 ()。
Stream 还定义了一个方法来获取这些项。我们称之为 poll_next,以明确它以与 Future::poll 相同的方式进行轮询,并以与 Iterator::next 相同的方式生成一系列项。其返回类型将 Poll 与 Option 结合在一起。外部类型是 Poll,因为它需要像未来一样检查就绪状态。内部类型是 Option,因为它需要像迭代器一样指示是否还有更多消息。
与这个定义非常相似的内容很可能会成为 Rust 标准库的一部分。在此期间,它已经是大多数运行时工具包的一部分,因此你可以依赖它,接下来我们讨论的所有内容通常都适用!
在“Streams: Futures in Sequence”部分的示例中,我们并没有使用poll_next 或 Stream,而是使用了next和StreamExt。当然,我们可以通过手动编写自己的Stream状态机来直接使用poll_next API,就像我们可以通过它们的poll方法直接处理未来一样。然而,使用await要方便得多,而StreamExt特质提供了next方法,使我们能够做到这一点:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>; } trait StreamExt: Stream { async fn next(&mut self) -> Option<Self::Item> where Self: Unpin; // other methods... } }
注意:本章前面实际使用的定义与此略有不同,因为它支持那些还不支持在特质中使用异步函数的 Rust 版本。因此,它看起来像这样:
fn next(&mut self) -> Next<'_, Self> where Self: Unpin;
那个 Next 类型是一个 struct,它实现了 Future 并允许我们用 Next<'_, Self> 命名对 self 的引用的生命周期,以便 await 可以与这个方法一起工作。
StreamExt 特性也是所有可用于流的有趣方法的所在地。StreamExt 会自动为每个实现了 Stream 的类型实现,但这些特性是单独定义的,以便社区可以在不影响基础特性的情况下迭代便利的 API。
在 trpl crate 中使用的 StreamExt 版本中,该 trait 不仅定义了 next 方法,还提供了一个默认的 next 实现,该实现正确处理了调用 Stream::poll_next 的细节。这意味着即使你需要编写自己的流数据类型,你也只需实现 Stream,然后任何使用你数据类型的人就可以自动使用 StreamExt 及其方法。
这就是我们对这些特质的底层细节的所有介绍。为了总结,让我们考虑一下未来(包括流)、任务和线程是如何协同工作的!