深入探讨异步特性
在本章中,我们以各种方式使用了Future
、Pin
、Unpin
、Stream
和StreamExt
特质。到目前为止,我们避免深入探讨它们的工作原理或它们如何协同工作,这在大多数日常的Rust工作中是完全可以的。然而,有时你会遇到需要了解这些细节的情况。在本节中,我们将深入探讨足够的内容以帮助解决这些情况,但仍将更深入的探讨留给其他文档。
《Future
特性》
让我们先仔细看看 Future
特性是如何工作的。以下是 Rust 对其的定义:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }
那个特征定义包括了一堆新的类型还有一些我们之前没见过的语法,所以让我们一步步地解析这个定义。
首先,Future
的关联类型 Output
表示未来将解析为何种值。
这类似于 Iterator
特性中的 Item
关联类型。
其次,Future
还具有 poll
方法,该方法接受一个特殊的 Pin
引用作为其 self
参数和一个可变的 Context
类型引用,
并返回一个 Poll<Self::Output>
。我们稍后会详细讨论 Pin
和
Context
。现在,让我们先关注该方法的返回值,
即 Poll
类型:
#![allow(unused)] fn main() { enum Poll<T> { Ready(T), Pending, } }
这个 Poll
类型类似于 Option
。它有一个包含值的变体 Ready(T)
,以及一个不包含值的变体 Pending
。不过,Poll
与 Option
的含义大不相同!Pending
变体表示未来还有工作要做,因此调用者需要稍后再次检查。Ready
变体表示未来已经完成了其工作,T
值可用。
注意:对于大多数 future,调用者不应在 future 返回 Ready
后再次调用 poll
。许多 future 在变为就绪状态后再次被轮询时会 panic。可以在文档中明确说明可以安全再次轮询的 future。这类似于 Iterator::next
的行为。
当你看到使用 await
的代码时,Rust 会在底层将其编译为调用 poll
的代码。如果你回顾一下清单 17-4,我们在其中打印出单个 URL 解析后的页面标题,Rust 会将其编译成类似这样的代码(虽然不完全相同):
match page_title(url).poll() {
Ready(page_title) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// But what goes here?
}
}
当我们遇到未来状态仍为Pending
时,我们应该怎么办?我们需要某种方法尝试
一次又一次,直到未来最终准备就绪。换句话说,
我们需要一个循环:
let mut page_title_fut = page_title(url);
loop {
match page_title_fut.poll() {
Ready(value) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// continue
}
}
}
如果 Rust 将其编译为完全相同的代码,那么每个 await
都会是阻塞的——这与我们的目标完全相反!相反,Rust 确保循环可以将控制权交给某个可以暂停此未来的处理以处理其他未来,然后稍后再检查此未来的东西。正如我们所见,这个东西是一个异步运行时,而这种调度和协调工作是它的主要职责之一。
在本章前面,我们描述了等待rx.recv
。recv
调用
返回一个未来对象,等待该未来对象会轮询它。我们提到,当通道关闭时,运行时会
暂停该未来对象,直到它准备好返回Some(message)
或None
。通过我们对Future
特质的更深入理解,特别是Future::poll
,我们可以看到它是如何工作的。当返回Poll::Pending
时,运行时知道
未来对象尚未准备好。相反,当poll
返回
Poll::Ready(Some(message))
或Poll::Ready(None)
时,运行时知道未来对象已准备好并推进它。
具体的运行时如何做到这一点的细节超出了本书的范围, 但关键是了解未来的的基本机制:运行时会轮询它负责的每个未来, 当未来尚未准备好时,将其重新置于休眠状态。
《Pin
和 Unpin
特性》
当我们介绍固定(pinning)的概念时,在清单 17-16 中遇到了一个非常棘手的错误信息。以下是相关部分:
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
这个错误信息不仅告诉我们需要固定这些值,还解释了为什么需要固定。`trpl::join_all
` 函数返回一个名为 `JoinAll
` 的结构体。该结构体泛型于类型 `F
`,该类型被约束为实现 `Future
` 特性。直接使用 `await
` 等待一个未来会隐式地固定该未来。这就是为什么我们不需要在每个想要等待未来的代码中使用 `pin!
`。
然而,我们在这里并不是直接等待一个未来。相反,我们通过将一个未来集合传递给 join_all
函数来构建一个新的未来,JoinAll
。join_all
的签名要求集合中的项目类型都实现 Future
特性,而 Box<T>
仅在其包装的 T
是一个实现了 Unpin
特性的未来时才实现 Future
。
这有很多需要吸收的内容!为了真正理解它,让我们进一步探讨 Future
特性实际上是如何工作的,特别是关于 固定。
再次查看 Future
特性的定义:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; // Required method fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }
cx
参数及其 Context
类型是运行时实际上知道何时检查给定的未来同时仍然保持惰性化的关键。再次,这些工作原理的细节超出了本章的范围,通常只有在编写自定义 Future
实现时才需要考虑这一点。我们将重点放在 self
的类型上,因为这是第一次看到 self
有类型注解的方法。self
的类型注解与其他函数参数的类型注解类似,但有两个关键区别:
- 它告诉 Rust
self
必须是什么类型才能调用该方法。 - 它不能是任意类型。它被限制为实现该方法的类型、该类型的引用或智能指针,或者是一个包裹该类型引用的
Pin
。
我们将在第 18 章中看到更多关于这种语法的内容。目前,
知道如果我们想要轮询一个未来以检查它是Pending
还是Ready(Output)
,我们需要一个Pin
包装的可变引用就足够了。
Pin
是一个包装器,用于像 &
、&mut
、Box
和 Rc
这样的指针类型。(技术上,Pin
适用于实现了 Deref
或 DerefMut
特性的类型,但这实际上等同于仅与指针一起工作。)Pin
本身不是一个指针,也没有像 Rc
和 Arc
那样具有引用计数的行为;它纯粹是编译器可以用来强制执行指针使用约束的工具。
回忆 await
是通过调用 poll
来实现的,这开始解释了我们之前看到的错误信息,但那是在 Unpin
的背景下,而不是 Pin
。那么 Pin
到底是如何与 Unpin
相关的,为什么 Future
需要将 self
放在 Pin
类型中才能调用 poll
?
记住本章前面提到的,在一个 future 中的一系列 await 点会被编译成一个状态机,编译器确保该状态机遵循 Rust 的所有正常安全规则,包括借用和所有权。为了使这一点生效,Rust 会查看从一个 await 点到下一个 await 点或 async 块结束之间需要哪些数据。然后在编译后的状态机中创建相应的变体。每个变体都会根据需要获取该部分源代码中将要使用的数据的访问权限,无论是通过获取该数据的所有权,还是通过获取其可变或不可变引用。
到目前为止,一切顺利:如果我们对给定异步块中的所有权或引用有任何错误,借用检查器会告诉我们。当我们想要移动与该块对应 的未来对象——比如将其移动到 Vec
中以传递给 join_all
——事情就变得更复杂了。
当我们移动一个未来——无论是将其推入数据结构以用作 join_all
的迭代器,还是从函数返回它——这实际上意味着移动 Rust 为我们创建的状态机。与 Rust 中的大多数其他类型不同,Rust 为异步块创建的未来可以在任何给定变体的字段中最终包含对自身的引用,如图 17-4 中的简化图所示。
默认情况下,任何具有指向自身引用的对象在移动时都是不安全的,因为引用总是指向它们所引用内容的实际内存地址(见图17-5)。如果你移动数据结构本身,这些内部引用将仍然指向旧位置。然而,该内存位置现在是无效的。一方面,当你对数据结构进行更改时,其值不会被更新。另一方面——更重要的是——计算机现在可以自由地将该内存用于其他目的!你可能会在稍后读取完全不相关的数据。
理论上,Rust 编译器可以尝试在对象被移动时更新每个引用,但这可能会增加大量的性能开销,特别是当需要更新一整网的引用时。如果我们能够确保相关数据结构 不会在内存中移动,我们就不必更新任何引用。这正是 Rust 的借用检查器所要求的:在安全代码中,它会阻止你移动任何有活动引用的项。
Pin
建立在这一点上,为我们提供了所需的精确保证。当我们通过将指向该值的指针包装在 Pin
中来 固定 一个值时,该值将不能再移动。因此,如果你有 Pin<Box<SomeType>>
,你实际上固定的是 SomeType
值,而不是 Box
指针。图 17-6 说明了这一过程。
事实上,Box
指针仍然可以自由移动。记住:我们关心的是确保最终被引用的数据保持在原位。如果指针移动了,但其所指向的数据仍在同一位置,如图 17-7 所示,就没有潜在的问题。作为一个独立的练习,查看类型以及 std::pin
模块的文档,尝试弄清楚如何使用 Pin
包装一个 Box
。) 关键在于,自引用类型本身不能移动,因为它仍然是固定的。
然而,大多数类型即使在 Pin
指针后面也是完全安全的。只有当项目具有内部引用时,我们才需要考虑固定。像数字和布尔值这样的原始值显然是安全的,因为它们显然没有任何内部引用,所以它们显然是安全的。大多数你在 Rust 中通常使用的类型也是如此。例如,你可以随意移动一个 Vec
,而无需担心。根据我们目前所见,如果你有一个 Pin<Vec<String>>
,即使 Vec<String>
在没有其他引用的情况下总是可以安全移动,你也必须通过 Pin
提供的安全但限制性的 API 来做所有事情。我们需要一种方法来告诉编译器,在这种情况下移动项目是安全的——这就是 Unpin
发挥作用的地方。
Unpin
是一个标记特征,类似于我们在第 16 章中看到的 Send
和 Sync
特征,因此没有自己的功能。标记特征仅存在于告诉编译器在特定上下文中使用实现给定特征的类型是安全的。Unpin
告诉编译器给定类型不需要维持任何关于该值是否可以安全移动的保证。
就像 Send
和 Sync
一样,编译器会自动为所有可以证明安全的类型实现 Unpin
。一个特殊情况,再次类似于 Send
和 Sync
,是 Unpin
没有 为某个类型实现。这种表示方法是 impl !Unpin for SomeType
,其中 SomeType
是一个类型的名字,该类型在使用指向该类型的指针时 确实 需要保持这些保证以确保安全。
换句话说,关于 Pin
和 Unpin
之间的关系有两点需要注意。首先,Unpin
是“正常”情况,而 !Unpin
是特殊情况。其次,一个类型是否实现了 Unpin
或 !Unpin
仅在 使用指向该类型的固定指针(如 Pin<&mut SomeType>
)时才重要。
为了具体说明这一点,考虑一个 String
:它有一个长度和组成它的 Unicode 字符。我们可以在 Pin
中包装一个 String
,如图 17-8 所示。然而,String
自动实现了 Unpin
,Rust 中的大多数其他类型也是如此。
因此,我们可以做一些如果 String
实现了 !Unpin
就会非法的事情,比如在内存中的确切相同位置用另一个字符串替换一个字符串,如图 17-9 所示。这不会违反 Pin
合约,因为 String
没有内部引用使其移动不安全!这正是它实现 Unpin
而不是 !Unpin
的原因。
现在我们已经了解了足够的知识,可以理解在清单 17-17 中报告的 join_all
调用的错误。我们最初尝试将由 async 块生成的 future 移动到 Vec<Box<dyn Future<Output = ()>>>
中,但正如我们所见,这些 future 可能有内部引用,因此它们不实现 Unpin
。它们需要被固定,然后我们可以将 Pin
类型传递给 Vec
,确信 future 中的底层数据 不会 被移动。
Pin
和 Unpin
主要对于构建底层库或构建运行时本身非常重要,而不是用于日常的 Rust 代码。当你在错误消息中看到这些特征时,现在你将更好地了解如何修复你的代码!
注意:这种 Pin
和 Unpin
的组合使得在 Rust 中安全实现一类复杂的类型成为可能,这些类型由于自引用而通常会变得具有挑战性。需要 Pin
的类型在今天的异步 Rust 中最为常见,但偶尔你也会在其他上下文中看到它们。
Pin
和 Unpin
的具体工作方式以及它们需要遵守的规则,在 std::pin
的 API 文档中有详细的介绍,所以如果你有兴趣深入了解,那是一个很好的起点。
如果你想更详细地了解底层的工作原理,请参阅第2章和第4章的Rust中的异步编程。
流特质
现在你对 Future
、Pin
和 Unpin
特性有了更深的了解,我们可以将注意力转向 Stream
特性。正如你在本章前面所学,流类似于异步迭代器。然而,与 Iterator
和 Future
不同,Stream
在编写本文时标准库中没有定义,但 futures
crate 中有一个非常常见的定义,被整个生态系统广泛使用。
让我们在查看 Stream
特性如何将它们合并之前,先回顾一下 Iterator
和 Future
特性的定义。从 Iterator
,我们有序列的概念:其 next
方法提供一个 Option<Self::Item>
。从 Future
,我们有随时间变化的准备状态的概念:其 poll
方法提供一个 Poll<Self::Output>
。为了表示随时间变得可用的项目序列,我们定义了一个 Stream
特性,将这些特性结合在一起:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; } }
Stream
特性定义了一个关联类型 Item
,用于表示流生成的项的类型。这类似于 Iterator
,其中可能有零到多个项,而不像 Future
,其中总是有一个单一的 Output
,即使它是单元类型 ()
。
Stream
还定义了一个方法来获取这些项。我们称之为 poll_next
,以明确它以与 Future::poll
相同的方式进行轮询,并以与 Iterator::next
相同的方式生成一系列项。其返回类型将 Poll
与 Option
结合在一起。外部类型是 Poll
,因为它需要像未来一样检查就绪状态。内部类型是 Option
,因为它需要像迭代器一样指示是否还有更多消息。
与这个定义非常相似的内容很可能会成为 Rust 标准库的一部分。在此期间,它已经是大多数运行时工具包的一部分,因此你可以依赖它,接下来我们讨论的所有内容通常都适用!
在我们之前在流处理部分看到的例子中,我们并没有使用 poll_next
或 Stream
,而是使用了 next
和 StreamExt
。我们当然可以 通过手动编写自己的 Stream
状态机来直接使用 poll_next
API,就像我们可以 通过它们的 poll
方法直接处理未来对象一样。然而,使用 await
要方便得多,而 StreamExt
特性提供了 next
方法,使我们能够做到这一点:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>; } trait StreamExt: Stream { async fn next(&mut self) -> Option<Self::Item> where Self: Unpin; // other methods... } }
注意:本章前面实际使用的定义与此略有不同,因为它支持那些还不支持在特质中使用异步函数的 Rust 版本。因此,它看起来像这样:
fn next(&mut self) -> Next<'_, Self> where Self: Unpin;
那个 Next
类型是一个 struct
,它实现了 Future
并允许我们用 Next<'_, Self>
命名对 self
的引用的生命周期,以便 await
可以与这个方法一起工作。
StreamExt
特性也是所有可用于流的有趣方法的所在地。StreamExt
会自动为每个实现了 Stream
的类型实现,但这些特性是单独定义的,以便社区可以在不影响基础特性的情况下迭代便利的 API。
在 trpl
crate 中使用的 StreamExt
版本中,该 trait 不仅定义了 next
方法,还提供了一个默认的 next
实现,该实现正确处理了调用 Stream::poll_next
的细节。这意味着即使你需要编写自己的流数据类型,你也只需实现 Stream
,然后任何使用你数据类型的人就可以自动使用 StreamExt
及其方法。
这就是我们对这些特质的底层细节的所有介绍。为了总结,让我们考虑一下未来(包括流)、任务和线程是如何协同工作的!