深入异步特性

在本章中,我们以各种方式使用了FuturePinUnpinStreamStreamExt特质。然而,到目前为止,我们还没有深入探讨它们的工作原理或它们是如何协同工作的。在日常编写Rust代码时,大多数情况下这样做是没问题的。但有时,您会遇到需要理解更多这些细节的情况。在本节中,我们将深入探讨足够多的细节以帮助处理这些情况——同时仍然将真正深入的内容留给其他文档!

未来

Futures 和异步语法中,我们提到Future 是一个特质。让我们先仔细看看它是如何工作的。以下是 Rust 如何定义Future

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}

那个特征定义包括了一堆新的类型还有一些我们之前没见过的语法,所以让我们一步步地解析这个定义。

首先,Future 的关联类型 Output 说明了未来的解析结果。 这类似于 Iterator 特性中的 Item 关联类型。 其次,Future 还有一个 poll 方法,该方法接受一个特殊的 Pin 引用作为其 self 参数和一个可变的 Context 类型引用, 并返回一个 Poll<Self::Output>。我们将在本节稍后讨论 PinContext。现在,让我们先关注方法的返回值, 即 Poll 类型:

#![allow(unused)]
fn main() {
enum Poll<T> {
    Ready(T),
    Pending,
}
}

这个 Poll 类型类似于 Option:它有一个包含值的变体 (Ready(T)),和一个不包含值的变体 (Pending)。然而,它的含义却大不相同!Pending 变体表示该未来还有工作要做,因此调用者需要稍后再次检查。Ready 变体表示 Future 已完成其工作,并且 T 值可用。

注意:对于大多数 future,调用者不应在 future 返回 Ready 后再次调用 poll。许多 future 在变为就绪状态后再次被轮询时会 panic!可以在文档中明确说明可以再次轮询的 future。这类似于 Iterator::next 的行为!

在内部,当你看到使用 await 的代码时,Rust 会将其编译为调用 poll 的代码。如果你回顾一下列表 17-4,我们在其中打印出单个 URL 解析后的页面标题,Rust 会将其编译成类似这样的代码(虽然不完全相同):

match page_title(url).poll() {
    Ready(page_title) => match page_title {
        Some(title) => println!("The title for {url} was {title}"),
        None => println!("{url} had no title"),
    }
    Pending => {
        // But what goes here?
    }
}

Future 仍然是 Pending 时,我们应该怎么办?我们需要某种方法来尝试... 再次,再再次,直到未来最终准备好。换句话说,一个循环:

let mut page_title_fut = page_title(url);
loop {
    match page_title_fut.poll() {
        Ready(value) => match page_title {
            Some(title) => println!("The title for {url} was {title}"),
            None => println!("{url} had no title"),
        }
        Pending => {
            // continue
        }
    }
}

如果 Rust 将其编译为完全相同的代码,那么每个 await 都会是阻塞的——这与我们的目标完全相反!相反,Rust 需要确保循环可以将控制权交给某个可以暂停此未来的处理并处理其他未来并在稍后再次检查此未来的东西。这个“东西”是一个异步运行时,而这种调度和协调工作是运行时的主要任务之一。

回顾我们在计数部分对等待rx.recv的描述。recv调用返回一个Future,并且等待它会轮询它。在我们最初的讨论中,我们提到运行时会暂停该未来对象,直到它准备好返回Some(message)或在通道关闭时返回None。有了我们对Future的更深入理解,特别是Future::poll,我们可以看到它是如何工作的。当返回Poll::Pending时,运行时知道未来对象尚未准备好。相反,当poll返回Poll::Ready(Some(message))Poll::Ready(None)时,运行时知道未来对象已准备好并推进它。

确切的细节关于运行时如何做到这一点超出了我们即使在这一深入探讨部分的范围。关键在于理解未来的基机制:运行时轮询每个它负责的未来,在它们尚未准备好时将其重新置于休眠状态。

固定和 Pin 与 Unpin 特性

当我们介绍在处理清单 17-16 时固定的概念时,遇到了一个非常棘手的错误信息。以下是它相关部分的再次展示:

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:33
   |
48 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

当我们仔细阅读这个错误信息时,它不仅告诉我们需要固定这些值,还告诉我们为什么需要固定。`trpl::join_all` 函数返回一个名为 `JoinAll` 的结构体。该结构体泛型于类型 `F`,该类型被约束为实现 `Future` 特性。直接使用 `await` 等待一个未来会隐式地固定该未来。这就是为什么我们在想要等待未来的地方不需要到处使用 `pin!`。

然而,我们在这里并不是直接等待一个未来。相反,我们通过将一个未来集合传递给 join_all 函数来构造一个新的未来,JoinAlljoin_all 的签名要求集合中的项目类型都实现了 Future 特性,而 Box<T> 只有在其包装的 T 是一个实现了 Unpin 特性的未来时才实现 Future

那确实很多!但是,如果我们进一步探讨 Future 类型的工作原理,特别是关于 固定 的部分,我们就可以理解了。

让我们再次看看 Future 的定义:

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}

cx 参数及其 Context 类型是运行时实际知道何时检查任何给定的 future,同时仍然保持惰性化的关键。尽管如此,这些细节超出了本章的范围:通常只有在编写自定义 Future 实现时才需要关注这些。

相反,我们将关注 self 的类型。这是我们第一次看到一个方法中 self 有类型注解。self 的类型注解与其他函数参数的类型注解类似,但有两个关键区别。首先,当我们以这种方式指定 self 的类型时,我们是在告诉 Rust 调用此方法时 self 必须是什么类型。其次,self 的类型注解不能是任意类型。它只能是实现该方法的类型、该类型的引用或智能指针,或者是包装了该类型引用的 Pin。我们将在第 18 章中看到更多关于这种语法的内容。目前,知道如果我们想轮询一个未来(检查它是否为 PendingReady(Output)),我们需要一个被 Pin 包装的类型的可变引用就足够了。

Pin 是一个包装类型。在某些方面,它类似于我们在第 15 章中看到的 BoxRc 和其他智能指针类型,这些类型也包装了其他类型。然而,与这些不同的是,Pin 只与 指针类型 一起工作,例如引用 (&&mut) 和智能指针 (BoxRc 等)。具体来说,Pin 与实现了 DerefDerefMut 特性的类型一起工作,我们在第 15 章中讨论了这些特性。你可以将这种限制视为等同于仅与指针一起工作,因为实现 DerefDerefMut 意味着你的类型行为类似于指针类型。Pin 本身也不是指针,它没有像 RcArc 那样具有引用计数的行为。它纯粹是编译器可以用来通过包装指针类型来维护相关保证的工具。

回想 await 是通过调用 poll 来实现的,这开始解释了我们上面看到的错误信息——但那是在 Unpin 的背景下,而不是 Pin。那么,PinUnpin 到底是什么,它们之间有什么关系,为什么 Future 需要 self 处于 Pin 类型中才能调用 poll

我们的第一个异步程序中,我们描述了未来中的一个系列的await点如何被编译成一个状态机——并指出编译器如何帮助确保该状态机遵循Rust关于安全性的所有常规规则,包括借用和所有权。为了使这一点生效,Rust会查看每个await点与下一个await点或异步块结束之间所需的数据。然后,它在创建的状态机中创建相应的变体。每个变体都会获得对该部分源代码中将使用的数据的访问权限,无论是通过获取该数据的所有权还是通过获取其可变或不可变引用。

到目前为止一切顺利:如果我们对给定异步块中的所有权或引用有任何错误,借用检查器会告诉我们。当我们想要移动与该块对应的未来对象时——比如将其移动到Vec中以传递给join_all,就像我们在“处理任意数量的未来对象”部分所做的那样——事情会变得更复杂。

当我们移动一个未来——无论是通过将其推入数据结构以用作 join_all 的迭代器,还是从函数返回它们——这实际上意味着移动 Rust 为我们创建的状态机。与 Rust 中的大多数其他类型不同,Rust 为异步块创建的未来可以在任何给定变体的字段中最终包含对自身的引用,如图 17-4 所示(这是一个简化的图示,旨在帮助您理解这个概念,而不是深入探讨通常相当复杂的细节)。

Concurrent work flow
Figure 17-4: A self-referential data type.

默认情况下,任何具有指向自身引用的对象在移动时都是不安全的,因为引用总是指向它们所引用对象的实际内存地址。如果你移动了数据结构本身,那些内部引用将仍然指向旧的位置。然而,那个内存位置现在是无效的。一方面,当你对数据结构进行更改时,其值不会被更新。另一方面,更重要的是!计算机现在可以自由地将该内存用于其他事情!你可能会在稍后读取完全不相关的数据。

Concurrent work flow
Figure 17-5: The unsafe result of moving a self-referential data type.

原则上,Rust 编译器可以尝试在每次对象移动时更新每个对该对象的引用。这可能会带来大量的性能开销,特别是考虑到可能需要更新的引用网络。另一方面,如果我们能确保该数据结构 不会在内存中移动,我们就不需要更新任何引用。这正是 Rust 的借用检查器所要求的:你不能使用安全代码移动任何有活动引用的项。

Pin 在此基础上为我们提供了所需的精确保证。当我们通过将指向该值的指针包装在 Pin 中来 固定 一个值时,该值将不能再移动。因此,如果你有 Pin<Box<SomeType>>,你实际上固定的是 SomeType 值,而不是 Box 指针。图 17-6 说明了这一点:

Concurrent work flow
Figure 17-6: Pinning a `Box` which points to a self-referential future type.

事实上,Box 指针仍然可以自由移动。记住:我们关心的是确保最终被引用的数据保持在原位。如果指针移动了,但其所指向的数据仍在同一位置,如图 17-7 所示,就没有潜在的问题。(如何使用 Pin 包装 Box 来实现这一点超出了我们当前讨论的范围,但这将是一个很好的练习!如果你查看这些类型的文档以及 std::pin 模块,你可能会弄清楚如何做到这一点。)关键是自引用类型本身不能移动,因为它仍然是固定的。

Concurrent work flow
Figure 17-7: Moving a `Box` which points to a self-referential future type.

然而,大多数类型即使在 Pin 指针后面也是完全安全的。我们只有在项目具有内部引用时才需要考虑固定。像数字和布尔值这样的原始值没有任何内部引用,因此它们显然是安全的。你通常在 Rust 中使用的大多数类型也是如此。例如,Vec 没有任何需要保持更新的内部引用,因此你可以随意移动它而无需担心。如果你有一个 Pin<Vec<String>>,即使 Vec<String> 在没有其他引用的情况下总是可以安全移动的,你也必须通过 Pin 提供的安全但受限的 API 来做所有事情。我们需要一种方法来告诉编译器在这些情况下实际上可以安全地移动项目。为此,我们有 Unpin

Unpin 是一个标记特征,类似于我们在第 16 章中看到的 SendSync 特征。回想一下,标记特征本身没有任何功能。它们仅存在于告诉编译器,实现给定特征的类型在特定上下文中使用是安全的。Unpin 告知编译器,给定的类型不需要维持任何关于该值是否可以移动的特定保证。

正如对 SendSync 一样,编译器会自动为所有可以证明安全的类型实现 Unpin。特殊情况,同样类似于 SendSync,是 Unpin 为某个类型实现的情况。这种表示法是 impl !Unpin for SomeType,其中 SomeType 是需要在使用该类型的指针时确保这些保证以确保安全的类型的名称。

换句话说,关于 PinUnpin 之间的关系,有两点需要注意。首先,Unpin 是“正常”情况,而 !Unpin 是特殊情况。其次,一个类型是否实现了 Unpin!Unpin 仅在 使用指向该类型的固定指针(如 Pin<&mut SomeType>)时才重要。

为了具体说明这一点,考虑一个 String:它有一个长度和组成它的 Unicode 字符。我们可以在 Pin 中包装一个 String,如图 17-8 所示。然而,String 自动实现了 Unpin,就像 Rust 中的大多数其他类型一样。

Concurrent work flow
Figure 17-8: Pinning a String, with a dotted line indicating that the String implements the `Unpin` trait, so it is not pinned.

因此,我们可以做一些如果 String 实现了 !Unpin 就会非法的事情,比如在内存中的确切相同位置用另一个字符串替换一个字符串,如图17-9所示。这不会违反 Pin 协议,因为 String 没有内部引用使其移动不安全!这正是它实现 Unpin 而不是 !Unpin 的原因。

Concurrent work flow
Figure 17-9: Replacing the String with an entirely different String in memory.

现在我们已经了解了足够的知识,可以理解在清单 17-17 中报告的 join_all 调用的错误。我们最初尝试将由 async 块生成的 future 移动到 Vec<Box<dyn Future<Output = ()>>> 中,但正如我们所见,这些 future 可能有内部引用,因此它们不实现 Unpin。它们需要被固定,然后我们可以将 Pin 类型传递给 Vec,确信 future 中的底层数据 不会 被移动。

PinUnpin 主要对于构建底层库或构建运行时本身非常重要,而不是用于日常的 Rust 代码。当你在错误消息中看到这些特征时,现在你将更好地了解如何修复代码!

注意:这种 PinUnpin 的组合使得一类复杂的类型在 Rust 中是安全的,而这些类型由于是自引用的,通常很难实现。需要 Pin 的类型在今天的异步 Rust 中最常见,但你也可能——非常罕见!——在其他上下文中看到它。

PinUnpin 的具体工作方式以及它们需要遵守的规则,在 std::pin 的 API 文档中进行了详尽的介绍,因此如果你想更深入地了解它们,这是一个很好的起点。

如果您想更详细地了解“底层”工作原理,官方的Rust 中的异步编程书籍会为您解答:

流特质

现在我们对 FuturePinUnpin 特性有了更深入的了解,我们可以将注意力转向 Stream 特性。正如在介绍流的部分所述,流类似于异步迭代器。与 IteratorFuture 不同,截至本文撰写时,标准库中没有 Stream 特性的定义,但 futures 库中有一个非常常见的定义,在整个生态系统中广泛使用。

让我们回顾一下 IteratorFuture 特性的定义,这样我们就可以逐步了解如何将它们合并到一个 Stream 特性中。从 Iterator 中,我们有序列的概念:其 next 方法提供一个 Option<Self::Item>。从 Future 中,我们有随时间变化的准备状态的概念:其 poll 方法提供一个 Poll<Self::Output>。为了表示随时间变化的项目序列,我们定义了一个 Stream 特性,将这些特性结合在一起:

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}
}

Stream 特性定义了一个关联类型 Item,用于表示流产生的项的类型。这与 Iterator 类似:可能有零个到多个这样的项,而与 Future 不同,Future 始终只有一个 Output(即使它是单元类型 ())。

Stream 还定义了一个方法来获取这些项。我们称之为 poll_next,以明确它以与 Future::poll 相同的方式进行轮询,并以与 Iterator::next 相同的方式生成一系列项。其返回类型将 PollOption 结合在一起。外部类型是 Poll,因为它需要像未来一样检查就绪状态。内部类型是 Option,因为它需要像迭代器一样指示是否还有更多消息。

与这非常相似的内容很可能会成为 Rust 标准库的一部分。在此期间,它已经是大多数运行时工具包的一部分,因此你可以依赖它,下面涵盖的所有内容通常都适用!

在我们之前在流处理部分看到的例子中,我们并没有使用poll_next Stream,而是使用了nextStreamExt。我们可以通过手动编写自己的Stream状态机来直接使用poll_next API,当然,我们也可以通过它们的poll方法直接处理未来对象。然而,使用await要方便得多,因此StreamExt特质提供了next方法,使我们能够这样做。

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

trait Stream {
    type Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

trait StreamExt: Stream {
    async fn next(&mut self) -> Option<Self::Item>
    where
        Self: Unpin;

    // other methods...
}
}

注意:我们在本章前面使用的实际定义与此略有不同,因为它支持尚未支持在特质中使用异步函数的 Rust 版本。因此,它看起来像这样:

fn next(&mut self) -> Next<'_, Self> where Self: Unpin;

那个 Next 类型是一个 struct,它实现了 Future 并提供了一种方法来命名对 self 的引用的生命周期,使用 Next<'_, Self>,这样 await 就可以与这个方法一起工作!

StreamExt 特性也是所有可用于流的有趣方法的所在地。StreamExt 会自动为每个实现了 Stream 的类型实现,但这些特性是单独定义的,以便社区可以独立于基础特性迭代便利性 API。

在用于 trpl crate 的 StreamExt 版本中,该 trait 不仅定义了 next 方法,还提供了 next 方法的实现,该实现正确处理了调用 Stream::poll_next 的细节。这意味着,即使你需要编写自己的流数据类型,你也 只需 实现 Stream,然后任何使用你数据类型的人可以自动使用 StreamExt 及其方法。

这就是我们对这些特质的底层细节的所有介绍。为了总结,让我们考虑一下未来(包括流)、任务和线程是如何协同工作的!