深入探讨异步特性

在本章中,我们以各种方式使用了FuturePinUnpinStreamStreamExt特质。到目前为止,我们避免深入探讨它们的工作原理或它们如何协同工作,这在大多数日常的Rust工作中是完全可以的。然而,有时你会遇到需要了解这些细节的情况。在本节中,我们将深入探讨足够的内容以帮助解决这些情况,但仍将更深入的探讨留给其他文档。

Future 特性》

让我们先仔细看看 Future 特性是如何工作的。以下是 Rust 对其的定义:

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}

那个特征定义包括了一堆新的类型还有一些我们之前没见过的语法,所以让我们一步步地解析这个定义。

首先,Future 的关联类型 Output 表示未来将解析为何种值。 这类似于 Iterator 特性中的 Item 关联类型。 其次,Future 还具有 poll 方法,该方法接受一个特殊的 Pin 引用作为其 self 参数和一个可变的 Context 类型引用, 并返回一个 Poll<Self::Output>。我们稍后会详细讨论 PinContext。现在,让我们先关注该方法的返回值, 即 Poll 类型:

#![allow(unused)]
fn main() {
enum Poll<T> {
    Ready(T),
    Pending,
}
}

这个 Poll 类型类似于 Option。它有一个包含值的变体 Ready(T),以及一个不包含值的变体 Pending。不过,PollOption 的含义大不相同!Pending 变体表示未来还有工作要做,因此调用者需要稍后再次检查。Ready 变体表示未来已经完成了其工作,T 值可用。

注意:对于大多数 future,调用者不应在 future 返回 Ready 后再次调用 poll。许多 future 在变为就绪状态后再次被轮询时会 panic。可以在文档中明确说明可以安全再次轮询的 future。这类似于 Iterator::next 的行为。

当你看到使用 await 的代码时,Rust 会在底层将其编译为调用 poll 的代码。如果你回顾一下清单 17-4,我们在其中打印出单个 URL 解析后的页面标题,Rust 会将其编译成类似这样的代码(虽然不完全相同):

match page_title(url).poll() {
    Ready(page_title) => match page_title {
        Some(title) => println!("The title for {url} was {title}"),
        None => println!("{url} had no title"),
    }
    Pending => {
        // But what goes here?
    }
}

当我们遇到未来状态仍为Pending时,我们应该怎么办?我们需要某种方法尝试 一次又一次,直到未来最终准备就绪。换句话说, 我们需要一个循环:

let mut page_title_fut = page_title(url);
loop {
    match page_title_fut.poll() {
        Ready(value) => match page_title {
            Some(title) => println!("The title for {url} was {title}"),
            None => println!("{url} had no title"),
        }
        Pending => {
            // continue
        }
    }
}

如果 Rust 将其编译为完全相同的代码,那么每个 await 都会是阻塞的——这与我们的目标完全相反!相反,Rust 确保循环可以将控制权交给某个可以暂停此未来的处理以处理其他未来,然后稍后再检查此未来的东西。正如我们所见,这个东西是一个异步运行时,而这种调度和协调工作是它的主要职责之一。

在本章前面,我们描述了等待rx.recvrecv调用 返回一个未来对象,等待该未来对象会轮询它。我们提到,当通道关闭时,运行时会 暂停该未来对象,直到它准备好返回Some(message)None。通过我们对Future特质的更深入理解,特别是Future::poll,我们可以看到它是如何工作的。当返回Poll::Pending时,运行时知道 未来对象尚未准备好。相反,当poll返回 Poll::Ready(Some(message))Poll::Ready(None)时,运行时知道未来对象准备好并推进它。

具体的运行时如何做到这一点的细节超出了本书的范围, 但关键是了解未来的的基本机制:运行时会轮询它负责的每个未来, 当未来尚未准备好时,将其重新置于休眠状态。

PinUnpin 特性》

当我们介绍固定(pinning)的概念时,在清单 17-16 中遇到了一个非常棘手的错误信息。以下是相关部分:

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:33
   |
48 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

这个错误信息不仅告诉我们需要固定这些值,还解释了为什么需要固定。`trpl::join_all` 函数返回一个名为 `JoinAll` 的结构体。该结构体泛型于类型 `F`,该类型被约束为实现 `Future` 特性。直接使用 `await` 等待一个未来会隐式地固定该未来。这就是为什么我们不需要在每个想要等待未来的代码中使用 `pin!`。

然而,我们在这里并不是直接等待一个未来。相反,我们通过将一个未来集合传递给 join_all 函数来构建一个新的未来,JoinAlljoin_all 的签名要求集合中的项目类型都实现 Future 特性,而 Box<T> 仅在其包装的 T 是一个实现了 Unpin 特性的未来时才实现 Future

这有很多需要吸收的内容!为了真正理解它,让我们进一步探讨 Future 特性实际上是如何工作的,特别是关于 固定

再次查看 Future 特性的定义:

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}

cx 参数及其 Context 类型是运行时实际上知道何时检查给定的未来同时仍然保持惰性化的关键。再次,这些工作原理的细节超出了本章的范围,通常只有在编写自定义 Future 实现时才需要考虑这一点。我们将重点放在 self 的类型上,因为这是第一次看到 self 有类型注解的方法。self 的类型注解与其他函数参数的类型注解类似,但有两个关键区别:

  • 它告诉 Rust self 必须是什么类型才能调用该方法。
  • 它不能是任意类型。它被限制为实现该方法的类型、该类型的引用或智能指针,或者是一个包裹该类型引用的Pin

我们将在第 18 章中看到更多关于这种语法的内容。目前, 知道如果我们想要轮询一个未来以检查它是Pending还是Ready(Output),我们需要一个Pin包装的可变引用就足够了。

Pin 是一个包装器,用于像 &&mutBoxRc 这样的指针类型。(技术上,Pin 适用于实现了 DerefDerefMut 特性的类型,但这实际上等同于仅与指针一起工作。)Pin 本身不是一个指针,也没有像 RcArc 那样具有引用计数的行为;它纯粹是编译器可以用来强制执行指针使用约束的工具。

回忆 await 是通过调用 poll 来实现的,这开始解释了我们之前看到的错误信息,但那是在 Unpin 的背景下,而不是 Pin。那么 Pin 到底是如何与 Unpin 相关的,为什么 Future 需要将 self 放在 Pin 类型中才能调用 poll

记住本章前面提到的,在一个 future 中的一系列 await 点会被编译成一个状态机,编译器确保该状态机遵循 Rust 的所有正常安全规则,包括借用和所有权。为了使这一点生效,Rust 会查看从一个 await 点到下一个 await 点或 async 块结束之间需要哪些数据。然后在编译后的状态机中创建相应的变体。每个变体都会根据需要获取该部分源代码中将要使用的数据的访问权限,无论是通过获取该数据的所有权,还是通过获取其可变或不可变引用。

到目前为止,一切顺利:如果我们对给定异步块中的所有权或引用有任何错误,借用检查器会告诉我们。当我们想要移动与该块对应 的未来对象——比如将其移动到 Vec 中以传递给 join_all——事情就变得更复杂了。

当我们移动一个未来——无论是将其推入数据结构以用作 join_all 的迭代器,还是从函数返回它——这实际上意味着移动 Rust 为我们创建的状态机。与 Rust 中的大多数其他类型不同,Rust 为异步块创建的未来可以在任何给定变体的字段中最终包含对自身的引用,如图 17-4 中的简化图所示。

A single-column, three-row table representing a future, fut1, which has data values 0 and 1 in the first two rows and an arrow pointing from the third row back to the second row, representing an internal reference within the future.
Figure 17-4: A self-referential data type.

默认情况下,任何具有指向自身引用的对象在移动时都是不安全的,因为引用总是指向它们所引用内容的实际内存地址(见图17-5)。如果你移动数据结构本身,这些内部引用将仍然指向旧位置。然而,该内存位置现在是无效的。一方面,当你对数据结构进行更改时,其值不会被更新。另一方面——更重要的是——计算机现在可以自由地将该内存用于其他目的!你可能会在稍后读取完全不相关的数据。

Two tables, depicting two futures, fut1 and fut2, each of which has one column and three rows, representing the result of having moved a future out of fut1 into fut2. The first, fut1, is grayed out, with a question mark in each index, representing unknown memory. The second, fut2, has 0 and 1 in the first and second rows and an arrow pointing from its third row back to the second row of fut1, representing a pointer that is referencing the old location in memory of the future before it was moved.
Figure 17-5: The unsafe result of moving a self-referential data type

理论上,Rust 编译器可以尝试在对象被移动时更新每个引用,但这可能会增加大量的性能开销,特别是当需要更新一整网的引用时。如果我们能够确保相关数据结构 不会在内存中移动,我们就不必更新任何引用。这正是 Rust 的借用检查器所要求的:在安全代码中,它会阻止你移动任何有活动引用的项。

Pin 建立在这一点上,为我们提供了所需的精确保证。当我们通过将指向该值的指针包装在 Pin 中来 固定 一个值时,该值将不能再移动。因此,如果你有 Pin<Box<SomeType>>,你实际上固定的是 SomeType 值,而不是 Box 指针。图 17-6 说明了这一过程。

Three boxes laid out side by side. The first is labeled “Pin”, the second “b1”, and the third “pinned”. Within “pinned” is a table labeled “fut”, with a single column; it represents a future with cells for each part of the data structure. Its first cell has the value “0”, its second cell has an arrow coming out of it and pointing to the fourth and final cell, which has the value “1” in it, and the third cell has dashed lines and an ellipsis to indicate there may be other parts to the data structure. All together, the “fut” table represents a future which is self-referential. An arrow leaves the box labeled “Pin”, goes through the box labeled “b1” and has terminates inside the “pinned” box at the “fut” table.
Figure 17-6: Pinning a `Box` that points to a self-referential future type.

事实上,Box 指针仍然可以自由移动。记住:我们关心的是确保最终被引用的数据保持在原位。如果指针移动了,但其所指向的数据仍在同一位置,如图 17-7 所示,就没有潜在的问题。作为一个独立的练习,查看类型以及 std::pin 模块的文档,尝试弄清楚如何使用 Pin 包装一个 Box。) 关键在于,自引用类型本身不能移动,因为它仍然是固定的。

Four boxes laid out in three rough columns, identical to the previous diagram with a change to the second column. Now there are two boxes in the second column, labeled “b1” and “b2”, “b1” is grayed out, and the arrow from “Pin” goes through “b2” instead of “b1”, indicating that the pointer has moved from “b1” to “b2”, but the data in “pinned” has not moved.
Figure 17-7: Moving a `Box` which points to a self-referential future type.

然而,大多数类型即使在 Pin 指针后面也是完全安全的。只有当项目具有内部引用时,我们才需要考虑固定。像数字和布尔值这样的原始值显然是安全的,因为它们显然没有任何内部引用,所以它们显然是安全的。大多数你在 Rust 中通常使用的类型也是如此。例如,你可以随意移动一个 Vec,而无需担心。根据我们目前所见,如果你有一个 Pin<Vec<String>>,即使 Vec<String> 在没有其他引用的情况下总是可以安全移动,你也必须通过 Pin 提供的安全但限制性的 API 来做所有事情。我们需要一种方法来告诉编译器,在这种情况下移动项目是安全的——这就是 Unpin 发挥作用的地方。

Unpin 是一个标记特征,类似于我们在第 16 章中看到的 SendSync 特征,因此没有自己的功能。标记特征仅存在于告诉编译器在特定上下文中使用实现给定特征的类型是安全的。Unpin 告诉编译器给定类型不需要维持任何关于该值是否可以安全移动的保证。

就像 SendSync 一样,编译器会自动为所有可以证明安全的类型实现 Unpin。一个特殊情况,再次类似于 SendSync,是 Unpin 没有 为某个类型实现。这种表示方法是 impl !Unpin for SomeType,其中 SomeType 是一个类型的名字,该类型在使用指向该类型的指针时 确实 需要保持这些保证以确保安全。

换句话说,关于 PinUnpin 之间的关系有两点需要注意。首先,Unpin 是“正常”情况,而 !Unpin 是特殊情况。其次,一个类型是否实现了 Unpin!Unpin 仅在 使用指向该类型的固定指针(如 Pin<&mut SomeType>)时才重要。

为了具体说明这一点,考虑一个 String:它有一个长度和组成它的 Unicode 字符。我们可以在 Pin 中包装一个 String,如图 17-8 所示。然而,String 自动实现了 Unpin,Rust 中的大多数其他类型也是如此。

Concurrent work flow
Figure 17-8: Pinning a `String`; the dotted line indicates that the `String` implements the `Unpin` trait, and thus is not pinned.

因此,我们可以做一些如果 String 实现了 !Unpin 就会非法的事情,比如在内存中的确切相同位置用另一个字符串替换一个字符串,如图 17-9 所示。这不会违反 Pin 合约,因为 String 没有内部引用使其移动不安全!这正是它实现 Unpin 而不是 !Unpin 的原因。

Concurrent work flow
Figure 17-9: Replacing the `String` with an entirely different `String` in memory.

现在我们已经了解了足够的知识,可以理解在清单 17-17 中报告的 join_all 调用的错误。我们最初尝试将由 async 块生成的 future 移动到 Vec<Box<dyn Future<Output = ()>>> 中,但正如我们所见,这些 future 可能有内部引用,因此它们不实现 Unpin。它们需要被固定,然后我们可以将 Pin 类型传递给 Vec,确信 future 中的底层数据 不会 被移动。

PinUnpin 主要对于构建底层库或构建运行时本身非常重要,而不是用于日常的 Rust 代码。当你在错误消息中看到这些特征时,现在你将更好地了解如何修复你的代码!

注意:这种 PinUnpin 的组合使得在 Rust 中安全实现一类复杂的类型成为可能,这些类型由于自引用而通常会变得具有挑战性。需要 Pin 的类型在今天的异步 Rust 中最为常见,但偶尔你也会在其他上下文中看到它们。

PinUnpin 的具体工作方式以及它们需要遵守的规则,在 std::pin 的 API 文档中有详细的介绍,所以如果你有兴趣深入了解,那是一个很好的起点。

如果你想更详细地了解底层的工作原理,请参阅第2章第4章Rust中的异步编程

流特质

现在你对 FuturePinUnpin 特性有了更深的了解,我们可以将注意力转向 Stream 特性。正如你在本章前面所学,流类似于异步迭代器。然而,与 IteratorFuture 不同,Stream 在编写本文时标准库中没有定义,但 futures crate 中有一个非常常见的定义,被整个生态系统广泛使用。

让我们在查看 Stream 特性如何将它们合并之前,先回顾一下 IteratorFuture 特性的定义。从 Iterator,我们有序列的概念:其 next 方法提供一个 Option<Self::Item>。从 Future,我们有随时间变化的准备状态的概念:其 poll 方法提供一个 Poll<Self::Output>。为了表示随时间变得可用的项目序列,我们定义了一个 Stream 特性,将这些特性结合在一起:

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}
}

Stream 特性定义了一个关联类型 Item,用于表示流生成的项的类型。这类似于 Iterator,其中可能有零到多个项,而不像 Future,其中总是有一个单一的 Output,即使它是单元类型 ()

Stream 还定义了一个方法来获取这些项。我们称之为 poll_next,以明确它以与 Future::poll 相同的方式进行轮询,并以与 Iterator::next 相同的方式生成一系列项。其返回类型将 PollOption 结合在一起。外部类型是 Poll,因为它需要像未来一样检查就绪状态。内部类型是 Option,因为它需要像迭代器一样指示是否还有更多消息。

与这个定义非常相似的内容很可能会成为 Rust 标准库的一部分。在此期间,它已经是大多数运行时工具包的一部分,因此你可以依赖它,接下来我们讨论的所有内容通常都适用!

在我们之前在流处理部分看到的例子中,我们并没有使用 poll_next Stream,而是使用了 nextStreamExt。我们当然可以 通过手动编写自己的 Stream 状态机来直接使用 poll_next API,就像我们可以 通过它们的 poll 方法直接处理未来对象一样。然而,使用 await 要方便得多,而 StreamExt 特性提供了 next 方法,使我们能够做到这一点:

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

trait Stream {
    type Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

trait StreamExt: Stream {
    async fn next(&mut self) -> Option<Self::Item>
    where
        Self: Unpin;

    // other methods...
}
}

注意:本章前面实际使用的定义与此略有不同,因为它支持那些还不支持在特质中使用异步函数的 Rust 版本。因此,它看起来像这样:

fn next(&mut self) -> Next<'_, Self> where Self: Unpin;

那个 Next 类型是一个 struct,它实现了 Future 并允许我们用 Next<'_, Self> 命名对 self 的引用的生命周期,以便 await 可以与这个方法一起工作。

StreamExt 特性也是所有可用于流的有趣方法的所在地。StreamExt 会自动为每个实现了 Stream 的类型实现,但这些特性是单独定义的,以便社区可以在不影响基础特性的情况下迭代便利的 API。

trpl crate 中使用的 StreamExt 版本中,该 trait 不仅定义了 next 方法,还提供了一个默认的 next 实现,该实现正确处理了调用 Stream::poll_next 的细节。这意味着即使你需要编写自己的流数据类型,你也只需实现 Stream,然后任何使用你数据类型的人就可以自动使用 StreamExt 及其方法。

这就是我们对这些特质的底层细节的所有介绍。为了总结,让我们考虑一下未来(包括流)、任务和线程是如何协同工作的!