流:按顺序的 Future

到目前为止,在本章中,我们主要关注的是单个的 future。唯一的重大例外是我们使用的异步通道。回想一下我们在本章前面的“消息传递”部分中如何使用异步通道的接收器。异步recv方法随时间生成一系列项目。这是一个更普遍的模式的实例,称为

我们在第 13 章迭代器特质和 next 方法部分讨论了 Iterator 特质时,看到了一系列的项目,但迭代器和异步通道接收器之间有两个区别。第一个区别是时间:迭代器是同步的,而通道接收器是异步的。第二个区别是 API。当我们直接使用 Iterator 时,我们调用其同步的 next 方法。而对于 trpl::Receiver 流,我们调用的是异步的 recv 方法。除此之外,这些 API 感觉非常相似,这种相似性并非偶然。流就像是迭代的一种异步形式。虽然 trpl::Receiver 特定地等待接收消息,但通用的流 API 范围更广:它以 Iterator 的方式提供下一个项目,但它是异步的。

迭代器和流在 Rust 中的相似性意味着我们可以从任何迭代器创建一个流。与迭代器一样,我们可以通过调用其 next 方法并等待输出来处理流,如列表 17-30 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
Listing 17-30: Creating a stream from an iterator and printing its values

我们从一个数字数组开始,将其转换为迭代器,然后调用 map 来将所有值翻倍。然后我们使用 trpl::stream_from_iter 函数将迭代器转换为流。接下来,我们使用 while let 循环来循环处理流中到达的项目。


error[E0599]: no method named `next` found for struct `Iter` in the current scope
  --> src/main.rs:10:40
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   = note: the full type name has been written to 'file:///projects/async-await/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
   = note: consider using `--verbose` to print the full type name to the console
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
1  + use crate::trpl::StreamExt;
1  + use futures_util::stream::stream::StreamExt;
1  + use std::iter::Iterator;
1  + use std::str::pattern::Searcher;
help: there is a method `try_next` with a similar name
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~


我们将在本章末尾更详细地解释 StreamStreamExt 特性,但目前你需要知道的是,Stream 特性定义了一个低级别的接口,有效地结合了 IteratorFuture 特性。StreamExtStream 上提供了一组更高层次的 API,包括 next 方法以及其他类似于 Iterator 特性提供的实用方法。StreamStreamExt 尚未成为 Rust 标准库的一部分,但大多数生态系统 crate 使用相同的定义。

修复编译器错误的方法是添加一个 use 语句,用于 trpl::StreamExt,如列表 17-31 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
Listing 17-31: Successfully using an iterator as the basis for a stream

将所有这些部分组合在一起,这段代码就能按我们想要的方式工作!更重要的是,现在我们已经将 StreamExt 引入作用域,我们可以使用它的所有实用方法,就像使用迭代器一样。例如,在示例 17-32 中,我们使用 filter 方法来过滤掉除三和五的倍数之外的所有内容。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
Listing 17-32: Filtering a stream with the StreamExt::filter method





Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();

Listing 17-33: Using the rx receiver as a ReceiverStream

首先,我们创建一个名为 get_messages 的函数,该函数返回 impl Stream<Item = String>。对于其实现,我们创建一个异步通道,遍历英文字母表的前10个字母,并将它们通过通道发送。

我们还使用了一种新的类型:ReceiverStream,它将来自trpl::channelrx接收器转换为具有next方法的Stream。在main中,我们使用while let循环来打印流中的所有消息。


Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

再次,我们可以通过常规的Receiver API 或者甚至是常规的Iterator API 来实现这一点,所以让我们添加一个需要流的功能:为流中的每个项目添加一个超时,并在我们发出的项目上添加延迟,如列表 17-34 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();

Listing 17-34: Using the StreamExt::timeout method to set a time limit on the items in a stream

我们首先通过 timeout 方法为流添加超时,该方法来自 StreamExt 特性。然后我们更新 while let 循环的主体,因为流现在返回一个 ResultOk 变体表示消息及时到达;Err 变体表示在任何消息到达之前超时已过期。我们对这个结果进行 match,当成功接收到消息时打印消息,或者在超时情况下打印通知。最后,注意我们在应用超时后固定消息,因为超时助手生成的流需要被固定才能被轮询。

然而,由于消息之间没有延迟,这个超时并不会改变程序的行为。让我们给发送的消息添加一个可变的延迟,如清单 17-35 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };

            tx.send(format!("Message: '{message}'")).unwrap();

Listing 17-35: Sending messages through tx with an async delay without making get_messages an async function

get_messages 中,我们使用 enumerate 迭代器方法与 messages 数组,以便我们可以获取每个项目的索引以及项目本身。然后我们对索引为偶数的项目应用 100 毫秒的延迟,对索引为奇数的项目应用 300 毫秒的延迟,以模拟在现实世界中从消息流中可能看到的不同延迟。因为我们的超时时间是 200 毫秒,这应该会影响一半的消息。

为了在 get_messages 函数中不阻塞地在消息之间休眠,我们需要使用 async。但是,我们不能将 get_messages 本身变成一个 async 函数,因为那样我们会返回一个 Future<Output = Stream<Item = String>> 而不是一个 Stream<Item = String>>。调用者必须等待 get_messages 本身才能访问流。但请记住:给定未来中的所有事情都是线性发生的;并发发生在未来之间。等待 get_messages 会要求它在返回接收者流之前发送所有消息,包括每条消息之间的休眠延迟。因此,超时将毫无用处。流本身将没有延迟;所有延迟都会在流可用之前发生。

相反,我们将get_messages保留为一个返回流的普通函数, 并启动一个任务来处理异步sleep调用。


现在我们的代码有了一个更有趣的结果。在每一对消息之间,出现了一个 Problem: Elapsed(()) 错误。

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'




首先,让我们创建另一个流,如果让它直接运行,它将每毫秒发出一个项目。为了简单起见,我们可以使用sleep函数来延迟发送消息,并将其与我们在get_messages中使用的方法结合起来,即从通道创建流。不同之处在于,这次我们将返回已过去的时间间隔的计数,因此返回类型将是impl Stream<Item = u32>,我们可以将这个函数称为get_intervals(参见清单17-36)。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };

            tx.send(format!("Message: '{message}'")).unwrap();


fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            count += 1;

Listing 17-36: Creating a stream with a counter that will be emitted once every millisecond

我们首先在任务中定义一个 count。(我们也可以在任务外部定义它,但限制任何给定变量的作用域会更清晰。)然后我们创建一个无限循环。循环的每次迭代都会异步休眠一毫秒,增加计数,然后通过通道发送。因为所有这些都包裹在由 spawn_task 创建的任务中,所以包括无限循环在内的所有内容都会与运行时一起被清理。


现在,回到我们主函数的 async 块中,我们可以尝试合并 messagesintervals 流,如清单 17-37 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals();
        let merged = messages.merge(intervals);

        while let Some(result) = merged.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };

            tx.send(format!("Message: '{message}'")).unwrap();


fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            count += 1;

Listing 17-37: Attempting to merge the messages and intervals streams

我们首先调用 get_intervals。然后我们使用 merge 方法合并 messagesintervals 流,该方法将多个流合并为一个流,该流在项目可用时立即从任何源流生成项目,而不强制执行任何特定顺序。最后,我们遍历这个组合流而不是遍历 messages

在这一点上,messagesintervals 都不需要被固定或可变, 因为它们将被合并成单一的 merged 流。然而,这个 对 merge 的调用无法编译!(while let 循环中的 next 调用也无法编译,但我们会稍后讨论这一点。)这是因为两个流具有 不同的类型。messages 流的类型为 Timeout<impl Stream<Item = String>>,其中 Timeout 是为 timeout 调用实现 Stream 的类型。intervals 流的类型为 impl Stream<Item = u32>。为了合并 这两个流,我们需要将其中一个转换为与另一个匹配。我们将 重新处理 intervals 流,因为 messages 已经是我们想要的基本格式,并且必须处理超时错误(参见清单 17-38)。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
        let merged = messages.merge(intervals);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };

            tx.send(format!("Message: '{message}'")).unwrap();


fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            count += 1;

Listing 17-38: Aligning the type of the the intervals stream with the type of the messages stream

首先,我们可以使用 map 辅助方法将 intervals 转换为字符串。其次,我们需要从 messages 中匹配 Timeout。然而,我们实际上并不希望 intervals 有超时,因此我们可以创建一个比我们使用的其他持续时间更长的超时。在这里,我们使用 Duration::from_secs(10) 创建了一个 10 秒的超时。最后,我们需要将 stream 设置为可变的,以便 while let 循环的 next 调用可以遍历流,并将其固定,以确保这样做是安全的。这让我们 几乎 达到了我们需要的地方。所有类型都检查无误。但是,如果你运行这个代码,会有两个问题。首先,它永远不会停止!你将需要使用 ctrl-c 来停止它。其次,来自英语字母的消息将被埋没在所有间隔计数器消息中:

Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43

列表 17-39 展示了解决最后两个问题的一种方法。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };

            tx.send(format!("Message: '{message}'")).unwrap();


fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            count += 1;

Listing 17-39: Using throttle and take to manage the merged streams

首先,我们使用 intervals 流上的 throttle 方法,以防止其压倒 messages 流。节流 是一种限制函数调用频率的方法——或者在这种情况下,限制流被轮询的频率。每 100 毫秒一次应该足够了,因为这大约是我们消息到达的频率。


现在当我们运行程序时,它在从流中拉取 20 个项目后停止, 并且时间间隔不会压倒消息。我们也不会得到 Interval: 100Interval: 200 等,而是得到 Interval: 1Interval: 2, 等等——尽管我们有一个 可以 每毫秒生成一个事件的源流。这是因为 throttle 调用生成了一个新的流,该流包装了原始流,使得原始流仅以节流速率被轮询,而不是其自身的“原生”速率。我们没有一堆未处理的时间间隔消息需要选择忽略。相反,我们从一开始就从未生成这些时间间隔消息!这是 Rust 的 futures 固有的“惰性”再次发挥作用,允许我们选择性能特性。

Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12

还有最后一件事需要处理:错误!对于这两个基于通道的流,当通道的另一端关闭时,send 调用可能会失败——这仅仅是运行时执行构成流的未来对象的方式的问题。到目前为止,我们通过调用 unwrap 忽略了这种可能性,但在一个行为良好的应用程序中,我们应该显式地处理错误,至少通过结束循环来避免尝试发送更多消息。列表 17-40 显示了一个简单的错误处理策略:打印问题,然后从循环中 break

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");


fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");

Listing 17-40: Handling errors and shutting down the loops


现在我们已经看到了很多异步的实际应用,让我们退一步,深入探讨一下 FutureStream 以及 Rust 用于实现异步的其他关键特质的一些细节。