到目前为止,在本章中,我们主要关注的是单个的 future。唯一的重大例外是我们使用的异步通道。回想一下我们如何在本章的“消息传递” 部分中使用异步通道的接收器。异步recv
方法随时间生成一系列项目。这是一个更普遍模式的实例,通常称为流 。
一系列项目是我们之前在第 13 章讨论 Iterator
特性时见过的,但迭代器和异步通道接收器之间有两个不同点。第一个不同点是时间因素:迭代器是同步的,而通道接收器是异步的。第二个不同点是 API。当我们直接使用 Iterator
时,我们调用其同步的 next
方法。而对于 trpl::Receiver
流,我们调用的是异步的 recv
方法,但这些 API 否则感觉非常相似。
这种相似性并非偶然。流类似于异步形式的迭代。虽然 trpl::Receiver
特定地等待接收消息,但通用的流 API 要广泛得多:它以 Iterator
的方式提供下一个项目,但异步进行。Rust 中迭代器和流之间的相似性意味着我们实际上可以从任何迭代器创建一个流。与迭代器一样,我们可以通过调用流的 next
方法并等待输出来处理流,如示例 17-30 所示。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
fn main() {
trpl::run(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Listing 17-30: Creating a stream from an iterator and printing its values
我们从一个数字数组开始,将其转换为迭代器,然后调用map
来将所有值翻倍。然后我们使用trpl::stream_from_iter
函数将迭代器转换为流。然后我们使用while let
循环遍历流中的项目。
不幸的是,当我们尝试运行代码时,它无法编译。相反,正如我们在输出中看到的,它报告说没有可用的next
方法。
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
正如输出所建议的,编译器错误的原因是我们需要正确的特质在作用域内才能使用 next
方法。根据我们到目前为止的讨论,你可能会合理地认为这应该是 Stream
,但这里我们需要的特质实际上是 StreamExt
。这里的 Ext
表示“扩展”:这是 Rust 社区中扩展一个特质的常见模式。
为什么我们需要StreamExt
而不是Stream
,Stream
特质本身做了什么?简而言之,答案是在整个Rust生态系统中,Stream
特质定义了一个低级别的接口,有效地结合了Iterator
和Future
特质。StreamExt
特质在Stream
之上提供了一组更高层次的API,包括next
方法以及其他类似于Iterator
特质提供的实用方法。我们将在本章末尾更详细地回到Stream
和StreamExt
特质。现在,这些足以让我们继续前进。
修复编译器错误的方法是添加一个 use
语句,用于 trpl::StreamExt
,如列表 17-31 所示。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::run(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Listing 17-31: Successfully using an iterator as the basis for a stream
将所有这些部分组合在一起,这段代码就能按我们想要的方式工作!更重要的是,现在我们已经将 StreamExt
引入作用域,我们可以使用它的所有实用方法,就像使用迭代器一样。例如,在示例 17-32 中,我们使用 filter
方法来过滤掉除三和五的倍数之外的所有内容。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::run(async {
let values = 1..101;
let iter = values.map(|n| n * 2);
let stream = trpl::stream_from_iter(iter);
let mut filtered =
stream.filter(|value| value % 3 == 0 || value % 5 == 0);
while let Some(value) = filtered.next().await {
println!("The value was: {value}");
}
});
}
Listing 17-32: Filtering a Stream
with the StreamExt::filter
method
当然,这并不是非常有趣。我们可以通过普通的迭代器来实现,完全不需要任何异步。所以让我们看看我们可以做的一些其他事情,这些是流独有的。
许多概念自然地表示为流:项目在队列中变得可用,或者通过一次从文件系统中提取一部分数据来处理超出计算机内存的数据,或者数据随时间通过网络到达。因为流是未来的,我们可以将它们与任何其他类型的未来一起使用,也可以以有趣的方式将它们组合起来。例如,我们可以批量处理事件以避免触发过多的网络调用,为长时间运行的操作序列设置超时,或限制用户界面事件以避免不必要的工作。
让我们先构建一个消息流,作为我们可能从WebSocket或其他实时通信协议中看到的数据流的替代品。在清单17-33中,我们创建了一个函数get_messages
,它返回impl Stream<Item = String>
。在其实现中,我们创建了一个异步通道,遍历英文字母表的前十个字母,并将它们通过通道发送。
我们还使用了一种新的类型:ReceiverStream
,它将来自trpl::channel
的rx
接收器转换为具有next
方法的Stream
。在main
中,我们使用while let
循环来打印流中的所有消息。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let mut messages = get_messages();
while let Some(message) = messages.next().await {
println!("{message}");
}
});
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for message in messages {
tx.send(format!("Message: '{message}'")).unwrap();
}
ReceiverStream::new(rx)
}
Listing 17-33: Using the rx
receiver as a ReceiverStream
当我们运行这段代码时,我们得到的正是我们预期的结果:
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
我们可以通过常规的Receiver
API,或者甚至是常规的Iterator
API来实现这一点。不过,让我们添加一些需要流的东西:为流中的每个项目添加一个超时,并在我们发出的项目上添加延迟。
在清单 17-34 中,我们首先使用来自 StreamExt
特性的 timeout
方法为流添加超时。然后我们更新 while let
循环的主体,因为流现在返回一个 Result
。Ok
变体表示消息及时到达;Err
变体表示在任何消息到达之前超时已过期。我们对这个结果进行 match
,当成功接收到消息时打印消息,或者打印超时通知。最后,注意我们在应用超时后固定消息,因为超时助手生成的流需要被固定才能被轮询。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let mut messages =
pin!(get_messages().timeout(Duration::from_millis(200)));
while let Some(result) = messages.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for message in messages {
tx.send(format!("Message: '{message}'")).unwrap();
}
ReceiverStream::new(rx)
}
Listing 17-34: Using the StreamExt::timeout
method to set a time limit on the items in a stream
然而,因为消息之间没有延迟,这个超时并不会改变程序的行为。让我们给发送的消息添加一个可变的延迟。在get_messages
中,我们使用enumerate
迭代器方法与messages
数组,这样我们可以获取我们发送的每个项目的索引以及项目本身。然后我们对偶数索引的项目应用100毫秒的延迟,对奇数索引的项目应用300毫秒的延迟,以模拟在现实世界中我们可能从消息流中看到的不同延迟。因为我们的超时时间是200毫秒,这应该会影响一半的消息。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let mut messages =
pin!(get_messages().timeout(Duration::from_millis(200)));
while let Some(result) = messages.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
Listing 17-35: Sending messages through tx
with an async delay without making get_messages
an async function
为了在 get_messages
函数中不阻塞地在消息之间休眠,我们需要使用 async。但是,我们不能将 get_messages
本身变成一个 async 函数,因为那样我们会返回一个 Future<Output = Stream<Item = String>>
而不是一个 Stream<Item = String>>
。调用者必须等待 get_messages
本身才能访问流。但请记住:给定未来中的所有事情都是线性发生的;并发发生在未来之间。等待 get_messages
会要求它在返回接收者流之前发送所有消息,包括在发送每条消息之间休眠。因此,超时将变得无用。流本身不会有延迟:所有延迟都会在流可用之前发生。
相反,我们将get_messages
保留为一个返回流的普通函数,并启动一个任务来处理异步sleep
调用。
注意:以这种方式调用spawn_task
可以工作,因为我们已经设置了我们的运行时。在没有首先设置运行时的情况下调用此特定实现的spawn_task
会导致恐慌。其他实现选择了不同的权衡:它们可能会启动一个新的运行时,从而避免恐慌,但最终会增加一些额外的开销,或者根本不会提供一种独立于运行时来启动任务的方法。您应该确保了解您的运行时选择了哪种权衡,并相应地编写代码!
现在我们的代码有了一个更有趣的结果!在每一对消息之间,我们看到一个错误报告:Problem: Elapsed(())
。
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
超时并不会阻止消息最终到达——我们仍然会收到所有原始消息。这是因为我们的通道是无界的:它可以容纳我们内存中能容纳的尽可能多的消息。如果消息在超时前没有到达,我们的流处理器会对此进行处理,但当它再次轮询流时,消息可能已经到达。
你可以通过使用其他类型的通道,或更广泛地说,使用其他类型的流来获得所需的不同行为。让我们在本节的最后一个例子中通过结合时间间隔流和消息流来实际看看其中一个。
首先,让我们创建另一个流,如果让它直接运行,它将每毫秒发出一个项目。为了简单起见,我们可以使用sleep
函数来延迟发送消息,并结合我们在get_messages
中使用的方法,从通道创建流。不同的是,这次我们将返回已过去的时间间隔的计数,因此返回类型将是impl Stream<Item = u32>
,我们可以将此函数称为get_intervals
。
在清单 17-36 中,我们首先在任务中定义一个 count
。(我们也可以在任务外部定义它,但限制任何给定变量的作用域会更清晰。)然后我们创建一个无限循环。循环的每次迭代都会异步休眠一毫秒,增加计数,然后通过通道发送。因为所有这些都封装在由 spawn_task
创建的任务中,所以所有这些都会与运行时一起被清理,包括无限循环。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let mut messages =
pin!(get_messages().timeout(Duration::from_millis(200)));
while let Some(result) = messages.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
Listing 17-36: Creating a stream with a counter that will be emitted once every millisecond
这种无限循环,只有在运行时被完全拆除时才会结束,在异步 Rust 中相当常见:许多程序需要无限期地运行。使用异步,只要循环的每次迭代中至少有一个 await 点,就不会阻塞其他任何内容。
在我们的主函数的异步块中,我们首先调用get_intervals
。然后我们使用merge
方法将messages
和intervals
流合并,该方法将多个流合并为一个流,该流会立即从任何源流中生成项目,而不会强加任何特定的顺序。最后,我们遍历这个合并的流而不是messages
(清单17-37)。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
while let Some(result) = merged.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
Listing 17-37: Attempting to merge streams of messages and intervals
在这一点上,messages
和 intervals
都不需要被固定或可变,因为它们将被合并成单一的 merged
流。然而,这个 merge
调用无法编译!(while let
循环中的 next
调用也无法编译,但我们在解决这个问题后会回到这一点。)这两个流有不同的类型。messages
流的类型为 Timeout<impl Stream<Item = String>>
,其中 Timeout
是为 timeout
调用实现 Stream
的类型。同时,intervals
流的类型为 impl Stream<Item = u32>
。为了合并这两个流,我们需要将其中一个转换为与另一个匹配。
在清单 17-38 中,我们重新处理 intervals
流,因为 messages
已经是我们想要的基本格式,并且需要处理超时错误。首先,我们可以使用 map
辅助方法将 intervals
转换为字符串。其次,我们需要匹配来自 messages
的 Timeout
。然而,我们实际上并不希望 intervals
有超时,因此我们可以创建一个比我们使用的其他持续时间更长的超时。这里,我们使用 Duration::from_secs(10)
创建一个 10 秒的超时。最后,我们需要将 stream
设置为可变的,以便 while let
循环的 next
调用可以遍历流,并将其固定,以确保这样做是安全的。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
Listing 17-38: Aligning the types of the the intervals
stream with the type of the messages
stream
这让我们几乎 达到了我们需要的状态。所有类型都检查通过。但是,如果你运行这段代码,将会出现两个问题。首先,它永远不会停止!你需要用ctrl-c 来停止它。其次,来自英文字母的消息将被大量的时间间隔计数器消息所淹没:
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
列表 17-39 展示了解决最后两个问题的一种方法。首先,我们在 intervals
流上使用 throttle
方法,以防止它压垮 messages
流。节流是一种限制函数调用频率的方法——或者在这种情况下,限制流被轮询的频率。每一百毫秒一次应该足够了,因为这与我们的消息到达的频率大致相同。
为了限制我们从流中接受的项目数量,我们可以使用take
方法。我们将其应用于合并的 流,因为我们要限制最终的输出,而不仅仅是其中一个流或另一个流。
Filename: src/main.rs
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.throttle(Duration::from_millis(100))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
Listing 17-39: Using throttle
and take
to manage the merged streams
现在当我们运行程序时,它在从流中拉取二十个项目后停止,而且间隔不会压倒消息。我们也不会得到Interval: 100
或Interval: 200
等,而是得到Interval: 1
、Interval: 2
等——尽管我们有一个可以 每毫秒生成一个事件的源流。这是因为throttle
调用生成了一个新的流,包裹了原始流,使得原始流只在节流速率下被轮询,而不是其自身的“原生”速率。我们没有一堆未处理的间隔消息需要选择忽略。相反,我们从一开始就从未生成这些间隔消息!这是Rust的未来机制固有的“惰性”再次发挥作用,允许我们选择性能特性。
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
还有最后一件事需要处理:错误!对于这两个基于通道的流,当通道的另一端关闭时,send
调用可能会失败——这仅仅是运行时执行构成流的未来对象的方式的问题。到目前为止,我们通过调用 unwrap
忽略了这一点,但在一个行为良好的应用程序中,我们应该显式地处理错误,至少通过结束循环来避免尝试发送更多消息!列表 17-40 显示了一个简单的错误处理策略:打印问题,然后从循环中 break
。和往常一样,处理消息发送错误的正确方法会有所不同——只需确保你有一个策略。
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval #{count}"))
.throttle(Duration::from_millis(500))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(item) => println!("{item}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
});
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
eprintln!("Cannot send message '{message}': {send_error}");
break;
}
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
if let Err(send_error) = tx.send(count) {
eprintln!("Could not send interval {count}: {send_error}");
break;
};
}
});
ReceiverStream::new(rx)
}
Listing 17-40: Handling errors and shutting down the loops
现在我们已经看到了很多异步的实际应用,让我们退一步,深入探讨一下 Rust 用于实现异步的关键特质 Future
、Stream
以及其他一些细节。