使用异步应用并发

在本节中,我们将把异步应用到我们在第16章用线程解决的一些相同的并发挑战上。因为我们已经在那讨论了很多关键概念,所以在本节中我们将重点关注线程和未来的不同之处。

在许多情况下,使用 async 进行并发操作的 API 与使用线程的 API 非常相似。在其他情况下,它们最终会变得相当不同。即使线程和 async 之间的 API 看起来相似,它们通常也有不同的行为——而且它们几乎总是有不同的性能特征。

使用spawn_task创建新任务

我们在使用spawn创建新线程中处理的第一个操作是在两个独立的线程上进行计数。 让我们使用async来做同样的事情。trpl库提供了一个spawn_task函数 它看起来非常类似于thread::spawn API,以及一个sleep函数 这是thread::sleep API的异步版本。我们可以将这些一起使用 来实现计数示例,如列表17-6所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listing 17-6: Creating a new task to print one thing while the main task prints something else

作为起点,我们使用 trpl::run 设置我们的 main 函数,以便我们的顶级函数可以是异步的。

注意:从本章的这一点开始,每个示例都将包含这个完全相同的包装代码,在main中使用trpl::run,因此我们通常会跳过它,就像我们对main所做的那样。不要忘记在你的代码中包含它!

然后我们在该块中编写两个循环,每个循环都包含一个 trpl::sleep 调用,等待半秒(500毫秒)后发送下一条消息。我们将一个循环放在 trpl::spawn_task 的主体中,另一个放在顶级 for 循环中。我们还在 sleep 调用后添加了一个 await

这段代码的行为与基于线程的实现类似——包括当你运行它时,可能会在你自己的终端中看到消息以不同的顺序出现:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

这个版本会在主异步块体中的 for 循环结束时停止,因为由 spawn_task 启动的任务在 main 函数结束时会被关闭。如果你想让它一直运行到任务完成,你需要使用一个 join 句柄来等待第一个任务完成。对于线程,我们使用 join 方法来“阻塞”直到线程运行完毕。在示例 17-7 中,我们可以使用 await 来做同样的事情,因为任务句柄本身就是一个未来。它的 Output 类型是一个 Result,所以在等待它之后我们还需要解开它。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listing 17-7: Using await with a join handle to run a task to completion

这个更新的版本运行直到两个循环都完成。

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

到目前为止,看起来异步和线程给我们提供了相同的基本结果,只是语法不同:使用await而不是在连接句柄上调用join,以及等待sleep调用。

更大的不同在于我们不需要为此启动另一个操作系统线程。事实上,我们甚至不需要在这里启动一个任务。因为异步块编译为匿名未来,我们可以将每个循环放在一个异步块中,并使用trpl::join函数让运行时完成它们的执行。

在章节 使用 join 句柄等待所有线程完成 中,我们展示了如何在调用 std::thread::spawn 时返回的 JoinHandle 类型上使用 join 方法。trpl::join 函数类似,但用于未来值。当你给它两个未来值时,它会产生一个新的未来值,其输出是一个包含你传入的每个未来值输出的元组,一旦它们 完成。因此,在清单 17-8 中,我们使用 trpl::join 等待 fut1fut2 都完成。我们 等待 fut1fut2,而是等待由 trpl::join 产生的新未来值。我们忽略输出,因为这只是包含两个单元值的元组。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listing 17-8: Using trpl::join to await two anonymous futures

当我们运行这个时,我们看到两个未来都运行到了完成:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

现在,您每次都会看到完全相同的顺序,这与我们使用线程时看到的情况非常不同。这是因为 trpl::join 函数是 公平的,意味着它会同样频繁地检查每个未来,交替进行,而不会让一个在未来准备好时领先。对于线程,操作系统决定检查哪个线程以及让它运行多长时间。对于异步 Rust,运行时决定检查哪个任务。(实际上,细节会变得复杂,因为异步运行时可能在底层使用操作系统线程作为管理并发的一部分,因此保证公平性对运行时来说可能更复杂——但仍然是可能的!)运行时不必为任何给定操作保证公平性,它们通常提供不同的 API 让您选择是否需要公平性。

尝试这些关于等待未来的变体,看看它们的作用:

  • 移除围绕任一或两个循环的 async 块。
  • 在定义每个异步块后立即等待。
  • 仅将第一个循环包装在异步块中,并在第二个循环的主体之后等待生成的未来。

为了增加挑战性,看看你是否能在运行代码之前弄清楚每种情况下的输出!

使用消息传递在两个任务上计数

在 futures 之间共享数据也会很熟悉:我们将再次使用消息传递,但这次使用异步版本的类型和函数。我们将采取与使用消息传递在线程之间传输数据中稍有不同的路径,以说明基于线程的并发和基于 futures 的并发之间的一些关键差异。在示例 17-9 中,我们将从一个单一的 async 块开始——像我们之前创建单独的线程那样创建单独的任务。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}
Listing 17-9: Creating an async channel and assigning the two halves to tx and rx

这里,我们使用trpl::channel,这是我们在第16章使用线程时使用的多生产者、单消费者通道API的异步版本。异步版本的API与基于线程的版本只有 slight 不同:它使用可变而非不可变的接收者rx,并且其recv方法生成一个我们需要等待的 future,而不是直接生成值。现在我们可以从发送者向接收者发送消息。请注意,我们不需要启动一个单独的线程甚至任务;我们只需要等待rx.recv调用。

std::mpsc::channel 中的同步 Receiver::recv 方法会阻塞,直到收到消息。而 trpl::Receiver::recv 方法不会阻塞,因为它是一个异步方法。它不会阻塞,而是将控制权交还给运行时,直到收到消息或通道的发送端关闭。相比之下,我们不会等待 send 调用,因为它不会阻塞。它不需要阻塞,因为我们发送到的通道是无界的。

注意:因为所有这些异步代码都在 trpl::run 调用中的异步块中运行,所以其中的所有内容都可以避免阻塞。然而,外部的代码将在 run 函数返回时阻塞。这就是 trpl::run 函数的全部意义:它让你可以选择在何处阻塞一组异步代码,从而在同步和异步代码之间进行转换。在大多数异步运行时中,run 实际上被命名为 block_on,原因正是如此。

注意这个例子的两个方面。首先,消息会立即到达。 其次,虽然我们在这里使用了 future,但还没有并发。列表中的所有内容都是按顺序发生的,就像没有涉及 future 一样。

让我们先处理第一部分,通过发送一系列消息并在它们之间休眠,如清单 17-10 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
Listing 17-10: Sending and receiving multiple messages over the async channel and sleeping with an await between each message

除了发送消息外,我们还需要接收它们。在这种情况下, 因为我们知道有多少消息传入,我们可以通过手动调用rx.recv().await四次来实现。然而,在现实世界中,我们通常会等待一些未知数量的消息,所以我们需要一直等待,直到确定没有更多消息。

在清单 16-10 中,我们使用了一个 for 循环来处理从同步通道接收到的所有项。然而,Rust 尚未提供一种方法来编写一个 for 循环来处理 异步 的一系列项,因此我们需要使用一个我们之前未见过的循环:while let 条件循环。这是我们在 使用 if letlet else 简化控制流 部分中看到的 if let 构造的循环版本。只要循环指定的模式继续匹配值,循环就会继续执行。

rx.recv 调用生成一个未来,我们等待它。运行时会暂停这个未来,直到它准备就绪。一旦消息到达,未来将解析为 Some(message),每次消息到达都会如此。当通道关闭时,无论是否 任何 消息已到达,未来将解析为 None,以表示没有更多值,因此我们应该停止轮询——也就是说,停止等待。

while let 循环将所有这些内容整合在一起。如果调用 rx.recv().await 的结果是 Some(message),我们可以访问消息并在循环体中使用它,就像使用 if let 一样。如果结果是 None,循环结束。每次循环完成时,它都会再次遇到 await 点,因此运行时会再次暂停,直到另一条消息到达。

代码现在成功地发送和接收了所有的消息。不幸的是,仍然存在一些问题。首先,消息并不是每半秒到达一次。它们是在我们启动程序2秒(2,000毫秒)后一次性到达的。其次,这个程序也永远不会退出!相反,它会一直等待新的消息。您需要使用ctrl-c来关闭它。

让我们先来分析为什么消息会在完整的延迟后一次性到达,而不是每个消息之间有延迟。在给定的异步块中,await 关键字在代码中出现的顺序也是它们在程序运行时执行的顺序。

在清单 17-10 中只有一个异步块,因此其中的所有内容都是线性运行的。仍然没有并发。所有的 tx.send 调用都会发生,穿插着所有的 trpl::sleep 调用及其相关的 await 点。只有这样,while let 循环才能通过任何 recv 调用的 await 点。

为了获得我们想要的行为,即在每条消息之间发生睡眠延迟, 我们需要将txrx操作放在它们自己的异步块中,如清单17-11所示。然后运行时可以使用trpl::join分别执行每个操作,就像在计数示例中一样。同样,我们等待调用trpl::join的结果,而不是单独的未来。如果我们按顺序等待单独的未来,我们最终会回到顺序流程——这正是我们试图避免的。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-11: Separating send and recv into their own async blocks and awaiting the futures for those blocks

随着列表 17-11 中更新的代码,消息以 500 毫秒的间隔打印,而不是在 2 秒后一口气全部打印。

程序仍然永远不会退出,因为 while let 循环与 trpl::join 的交互方式:

  • trpl::join 返回的 future 只有在传递给它的 两个 future 都完成时才会完成。
  • tx 未来在发送 vals 中的最后一条消息后完成睡眠。
  • rx 未来不会完成,直到 while let 循环结束。
  • while let 循环不会结束,直到等待 rx.recv 产生 None
  • 等待 rx.recv 仅在通道的另一端关闭时返回 None
  • 通道只有在我们调用rx.close或发送方tx被丢弃时才会关闭。
  • 我们 nowhere 调用 rx.close,并且 tx 不会在传递给 trpl::run 的最外层异步块结束之前被丢弃。
  • 块无法结束,因为它被阻塞在 trpl::join 完成上,这又把我们带回到了这个列表的顶部。

我们可以通过在某个地方调用rx.close来手动关闭rx,但这没有太大意义。在处理了一些任意数量的消息后停止会使程序关闭,但我们可能会错过消息。我们需要某种其他方法来确保tx在函数结束之前被丢弃。

目前,我们发送消息的异步块只借用 tx,因为发送消息不需要所有权,但如果我们可以将 tx 移动到该异步块中,那么该块结束时 tx 就会被丢弃。在第 13 章的 捕获引用或移动所有权 部分,你学习了如何在闭包中使用 move 关键字,正如在第 16 章的 使用 move 闭包与线程 部分所讨论的,我们在使用线程时通常需要将数据移动到闭包中。同样的基本动态也适用于异步块,因此 move 关键字在异步块中与在闭包中一样有效。

在清单 17-12 中,我们将用于发送消息的块从 async 更改为 async move。当我们运行 版本的代码时,它会在发送和接收最后一条消息后优雅地关闭。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-12: A revision of the code from Listing 17-11 that correctly shuts down when complete

这个异步通道也是一个多生产者通道,因此如果我们要从多个 future 发送消息,可以对 tx 调用 clone,如清单 17-13 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
Listing 17-13: Using multiple producers with async blocks

首先,我们克隆 tx,在第一个异步块外部创建 tx1。我们将 tx1 移入该块,就像之前处理 tx 一样。然后,稍后,我们将原始的 tx 移入一个 新的 异步块,在那里我们以稍微慢一点的延迟发送更多消息。我们碰巧将这个新的异步块放在接收消息的异步块之后,但它也可以放在前面。关键在于这些未来的等待顺序,而不是它们的创建顺序。

两个用于发送消息的异步块都需要是 async move 块,以便在这些块结束时 txtx1 都能被丢弃。否则,我们最终会回到最初的那个无限循环中。最后,我们从 trpl::join 切换到 trpl::join3 以处理额外的未来。

现在我们可以看到来自两个发送未来的所有消息,因为发送未来在发送后使用了略有不同的延迟,所以消息也在这些不同的时间间隔内被接收。

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

这是一个好的开始,但它将我们限制在只有少数几个未来:两个使用join,或三个使用join3。让我们看看如何处理更多的未来。