Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

流:按顺序的 Future

回想我们在本章前面的“消息传递”部分中如何使用异步通道的接收者。异步recv方法随时间生成一系列项。这是一个更广泛模式的实例,称为。许多概念自然地表示为流:队列中变得可用的项、当完整数据集太大而无法放入计算机内存时从文件系统中逐步提取的数据块,或随时间通过网络到达的数据。因为流是未来对象,我们可以将它们与任何其他类型的未来对象一起使用,并以有趣的方式组合它们。例如,我们可以批量处理事件以避免触发过多的网络调用,为长时间运行的操作序列设置超时,或限制用户界面事件以避免不必要的工作。

我们在第 13 章“迭代器特质和 next 方法”部分讨论了序列项,但迭代器和异步通道接收器之间有两个区别。第一个区别是时间:迭代器是同步的,而通道接收器是异步的。第二个区别是 API。在直接使用 Iterator 时,我们调用其同步的 next 方法。而对于 trpl::Receiver 流,我们调用的是异步的 recv 方法。除此之外,这些 API 感觉非常相似,这种相似性并非偶然。流就像是异步形式的迭代。虽然 trpl::Receiver 特定地等待接收消息,但通用的流 API 范围更广:它以异步方式提供下一个项,就像 Iterator 那样。

迭代器和流在 Rust 中的相似性意味着我们实际上可以从任何迭代器创建一个流。与迭代器一样,我们可以通过调用其 next 方法并等待输出来处理流,如清单 17-21 所示,这还不能编译。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-21: Creating a stream from an iterator and printing its values

我们从一个数字数组开始,将其转换为迭代器,然后调用 map 来将所有值翻倍。然后我们使用 trpl::stream_from_iter 函数将迭代器转换为流。接下来,我们使用 while let 循环来循环处理流中到达的项目。

不幸的是,当我们尝试运行代码时,它无法编译,而是报告说没有可用的next方法:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

正如此输出解释的那样,编译器错误的原因是我们需要正确的特质在作用域内才能使用next方法。根据我们到目前为止的讨论,你可能会合理地认为这个特质是Stream,但实际上它是StreamExt扩展的简称,Ext是在Rust社区中扩展一个特质的常见模式。

Stream 特性定义了一个低级别的接口,有效地结合了 IteratorFuture 特性。StreamExtStream 上提供了一组更高层次的 API,包括 next 方法以及其他类似于 Iterator 特性提供的实用方法。StreamStreamExt 尚未成为 Rust 标准库的一部分,但大多数生态系统 crate 使用类似的定义。

修复编译器错误的方法是添加一个 use 语句,用于 trpl::StreamExt,如列表 17-22 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        // --snip--
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-22: Successfully using an iterator as the basis for a stream

将所有这些部分组合在一起,这段代码就能按我们想要的方式工作!更重要的是,现在我们已经将StreamExt引入作用域,我们可以使用它的所有实用方法,就像使用迭代器一样。