使用异步实现并发

在本节中,我们将把异步应用到我们在第16章用线程解决的一些相同的并发挑战上。因为我们已经在那讨论了很多关键概念,所以在本节中我们将重点关注线程和未来的不同之处。

在许多情况下,使用 async 进行并发操作的 API 与使用线程的 API 非常相似。在其他情况下,它们的形状会相当不同。即使线程和 async 之间的 API 看起来相似,它们通常也有不同的行为——而且它们几乎总是有不同的性能特征。

计数

我们在第16章中解决的第一个任务是在两个独立的线程上进行计数。 让我们使用异步来完成同样的任务。trpl crate 提供了一个 spawn_task 函数, 它看起来非常类似于 thread::spawn API,以及一个 sleep 函数, 这是 thread::sleep API 的异步版本。我们可以将这些一起使用, 在列表17-6中实现与使用线程相同的计数示例。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listing 17-6: Using spawn_task to count with two

作为起点,我们使用trpl::run设置我们的main函数,以便我们的顶级函数可以是异步的。

注意:从本章的这一点开始,每个示例都将包含这个完全相同的包装代码,在main中使用trpl::run,因此我们通常会跳过它,就像我们对main所做的那样。不要忘记在你的代码中包含它!

然后我们在该块中编写两个循环,每个循环中都有一个trpl::sleep调用, 它等待半秒(500毫秒)后发送下一条 消息。我们将一个循环放在trpl::spawn_task的主体中,另一个放在顶级for循环中。我们还在sleep调用后添加了一个await

这与基于线程的实现类似——包括当你运行它时,可能会在你自己的终端中看到消息以不同的顺序出现。

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

这个版本在主异步块主体中的 for 循环完成后立即停止,因为由 spawn_task 启动的任务在主函数结束时被关闭。如果你想运行到任务完成,你需要使用 join 句柄来等待第一个任务完成。对于线程,我们使用 join 方法来“阻塞”直到线程运行完毕。在示例 17-7 中,我们可以使用 await 来做同样的事情,因为任务句柄本身是一个未来值。它的 Output 类型是一个 Result,所以在等待它之后我们还需要解开它。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listing 17-7: Using await with a join handle to run a task to completion

这个更新的版本运行直到两个循环都完成。

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

到目前为止,看起来异步和线程给我们提供了相同的基本结果,只是语法不同:使用await而不是在连接句柄上调用join,以及等待sleep调用。

更大的不同在于我们不需要为此启动另一个操作系统线程。事实上,我们甚至不需要在这里启动一个任务。因为异步块编译为匿名未来,我们可以将每个循环放在一个异步块中,并使用trpl::join函数让运行时完成它们的执行。

在第 16 章中,我们展示了如何在调用 std::thread::spawn 时返回的 JoinHandle 类型上使用 join 方法。 trpl::join 函数类似,但用于 futures。当你给它两个 futures 时,它会产生一个新的单一 future,其输出是一个包含你传入的每个 future 的输出的元组,两者 都完成时。因此,在清单 17-8 中,我们使用 trpl::join 等待 fut1fut2 完成。我们等待 fut1fut2,而是等待由 trpl::join 产生的新 future。我们忽略输出,因为这只是包含两个单元值的元组。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listing 17-8: Using trpl::join to await two anonymous futures

当我们运行这个时,我们看到两个未来都运行到了完成:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

这里,你会看到完全相同的顺序,这与我们使用线程时看到的情况非常不同。那是因为 trpl::join 函数是 公平的,意味着它会同样频繁地检查每个未来值,交替进行,而不会让一个在未来值准备好时领先。对于线程,操作系统决定检查哪个线程以及让它运行多长时间。对于异步 Rust,运行时决定检查哪个任务。(实际上,细节会变得复杂,因为异步运行时可能在管理并发的过程中在底层使用操作系统线程,因此保证公平性对运行时来说可能更复杂——但仍然是可能的!)运行时不必为任何给定操作保证公平性,而且运行时通常提供不同的 API 让你选择是否需要公平性。

尝试这些不同的等待未来的变体,看看它们的作用:

  • 移除围绕任一或两个循环的 async 块。
  • 在定义每个异步块后立即等待。
  • 仅将第一个循环包装在异步块中,并在第二个循环的主体之后等待生成的未来。

为了增加挑战性,看看你是否能在运行代码之前弄清楚每种情况下的输出!

消息传递

在 futures 之间共享数据也会很熟悉:我们将再次使用消息传递,但这次使用类型和函数的异步版本。我们将采取与第 16 章稍有不同的路径,以说明基于线程的并发和基于 futures 的并发之间的一些关键差异。在示例 17-9 中,我们将从一个单独的 async 块开始—像我们之前启动一个单独的线程那样启动一个单独的任务。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}
Listing 17-9: Creating an async channel and assigning the two halves to tx and rx

这里,我们使用trpl::channel,这是我们在第16章使用线程时使用的多生产者、单消费者通道API的异步版本。异步版本的API与基于线程的版本只有 slight 不同:它使用可变而非不可变的接收者rx,并且其recv方法生成一个我们需要等待的 future,而不是直接生成值。现在我们可以从发送者向接收者发送消息。请注意,我们不需要启动一个单独的线程甚至任务;我们只需要等待rx.recv调用。

std::mpsc::channel 中的同步 Receiver::recv 方法会阻塞,直到收到消息。而 trpl::Receiver::recv 方法不会阻塞,因为它是一个异步方法。它不会阻塞,而是将控制权交还给运行时,直到收到消息或通道的发送端关闭。相比之下,我们不会等待 send 调用,因为它不会阻塞。它不需要阻塞,因为我们发送到的通道是无界的。

注意:因为所有这些异步代码都在 trpl::run 调用中的异步块中运行,所以其中的所有内容都可以避免阻塞。然而,外部 代码将在 run 函数返回时阻塞。这就是 trpl::run 函数的全部意义:它让你 选择 在哪里阻塞某些异步代码集,从而在同步和异步代码之间进行转换。在大多数异步运行时中,run 实际上被命名为 block_on,原因正是如此。

注意这个例子中的两点:首先,消息会立即到达! 其次,虽然我们在这里使用了未来对象,但还没有并发。列表中的所有内容都是按顺序发生的,就像没有涉及未来对象一样。

让我们通过发送一系列消息来处理第一部分,并在它们之间休眠,如清单 17-10 所示:

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
Listing 17-10: Sending and receiving multiple messages over the async channel and sleeping with an await between each message

除了发送消息外,我们还需要接收它们。在这种情况下,我们可以通过手动执行rx.recv().await四次来实现,因为我们知道将要接收多少条消息。然而,在现实世界中,我们通常会等待一些未知数量的消息。在这种情况下,我们需要持续等待,直到确定没有更多的消息。

在清单 16-10 中,我们使用了一个 for 循环来处理从同步通道接收到的所有项。然而,Rust 还没有提供一种方法来编写一个 for 循环来处理 异步 的一系列项。相反,我们需要使用一种我们之前没有见过的新类型的循环,即 while let 条件循环。while let 循环是我们在第 6 章中看到的 if let 构造的循环版本。只要循环指定的模式继续匹配值,循环就会继续执行。

rx.recv 调用生成一个 Future,我们等待它。运行时将暂停 Future 直到它准备就绪。一旦消息到达,future 将解析为 Some(message),每次消息到达时都会如此。当通道关闭时,无论是否 任何 消息已到达,future 将解析为 None 以表示没有更多值,我们应该停止轮询——也就是说,停止等待。

while let 循环将所有这些内容整合在一起。如果调用 rx.recv().await 的结果是 Some(message),我们可以访问消息并在循环体中使用它,就像使用 if let 一样。如果结果是 None,循环结束。每次循环完成时,它都会再次遇到 await 点,因此运行时会再次暂停,直到另一条消息到达。

代码现在成功地发送和接收了所有消息。不幸的是,仍然存在一些问题。首先,消息并不是每半秒到达一次。它们是在我们启动程序两秒(2,000 毫秒)后一次性到达的。其次,这个程序也永远不会退出!相反,它会永远等待新消息。您需要使用 ctrl-c 来关闭它。

让我们先来理解为什么所有的消息都在完整的延迟后一起到达,而不是在每条消息之间有延迟。在一个给定的 async 块中,await 关键字在代码中出现的顺序也是它们在运行程序时发生的顺序。

在清单 17-10 中只有一个异步块,因此其中的所有内容都是线性运行的。仍然没有并发。所有的 tx.send 调用都会发生,穿插着所有的 trpl::sleep 调用及其相关的 await 点。只有这样,while let 循环才能通过任何 recv 调用的 await 点。

为了获得我们想要的行为,即在接收每条消息之间发生睡眠延迟,我们需要将txrx操作放在它们自己的异步块中。然后,运行时可以使用trpl::join分别执行它们,就像在计数示例中一样。同样,我们等待调用trpl::join的结果,而不是单独的未来。如果我们按顺序等待单独的未来,我们最终会回到顺序流程——这正是我们试图避免的。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-11: Separating send and recv into their own async blocks and awaiting the futures for those blocks

随着清单 17-11 中更新的代码,消息以 500 毫秒的间隔打印,而不是在两秒后一口气全部打印。

程序仍然永远不会退出,因为 while let 循环与 trpl::join 的交互方式:

  • trpl::join 返回的 future 只有在传递给它的 两个 future 都完成时才会完成。
  • tx 未来在发送 vals 中的最后一条消息后完成睡眠。
  • rx 未来不会完成,直到 while let 循环结束。
  • while let 循环不会结束,直到等待 rx.recv 产生 None
  • 等待 rx.recv 仅在通道的另一端关闭时返回 None
  • 通道只有在我们调用rx.close或发送方tx被丢弃时才会关闭。
  • 我们 nowhere 调用 rx.close,并且 tx 不会在传递给 trpl::run 的最外层异步块结束之前被丢弃。
  • 这个块无法结束,因为它被阻塞在 trpl::join 完成上,这又把我们带回到了这个列表的顶部!

我们可以通过在某个地方调用rx.close来手动关闭rx,但这没有太大意义。在处理了一些任意数量的消息后停止会使程序关闭,但我们可能会错过消息。我们需要某种其他方法来确保tx在函数结束之前被丢弃。

目前,我们发送消息的异步块只借用了tx,因为发送消息不需要所有权,但如果我们可以将tx移动到那个异步块中,那么一旦该块结束,tx就会被丢弃。在第13章中,我们学习了如何在闭包中使用move关键字,而在第16章中,我们看到在使用线程时通常需要将数据移动到闭包中。同样的基本动态也适用于异步块,因此move关键字在异步块中的工作方式与在闭包中相同。

在清单 17-12 中,我们将发送消息的异步块从普通的 async 块更改为 async move 块。当我们运行 版本的代码时,它会在最后一条消息发送和接收后优雅地关闭。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                eprintln!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-12: A working example of sending and receiving messages between futures which correctly shuts down when complete

这个异步通道也是一个多生产者通道,因此如果我们想从多个 future 发送消息,可以对 tx 调用 clone。在示例 17-13 中,我们在第一个异步块外部克隆 tx,创建了 tx1。我们像之前对 tx 那样将 tx1 移入该块。然后,稍后,我们将原始的 tx 移入一个 新的 异步块,在稍慢的延迟下发送更多消息。我们碰巧将这个新的异步块放在接收消息的异步块之后,但它也可以放在之前。关键在于 future 被等待的顺序,而不是它们被创建的顺序。

两个用于发送消息的异步块都需要是 async move 块,这样当这些块结束时,txtx1 都会被丢弃。否则,我们将会回到最初的那个无限循环中。最后,我们从 trpl::join 切换到 trpl::join3 以处理额外的未来。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
Listing 17-13: Using multiple producers with async blocks

现在我们可以看到来自两个发送未来的所有消息。因为发送未来在发送后使用了略有不同的延迟,所以消息也在这些不同的时间间隔内被接收。

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

这是一个好的开始,但它将我们限制在只有少数几个未来:两个使用join,或三个使用join3。让我们看看如何处理更多的未来。