Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

将控制权交给运行时

回想在“我们的第一个异步程序”部分中,每个await点,Rust都会给运行时一个机会,如果正在等待的未来尚未准备好,可以暂停任务并切换到另一个任务。相反的情况也是如此:Rust 在await点暂停异步块并将控制权交还给运行时。await点之间的所有内容都是同步的。

这意味着,如果你在一个异步块中进行大量工作而没有 await 点, 这个未来对象将会阻塞其他未来对象的进展。有时你可能会听到这种情况被称为一个未来对象 饿死 其他未来对象。在某些情况下, 这可能不是什么大问题。然而,如果你正在进行某种昂贵的设置或长时间运行的工作,或者如果你有一个未来对象将无限期地持续执行某项特定任务,你就需要考虑何时何地将控制权交还给运行时。

让我们模拟一个长时间运行的操作来说明饥饿问题,然后探讨如何解决它。清单 17-14 引入了一个 slow 函数。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-14: Using thread::sleep to simulate slow operations

这段代码使用 std::thread::sleep 而不是 trpl::sleep,因此调用 slow 将会阻塞当前线程一段时间(毫秒)。我们可以使用 slow 来模拟现实世界中既耗时又阻塞的操作。

在清单 17-15 中,我们使用 slow 来模拟在一对 future 中执行这种 CPU 密集型工作。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-15: Calling slow to simulate running slow operations

每个 future 只有在执行了一组慢操作之后才将控制权交还给运行时。如果你运行这段代码,你会看到如下输出:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

与清单 17-5 中我们使用 trpl::select 来竞速获取两个 URL 的 futures 一样,select 仍然在 a 完成时结束。不过,两个 futures 中的 slow 调用之间没有交错。a future 会一直执行直到 trpl::sleep 调用被等待,然后 b future 会一直执行直到它自己的 trpl::sleep 调用被等待,最后 a future 完成。为了允许两个 futures 在它们的慢任务之间取得进展,我们需要等待点以便我们可以将控制权交还给运行时。这意味着我们需要一些可以等待的东西!

我们已经在清单 17-15 中看到了这种交接:如果我们移除 a 未来末尾的 trpl::sleep,它将在 b 未来完全不运行的情况下完成。让我们尝试使用 trpl::sleep 函数作为让操作交替进行的起点,如清单 17-16 所示。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-16: Using trpl::sleep to let operations switch off making progress

我们已经在每次调用slow之间添加了trpl::sleep调用,并带有await点。 现在两个未来的任务是交错进行的:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

a 未来在将控制权交给 b 之前仍然运行一段时间,因为它在调用 trpl::sleep 之前先调用了 slow,但在那之后,每当其中一个到达 await 点时,这些未来就会来回交换。在这种情况下,我们在每次调用 slow 之后都这样做了,但我们可以根据需要以任何最合理的方式分配工作。

我们在这里其实并不想休眠:我们希望尽可能快地取得进展。我们只需要将控制权交还给运行时。我们可以直接使用trpl::yield_now函数来实现。在清单17-17中,我们将所有那些trpl::sleep调用替换为trpl::yield_now

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-17: Using yield_now to let operations switch off making progress

这段代码不仅更清楚地表达了实际意图,而且由于像 sleep 这样的定时器通常有粒度限制,因此可以显著快于使用 sleep。例如,我们使用的 sleep 版本,即使我们传递一个 Duration 为一纳秒,也会至少休眠一毫秒。再次强调,现代计算机 非常快:它们在一毫秒内可以完成很多事情!

这意味着,即使对于计算密集型任务,async 也可以非常有用,这取决于程序的其他部分在做什么,因为它提供了一种有用的工具来结构化程序不同部分之间的关系(但会带来异步状态机的开销)。这是一种协作式多任务处理,每个 future 都有权决定何时通过 await 点交出控制权。因此,每个 future 也有责任避免阻塞时间过长。在某些基于 Rust 的嵌入式操作系统中,这是唯一的多任务处理方式!

在实际代码中,你通常不会在每一行代码中交替使用函数调用和 await 点,当然。虽然以这种方式让出控制权相对便宜,但并非免费。在许多情况下,尝试将计算密集型任务分解可能会使其显著变慢,因此有时为了 整体 性能,让一个操作短暂阻塞会更好。始终测量以了解代码的实际性能瓶颈是什么。然而,如果你 确实 看到很多你期望并发执行的工作实际上是串行执行的,那么底层的动态机制就很重要了!

构建我们自己的异步抽象

我们还可以将多个 future 组合在一起以创建新的模式。例如,我们可以 使用我们已经拥有的异步构建块来构建一个 timeout 函数。完成后, 结果将是一个新的构建块,我们可以用它来创建更多的异步抽象。

列表 17-18 显示了我们期望这个 timeout 如何与一个慢的未来一起工作。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}
Listing 17-18: Using our imagined timeout to run a slow operation with a time limit

让我们来实现这个!首先,让我们思考一下 timeout 的 API:

  • 它本身需要是一个异步函数,这样我们才能等待它。
  • 它的第一个参数应该是要运行的 future。我们可以将其泛化以允许与任何 future 一起使用。
  • 其第二个参数将是最大等待时间。如果我们使用 Duration,这将使其易于传递给 trpl::sleep
  • 它应该返回一个 Result。如果未来完成成功,Result 将是 Ok,包含未来产生的值。如果超时先发生,Result 将是 Err,包含超时等待的持续时间。

列表 17-19 显示了此声明。

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}
Listing 17-19: Defining the signature of timeout

这满足了我们对类型的要求。现在让我们考虑需要的行为:我们希望将传入的未来与持续时间进行竞赛。我们可以使用trpl::sleep从持续时间创建一个计时器未来,并使用trpl::select来运行计时器与调用者传入的未来。

在清单 17-20 中,我们通过匹配 trpl::select 等待结果来实现 timeout

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

use trpl::Either;

// --snip--

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::select(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
Listing 17-20: Defining timeout with select and sleep

trpl::select 的实现是不公平的:它总是按照传递的顺序轮询参数(其他 select 实现会随机选择首先轮询哪个参数)。因此,我们将 future_to_try 传递给 select 作为第一个参数,以便即使 max_time 非常短,它也有机会完成。如果 future_to_try 首先完成,select 将返回带有 future_to_try 输出的 Left。如果 timer 首先完成,select 将返回带有计时器输出 ()Right

如果 future_to_try 成功并且我们得到一个 Left(output),我们返回 Ok(output)。如果睡眠计时器到期并且我们得到一个 Right(()),我们用 _ 忽略 () 并返回 Err(max_time)

至此,我们已经使用两个其他异步助手构建了一个工作的timeout。如果我们运行我们的代码,它将在超时后打印失败模式:

Failed after 2 seconds

因为 Future 可以与其他 Future 组合,所以你可以使用较小的异步构建块来构建非常强大的工具。例如,你可以使用相同的方法将超时与重试组合,并进而将这些与网络调用(如清单 17-5 中的那些)一起使用。

在实际中,你通常会直接使用 asyncawait,并次要使用如 select 这样的函数和 join! 这样的宏来控制最外层的未来对象如何执行。

我们现在看到了同时处理多个未来的几种方法。 接下来,我们将看看如何随着时间的推移,使用来按顺序处理多个未来。