处理任意数量的 Future
当我们从使用两个 future 转变为使用三个 future 时,我们还必须从使用 join
转变为使用 join3
。每次我们改变想要连接的 future 数量时,都必须调用不同的函数,这会很烦人。幸运的是,我们有一个宏形式的 join
,可以传递任意数量的参数。它还自行处理 future 的等待。因此,我们可以将列表 17-13 中的代码重写为使用 join!
而不是 join3
,如列表 17-14 所示:
这肯定比需要在 join
和 join3
和 join4
等之间切换要好得多!然而,即使这种宏形式也只在我们提前知道未来数量的情况下有效。然而,在现实世界的 Rust 中,将未来推入集合,然后等待该集合中的一些或所有未来完成,是一种常见的模式。
要检查集合中的所有 future,我们需要遍历并连接所有的它们。trpl::join_all
函数接受任何实现了 Iterator
特性的类型,我们在第 13 章中学习过,所以它看起来正是我们需要的。让我们尝试将我们的 future 放入一个向量中,并用 join_all
替换 join!
。
不幸的是,这无法编译。相反,我们得到了这个错误:
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
这可能会让人感到惊讶。毕竟,它们都没有返回任何东西,所以每个块都会生成一个Future<Output = ()>
。然而,Future
是一个特质,而不是一个具体类型。具体类型是由编译器为异步块生成的各个数据结构。你不能将两个不同的手写结构体放入一个Vec
中,同样地,也不能将编译器生成的不同结构体放入其中。
要使这工作,我们需要使用trait 对象,就像我们在“从 run 函数返回错误”第 12 章中所做的那样。(我们将在第 18 章详细讨论 trait 对象。)使用 trait 对象让我们可以将这些类型产生的每个匿名 future 视为同一类型,因为它们都实现了Future
trait。
注意:在第 8 章中,我们讨论了另一种在 Vec
中包含多种类型的方法:使用枚举来表示向量中可能出现的每种不同类型。然而,我们在这里不能这样做。一方面,我们无法命名这些不同的类型,因为它们是匿名的。另一方面,我们首先选择向量和 join_all
的原因是为了能够处理一个动态的未来集合,直到运行时我们才知道它们会是什么。
我们首先将 vec!
中的每个 future 用 Box::new
包装起来,如清单 17-16 所示。
不幸的是,这仍然无法编译。事实上,我们遇到了与之前相同的基本错误,但这次在第二个和第三个 Box::new
调用时都出现了错误,同时还有新的错误提到了 Unpin
特性。我们稍后会回到 Unpin
错误。首先,让我们通过显式注解 futures
变量的类型来修复 Box::new
调用的类型错误:
我们在这里必须编写的类型有点复杂,所以让我们逐步分析它:
- 最内层的类型是未来本身。我们明确指出,未来的输出是单元类型
()
,通过写Future<Output = ()>
。 - 然后我们用
dyn
注解这个 trait 以标记它为动态。 - 整个特征引用被包裹在一个
Box
中。 - 最后,我们明确指出
futures
是一个包含这些项的Vec
。
这已经产生了很大的不同。现在当我们运行编译器时,我们只有提到Unpin
的错误。虽然有三个,但请注意,每个错误的内容都非常相似。
error[E0308]: mismatched types
--> src/main.rs:46:46
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0308]: mismatched types
--> src/main.rs:46:64
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
30 | let tx_fut = async move {
| ---------- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:30:22: 30:32}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:24
|
48 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:9
|
48 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
那有很多需要消化的内容,所以我们来拆解一下。消息的第一部分
告诉我们第一个异步块(src/main.rs:8:23: 20:10
)没有
实现Unpin
特质,并建议使用pin!
或Box::pin
来解决
它。在本章的后面,我们将深入探讨更多关于Pin
和
Unpin
的细节。不过,现在我们只需按照编译器的建议来解决问题!在清单17-18中,我们首先更新
futures
的类型注解,用Pin
包裹每个Box
。其次,我们使用Box::pin
来固定
这些未来对象。
如果我们编译并运行这个程序,我们最终会得到我们希望的输出:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
呼!
这里还有更多可以探讨的内容。一方面,使用Pin<Box<T>>
会因为使用 Box
将这些 future 放在堆上而带来一些额外的开销——而我们这样做只是为了使类型对齐。毕竟,我们实际上并不 需要 堆分配:这些 future 仅限于这个特定的函数。如上所述,Pin
本身是一个包装类型,因此我们可以在 Vec
中拥有单一类型的好处——这正是我们最初选择 Box
的原因——而无需进行堆分配。我们可以直接使用 Pin
与每个 future,使用 std::pin::pin
宏。
然而,我们必须明确指出固定引用的类型;否则 Rust 仍然不知道将这些解释为动态特征对象,而这正是我们需要它们在 Vec
中成为的。因此,我们在定义每个未来时都使用 pin!
,并将 futures
定义为包含指向动态 Future
类型的固定可变引用的 Vec
,如清单 17-19 所示。
我们通过忽略我们可能有不同的Output
类型这一事实才走到这一步。例如,在示例 17-20 中,a
的匿名未来实现了Future<Output = u32>
,b
的匿名未来实现了Future<Output = &str>
,c
的匿名未来实现了Future<Output = bool>
。
我们可以使用trpl::join!
来等待它们,因为它允许你传入多个未来类型并生成这些类型的元组。我们不能使用trpl::join_all
,因为它要求传入的所有未来都具有相同的类型。记住,那个错误是我们开始这次与Pin
的冒险的原因!
这是一个基本的权衡:我们可以使用join_all
处理动态数量的未来对象,只要它们都具有相同的类型,或者我们可以使用join
函数或join!
宏处理固定数量的未来对象,即使它们具有不同的类型。这与在Rust中处理任何其他类型是一样的。未来对象并不特殊,尽管我们有一些不错的语法来处理它们,这是一件好事。
竞速未来
当我们使用 join
系列函数和宏“连接”未来值时,我们要求 所有 未来值都完成之后才继续前进。然而,有时我们只需要 某些 未来值从一组中完成之后就可以继续前进——有点类似于让一个未来值与另一个未来值竞赛。
在清单 17-21 中,我们再次使用 trpl::race
来运行两个未来,slow
和 fast
,彼此竞争。每个未来在开始运行时都会打印一条消息,通过调用并等待 sleep
暂停一段时间,然后在完成时打印另一条消息。然后我们将两者传递给 trpl::race
并等待其中一个完成。(这里的结局不会太令人惊讶:fast
赢了!)与我们在 “我们的第一个异步程序” 中使用 race
时不同,这里我们只是忽略它返回的 Either
实例,因为所有有趣的行为都发生在异步块的主体中。
请注意,如果你调换 race
函数参数的顺序,“started” 消息的顺序也会改变,即使 fast
未来总是首先完成。这是因为这个特定的 race
函数的实现是不公平的。它总是按照参数传递的顺序运行传递的未来。其他实现 是 公平的,会随机选择首先轮询哪个未来。然而,无论我们使用的 race
实现是否公平,一个 未来将在另一个任务开始之前运行到其主体中的第一个 await
。
回想 我们的第一个异步程序 中,在每个 await 点,Rust 会给运行时一个机会,如果正在等待的未来值尚未准备好,可以暂停任务并切换到另一个任务。相反的情况也成立:Rust 仅 在 await 点暂停异步块并将控制权交还给运行时。await 点之间的所有内容都是同步的。
这意味着,如果你在一个异步块中进行大量工作而没有 await 点, 该未来将阻止其他未来取得进展。你有时可能会听到这种情况被称为一个未来 饿死 其他未来。在某些情况下, 这可能不是什么大问题。然而,如果你正在进行某种昂贵的设置或长时间运行的工作,或者如果你有一个将无限期地继续执行某项特定任务的未来,你将需要考虑何时以及在哪里 将控制权交还给运行时。
同样地,如果你有长时间运行的阻塞操作,异步可以是一个有用的工具,为程序的不同部分提供相互关联的方式。
但是在这些情况下,你如何将控制权交还给运行时?
生成
让我们模拟一个长时间运行的操作。列表 17-22 引入了一个 slow
函数。它使用 std::thread::sleep
而不是 trpl::sleep
,因此调用 slow
将会阻塞当前线程一段时间(以毫秒为单位)。我们可以使用 slow
来代表那些既长时间运行又阻塞的现实操作。
在清单 17-23 中,我们使用 slow
来模拟在一对 future 中执行这种 CPU 密集型工作。首先,每个 future 只有在执行了一堆慢操作 之后 才将控制权交还给运行时。
如果您运行此代码,您将看到以下输出:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
与我们之前的例子一样,race
仍然在 a
完成时结束。
不过,这两个未来之间没有交错。a
未来在其 trpl::sleep
调用被等待之前完成所有工作,然后 b
未来在其自己的 trpl::sleep
调用被等待之前完成所有工作,最后 a
未来完成。为了在它们的慢任务之间让两个未来都能取得进展,我们需要等待点,以便我们可以将控制权交还给运行时。这意味着我们需要一些可以等待的东西!
我们已经可以在清单 17-23 中看到这种交接的发生:如果我们移除 a
未来末尾的 trpl::sleep
,它将在 b
未来完全不运行的情况下完成。也许我们可以将 sleep
函数作为起点?
在清单 17-24 中,我们在每次调用 slow
之间添加了带有 await 点的 trpl::sleep
调用。现在两个未来的任务是交错进行的:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
a
未来在将控制权交给 b
之前仍然运行一段时间,因为它在调用 trpl::sleep
之前先调用了 slow
,但在那之后,每当其中一个到达 await 点时,这些未来就会来回交换。在这种情况下,我们在每次调用 slow
之后都这样做了,但我们可以根据自己的需要以任何最合理的方式分配工作。
我们并不真的想在这里睡眠:我们希望尽可能快地取得进展。我们只需要将控制权交还给运行时。我们可以直接使用yield_now
函数来实现。在清单17-25中,我们将所有那些sleep
调用替换为yield_now
。
这既更清楚地表达了实际意图,也可以比使用sleep
显著更快,因为像sleep
所使用的计时器通常有其粒度限制。例如,我们正在使用的sleep
版本,即使我们传递一个Duration
为一纳秒,它也会至少休眠一毫秒。同样,现代计算机很快:它们在一毫秒内可以完成很多事情!
你可以通过设置一个小基准测试来自己查看,例如列表 17-26 中的基准测试。(这并不是一种特别严谨的性能测试方法,但足以显示这里的差异。)在这里,我们跳过所有状态打印,将一个纳秒的Duration
传递给trpl::sleep
,并让每个未来单独运行,不在这两个未来之间切换。然后我们运行 1,000 次迭代,看看使用trpl::sleep
的未来与使用trpl::yield_now
的未来相比需要多长时间。
带有 yield_now
的版本 快得多!
这意味着,即使对于计算密集型任务,async 也可以非常有用,这取决于程序的其他部分在做什么,因为它提供了一种有用的工具来结构化程序不同部分之间的关系。这是一种 协作式多任务处理,每个 future 都有权决定何时通过 await 点交出控制权。因此,每个 future 也有责任避免阻塞时间过长。在某些基于 Rust 的嵌入式操作系统中,这是 唯一 的多任务处理方式!
在实际代码中,你通常不会在每一行代码中交替使用函数调用和 await 点,当然。虽然以这种方式让出控制权相对便宜,但并不是免费的!在许多情况下,尝试将计算密集型任务分解可能会使其显著变慢,因此有时为了 整体 性能,让一个操作短暂阻塞会更好。你应该始终测量以了解代码的实际性能瓶颈是什么。然而,如果你确实看到很多你期望并发执行的工作实际上是串行进行的,那么底层的动态是一个重要的考虑因素!
构建我们自己的异步抽象
我们还可以将未来组合在一起以创建新的模式。例如,我们可以
使用我们已经拥有的异步构建块来构建一个timeout
函数。当我们完成时,结果将是一个我们可以用来构建
更进一步的异步抽象的构建块。
列表 17-27 显示了我们期望这个 timeout
如何与一个慢的未来一起工作。
让我们来实现这个!首先,让我们思考一下 timeout
的 API:
- 它本身需要是一个异步函数,这样我们才能等待它。
- 它的第一个参数应该是要运行的 future。我们可以将其泛化以允许与任何 future 一起使用。
- 其第二个参数将是最大等待时间。如果我们使用
Duration
,这将使其易于传递给trpl::sleep
。 - 它应该返回一个
Result
。如果未来完成成功,Result
将是Ok
,包含未来产生的值。如果超时先发生,Result
将是Err
,包含超时等待的持续时间。
列表 17-28 显示了此声明。
这满足了我们对类型的要求。现在让我们考虑需要的行为:我们希望将传入的未来与持续时间进行竞赛。我们可以使用trpl::sleep
从持续时间创建一个计时器未来,并使用trpl::race
来运行计时器与调用者传入的未来。
我们也知道 race
是不公平的,并且按照传递的顺序轮询参数。因此,我们首先将 future_to_try
传递给 race
,以便即使 max_time
是一个非常短的持续时间,它也有机会完成。如果 future_to_try
首先完成,race
将返回 Left
,并带有 future
的输出。如果 timer
首先完成,race
将返回 Right
,并带有计时器的输出 ()
。
在清单 17-29 中,我们匹配 trpl::race
的结果。如果 future_to_try
成功并且我们得到一个 Left(output)
,我们返回 Ok(output)
。如果睡眠计时器到期并且我们得到一个 Right(())
,我们使用 _
忽略 ()
并返回 Err(max_time)
。
至此,我们已经用两个其他异步助手构建了一个可用的timeout
。如果
我们运行我们的代码,它将在超时后打印失败模式:
Failed after 2 seconds
因为 Future 可以与其他 Future 组合,所以你可以使用较小的异步构建块来构建非常强大的工具。例如,你可以使用相同的方法将超时与重试结合起来,然后将这些与网络调用等事物结合使用——这是本章开头的一个例子!
在实际中,你通常会直接使用 async
和 await
,其次会使用诸如 join
、join_all
、race
等函数和宏。你只需要偶尔使用 pin
来与这些 API 一起使用。
我们现在看到了同时处理多个未来的几种方法。接下来,我们将看看如何随着时间在一个序列中处理多个未来,使用流。不过,在此之前,这里还有几件你可能想要先考虑的事情:
- 我们使用了
Vec
和join_all
来等待某组中所有未来的完成。你如何使用Vec
来按顺序处理一组未来的?这样做有什么权衡? - 查看
futures::stream::FuturesUnordered
类型,来自futures
库。使用它与使用Vec
有什么不同?(不要担心它是来自库的stream
部分;它可以很好地与任何未来的集合一起使用。)