将我们的单线程服务器转换为多线程服务器

目前,服务器会依次处理每个请求,这意味着它不会在第一个请求处理完成之前处理第二个连接。如果服务器收到越来越多的请求,这种串行执行将变得越来越不高效。如果服务器收到一个需要长时间处理的请求,后续的请求将不得不等待,直到长时间的请求处理完成,即使新的请求可以快速处理。我们需要解决这个问题,但首先,我们将看看问题的实际表现。

在当前服务器实现中模拟慢请求

我们将研究一个处理缓慢的请求如何影响向我们当前服务器实现发出的其他请求。清单 21-10 实现了处理对 /sleep 的请求,该请求具有模拟的缓慢响应,将导致服务器在响应前休眠 5 秒。

Filename: src/main.rs
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-10: Simulating a slow request by sleeping for 5 seconds

我们从 if 切换到 match,因为我们现在有三种情况。我们需要显式地匹配 request_line 的切片,以对字符串字面值进行模式匹配;match 不像等式方法那样自动引用和取消引用。

第一个分支与列表 21-9 中的 if 块相同。第二个分支 匹配对 /sleep 的请求。当收到该请求时,服务器将在呈现成功的 HTML 页面之前 休眠 5 秒。第三个分支与列表 21-9 中的 else 块相同。

你可以看到我们的服务器是多么原始:真正的库会以一种简洁得多的方式处理多个请求的识别!

使用 cargo run 启动服务器。然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/,另一个用于 http://127.0.0.1:7878/sleep。如果你多次输入 / URI,就像之前一样,你会看到它响应很快。但如果你输入 /sleep 然后加载 /,你会看到 / 会等待 sleep 完成其完整的 5 秒后才加载。

我们可以使用多种技术来避免请求在慢请求后面堆积,包括在第 17 章中使用的方法异步;我们将要实现的是线程池。

使用线程池提高吞吐量

一个线程池是一组已创建的线程,它们在等待并准备好处理任务。当程序接收到新任务时,它会将线程池中的一个线程分配给该任务,该线程将处理任务。线程池中剩余的线程可用于处理在第一个线程处理期间到来的任何其他任务。当第一个线程完成其任务处理后,它会被返回到空闲线程池中,准备处理新任务。线程池允许你并发处理连接,提高服务器的吞吐量。

我们将线程池中的线程数量限制为一个小数目,以保护我们免受拒绝服务(DoS)攻击;如果我们为每个传入的请求创建一个新线程,那么有人对我们服务器发起1000万个请求就可能会耗尽我们服务器的所有资源,导致请求处理停滞。

而不是生成无限的线程,我们将有一个固定数量的线程在池中等待。进入的请求将被发送到池中进行处理。池将维护一个传入请求的队列。池中的每个线程将从这个队列中弹出一个请求,处理该请求,然后向队列请求另一个请求。通过这种设计,我们可以同时处理多达 N 个请求,其中 N 是线程的数量。如果每个线程都在响应一个长时间运行的请求,后续的请求仍然可以在队列中积压,但我们已经增加了在达到这一点之前可以处理的长时间运行请求的数量。

这种技术只是提高Web服务器吞吐量的众多方法之一。其他你可以探索的选项包括fork/join模型单线程异步I/O模型多线程异步I/O模型。如果你对这个话题感兴趣,可以阅读更多关于其他解决方案的信息并尝试实现它们;使用像Rust这样的低级语言,所有这些选项都是可能的。

在我们开始实现线程池之前,让我们先谈谈使用线程池应该是什么样子。当你试图设计代码时,先编写客户端接口可以帮助指导你的设计。编写代码的API,使其结构符合你希望调用的方式;然后在该结构内实现功能,而不是先实现功能再设计公共API。

类似于我们在第12章的项目中使用测试驱动开发的方式,我们将在这里使用编译器驱动开发。我们将编写调用我们想要的函数的代码,然后查看编译器的错误以确定我们应该做哪些更改使代码正常工作。不过,在此之前,我们将探讨我们不会使用的技术作为起点。

为每个请求创建一个线程

首先,让我们看看如果为每个连接创建一个新线程,代码可能会是什么样子。如前所述,这不是我们的最终计划,因为可能会产生无限数量的线程,但这是一个起点,首先实现一个多线程服务器。然后我们将添加线程池作为改进,对比两种解决方案将更容易。清单 21-11 显示了对 main 的更改,以在 for 循环中为每个流生成一个新线程。

Filename: src/main.rs
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-11: Spawning a new thread for each stream

正如你在第 16 章中学到的,thread::spawn 将创建一个新线程,然后在新线程中运行闭包中的代码。如果你运行此代码并在浏览器中加载 /sleep,然后在另外两个浏览器标签中加载 /,你确实会看到对 / 的请求不必等待 /sleep 完成。然而,正如我们提到的,这最终会压垮系统,因为你将无限制地创建新线程。

您可能还记得第 17 章中提到的,这正是 async 和 await 大放异彩的情况!在我们构建线程池时,请记住这一点,并思考如果使用 async 事情会有什么不同或相同。

创建有限数量的线程

我们希望我们的线程池以类似且熟悉的方式工作,以便从线程切换到线程池时,不需要对使用我们API的代码进行大的更改。清单21-12展示了我们希望使用的ThreadPool结构体的假设接口,用以替代thread::spawn

Filename: src/main.rs
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-12: Our ideal ThreadPool interface

我们使用ThreadPool::new来创建一个具有可配置线程数的新线程池,在这种情况下是四个。然后,在for循环中,pool.execute的接口与thread::spawn类似,因为它接受一个闭包,池中的每个流都应该运行这个闭包。我们需要实现pool.execute,以便它接受闭包并将其交给池中的一个线程来运行。这段代码还不能编译,但我们会尝试编译,以便编译器可以指导我们如何修复它。

使用编译器驱动开发构建 ThreadPool

在清单 21-12 中对 src/main.rs 做出更改,然后让我们使用 cargo check 的编译错误来驱动我们的开发。这是我们得到的第一个错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

很好!这个错误告诉我们需要一个ThreadPool类型或模块,所以我们现在就来构建一个。我们的ThreadPool实现将独立于我们的Web服务器正在执行的工作类型。因此,让我们将hello crate从一个二进制crate转换为一个库crate来保存我们的ThreadPool实现。在转换为库crate之后,我们还可以使用单独的线程池库来执行我们想要使用线程池的任何工作,而不仅仅是处理Web请求。

创建一个 src/lib.rs,其中包含以下内容,这是我们目前可以拥有的最简单的 ThreadPool 结构体定义:

Filename: src/lib.rs
pub struct ThreadPool;

然后编辑 main.rs 文件,通过在 src/main.rs 的顶部添加以下代码将 ThreadPool 从库 crate 引入作用域:

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

这段代码仍然无法工作,但让我们再次检查它以获取需要解决的下一个错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

这个错误表明接下来我们需要为 ThreadPool 创建一个名为 new 的关联函数。我们还知道 new 需要有一个参数,可以接受 4 作为参数,并且应该返回一个 ThreadPool 实例。让我们实现一个最简单的 new 函数,它将具有这些特征:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

我们选择usize作为size参数的类型,因为我们知道负数的线程数量是没有意义的。我们还知道我们将使用这个4作为线程集合中元素的数量,这就是usize类型的用途,如在第3章的“整数类型”部分所讨论的。

让我们再次检查代码:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

现在错误发生是因为我们在 ThreadPool 上没有 execute 方法。 回想 “创建有限数量的线程” 部分,我们 决定我们的线程池应该有一个类似于 thread::spawn 的接口。此外,我们将实现 execute 函数,使其接收给定的闭包并将其交给池中的空闲线程来运行。

我们将为 ThreadPool 定义 execute 方法,以闭包作为参数。回想第 13 章的 “将捕获的值移出闭包和 Fn 特性” 部分,我们可以使用三种不同的特性来接受闭包作为参数:FnFnMutFnOnce。我们需要决定在这里使用哪种闭包。我们知道最终会做类似于标准库 thread::spawn 实现的事情,因此可以查看 thread::spawn 的参数签名有哪些约束。文档显示如下:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

F 类型参数是我们这里关注的;T 类型参数与返回值相关,我们不关心这个。我们可以看到 spawn 使用 FnOnce 作为 F 的特征约束。这可能也是我们想要的,因为最终我们会将 execute 中接收到的参数传递给 spawn。我们可以更加确信 FnOnce 是我们想要使用的特征,因为运行请求的线程只会执行该请求的闭包一次,这与 FnOnce 中的 Once 相符。

F 类型参数也有 Send 特性边界和 'static 生命周期边界,这些在我们的情况下非常有用:我们需要 Send 来将闭包从一个线程传递到另一个线程,以及 'static 是因为我们不知道线程执行需要多长时间。让我们在 ThreadPool 上创建一个 execute 方法,该方法将接受一个具有这些边界泛型参数 F

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

我们仍然在 FnOnce 后使用 (),因为这个 FnOnce 表示一个不接受任何参数并返回单元类型 () 的闭包。就像函数定义一样,返回类型可以从签名中省略,但即使我们没有参数,仍然需要括号。

再次,这是 execute 方法的最简单实现:它什么也不做,但我们只是试图让我们的代码编译。让我们再次检查它:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

它编译了!但请注意,如果您尝试cargo run并在浏览器中发出请求,您会看到我们在本章开头看到的错误。我们的库实际上还没有调用传递给execute的闭包!

注意:你可能会听到关于像 Haskell 和 Rust 这样具有严格编译器的语言的说法是,“如果代码编译,它就能工作。” 但这句话并不普遍正确。我们的项目编译了,但它什么也不做!如果我们正在构建一个真实、完整的项目,现在是开始编写单元测试以检查代码是否编译 并且 具有我们想要的行为的好时机。

考虑:如果我们执行的是一个future而不是闭包,这里会有哪些不同?

验证 new 中的线程数量

我们没有对 newexecute 的参数做任何处理。让我们实现这些函数的主体,以实现我们想要的行为。首先,让我们思考 new。早些时候,我们为 size 参数选择了一个无符号类型,因为拥有负数个线程的池是没有意义的。然而,拥有零个线程的池也没有意义,但零是一个完全有效的 usize。我们将在返回 ThreadPool 实例之前添加代码来检查 size 是否大于零,并使用 assert! 宏在接收到零时使程序崩溃,如清单 21-13 所示。

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-13: Implementing ThreadPool::new to panic if size is zero

我们还为我们的ThreadPool 添加了一些文档,使用了文档注释。 请注意,我们遵循了良好的文档实践,添加了一个部分,指出了我们的函数可能 panic 的情况,正如在第 14 章中讨论的那样。 尝试运行 cargo doc --open 并点击 ThreadPool 结构体,看看生成的 new 方法的文档是什么样的!

而不是像这里所做的那样添加 assert! 宏,我们可以将 new 改为 build 并返回一个 Result,就像我们在列表 12-9 的 I/O 项目中对 Config::build 所做的那样。但在这种情况下,我们决定尝试创建一个没有任何线程的线程池应该是一个不可恢复的错误。如果你雄心勃勃,试着编写一个名为 build 的函数,其签名如下,以便与 new 函数进行比较:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

为线程创建存储空间

现在我们有了一个方法来确定存储在池中的线程数量是有效的,我们可以在返回结构体之前创建这些线程并将它们存储在ThreadPool结构体中。但是我们如何“存储”一个线程?让我们再来看看thread::spawn的签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn 函数返回一个 JoinHandle<T>,其中 T 是闭包返回的类型。让我们也尝试使用 JoinHandle 并看看会发生什么。在我们的情况下,传递给线程池的闭包将处理连接并且不返回任何内容,因此 T 将是单元类型 ()

列表 21-14 中的代码可以编译,但还没有创建任何线程。 我们已经将 ThreadPool 的定义更改为持有 thread::JoinHandle<()> 实例的向量,用 size 初始化了向量的容量,设置了一个 for 循环来运行一些创建线程的代码,并返回了一个包含这些线程的 ThreadPool 实例。

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-14: Creating a vector for ThreadPool to hold the threads

我们已经将std::thread引入到库crate中,因为我们正在使用thread::JoinHandle作为ThreadPool中向量项的类型。

一旦收到有效的大小,我们的ThreadPool将创建一个可以容纳size个项目的向量。with_capacity函数执行与Vec::new相同的任务,但有一个重要的区别:它预先分配向量中的空间。因为我们知道需要在向量中存储size个元素,所以预先进行这种分配比使用Vec::new稍微更有效,后者在插入元素时会自行调整大小。

当你再次运行 cargo check 时,它应该成功。

一个负责从ThreadPool向线程发送代码的Worker结构体

我们在清单 21-14 的 for 循环中留下了一个关于创建线程的注释。在这里,我们将看看我们实际上是如何创建线程的。标准库提供了 thread::spawn 作为创建线程的一种方式,thread::spawn 期望在创建线程时立即运行的代码。然而,在我们的情况下,我们希望创建线程并让它们 等待 我们稍后发送的代码。标准库的线程实现不包括任何实现这一点的方法;我们必须手动实现它。

我们将通过在ThreadPool和线程之间引入一个新的数据结构来实现这种行为,这个数据结构将管理这种新行为。我们将这个数据结构称为Worker,这是池化实现中常用的一个术语。Worker会拾取需要运行的代码,并在Worker的线程中运行这些代码。想象一下餐厅厨房里的工作人员:工作人员会等待顾客的订单到来,然后他们负责接收这些订单并完成它们。

而不是在线程池中存储 JoinHandle<()> 实例的向量,我们将存储 Worker 结构的实例。每个 Worker 将存储一个 JoinHandle<()> 实例。然后我们将在 Worker 上实现一个方法,该方法将接受一个要运行的闭包代码,并将其发送到已经运行的线程中执行。我们还将给每个工作线程一个 id,以便在日志记录或调试时区分池中的不同工作线程。

这是我们将要在创建ThreadPool时发生的新流程。我们将在Worker以这种方式设置好之后,实现将闭包发送到线程的代码。

  1. 定义一个 Worker 结构体,其中包含一个 id 和一个 JoinHandle<()>
  2. ThreadPool 更改为持有 Worker 实例的向量。
  3. 定义一个 Worker::new 函数,该函数接受一个 id 编号,并返回一个持有 id 和使用空闭包创建的线程的 Worker 实例。
  4. ThreadPool::new 中,使用 for 循环计数器生成一个 id,使用该 id 创建一个新的 Worker,并将工作线程存储在向量中。

如果您喜欢挑战,可以尝试在查看清单 21-15 中的代码之前自己实现这些更改。

准备好?以下是带有其中一种方法的清单 21-15,用于进行上述修改。

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-15: Modifying ThreadPool to hold Worker instances instead of holding threads directly

我们将 ThreadPool 上的字段名从 threads 更改为 workers,因为现在它保存的是 Worker 实例而不是 JoinHandle<()> 实例。我们在 for 循环中使用计数器作为 Worker::new 的参数,并将每个新的 Worker 存储在名为 workers 的向量中。

外部代码(如 src/main.rs 中的服务器)不需要知道在 ThreadPool 内部使用 Worker 结构体的实现细节,因此我们将 Worker 结构体及其 new 函数设为私有。Worker::new 函数使用我们提供的 id,并存储一个通过使用空闭包创建的新线程生成的 JoinHandle<()> 实例。

注意:如果操作系统因为系统资源不足而无法创建线程,thread::spawn 将会 panic。这将导致我们的整个服务器 panic,即使某些线程的创建可能成功。为了简单起见,这种行为是可以接受的,但在生产环境的线程池实现中,你可能希望使用 std::thread::Builder 及其 spawn 方法,该方法返回 Result

这段代码将编译,并将我们作为 ThreadPool::new 的参数指定的 Worker 实例的数量存储起来。但我们 仍然 没有处理我们在 execute 中接收到的闭包。接下来让我们看看如何处理这个问题。

通过通道向线程发送请求

我们接下来要解决的问题是,传递给thread::spawn的闭包什么也不做。目前,我们在execute方法中获取要执行的闭包。但是我们需要在创建ThreadPool时,为每个Worker的创建提供一个闭包,让thread::spawn运行。

我们希望我们刚刚创建的Worker结构体能够从ThreadPool中持有的队列中获取要运行的代码,并将该代码发送到其线程以运行。

我们在第16章中学到的通道——一种在两个线程之间进行简单通信的方式——非常适合这种情况。我们将使用一个通道作为作业队列,execute 将从 ThreadPoolWorker 实例发送作业,后者将作业发送到其线程。以下是计划:

  1. ThreadPool 将创建一个通道并持有发送者。
  2. 每个 Worker 都将持有接收者。
  3. 我们将创建一个新的Job结构体,用于保存我们想要通过通道发送的闭包。
  4. execute 方法会通过发送者发送它想要执行的任务。
  5. 在其线程中,Worker 将在其接收器上循环并执行它收到的任何作业的闭包。

让我们先在 ThreadPool::new 中创建一个通道,并在 ThreadPool 实例中持有发送者,如清单 21-16 所示。Job 结构体目前不持有任何内容,但将是我们在通道中发送的项的类型。

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-16: Modifying ThreadPool to store the sender of a channel that transmits Job instances

ThreadPool::new 中,我们创建了新的通道,并让线程池持有发送者。这将成功编译。

让我们尝试在每个工作线程中传递通道的接收者,因为线程池创建了通道。我们知道我们希望在工作线程启动的线程中使用接收者,所以我们将引用receiver参数在闭包中。清单21-17中的代码还不能完全编译。

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-17: Passing the receiver to the workers

我们做了一些小而简单的更改:我们将接收者传递给Worker::new,然后在闭包中使用它。

当我们尝试检查这段代码时,我们得到了这个错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

代码试图将receiver传递给多个Worker实例。这 不会奏效,正如你在第16章中所回忆的:Rust提供的通道实现是多个生产者,单个消费者。这意味着我们不能 仅仅克隆通道的消费端来修复这段代码。我们也不希望将消息多次发送给多个消费者;我们希望有一个消息列表和多个工作线程,使得每个消息只被处理一次。

此外,从通道队列中获取任务涉及修改receiver,因此线程需要一种安全的方式来共享和修改receiver;否则,我们可能会遇到竞争条件(如第16章所述)。

回顾第 16 章讨论的线程安全智能指针:要跨多个线程共享所有权并允许线程修改值,我们需要使用 Arc<Mutex<T>>Arc 类型将让多个工作线程拥有接收者,而 Mutex 将确保一次只有一个工作线程从接收者那里获取任务。列表 21-18 显示了我们需要进行的更改。

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-18: Sharing the receiver among the workers using Arc and Mutex

ThreadPool::new 中,我们将接收者放入一个 Arc 和一个 Mutex 中。对于每个新的工作线程,我们克隆 Arc 以增加引用计数,这样工作线程就可以共享接收者的所有权。

有了这些更改,代码编译成功了!我们快到了!

实现 execute 方法

让我们最终在 ThreadPool 上实现 execute 方法。我们还将 Job 从一个结构体改为一个类型别名,该别名表示 execute 接收的闭包类型。正如在第 20 章的 “使用类型别名创建类型同义词” 部分讨论的那样,类型别名允许我们将长类型缩短以方便使用。请参见示例 21-19。

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-19: Creating a Job type alias for a Box that holds each closure and then sending the job down the channel

在使用 execute 中获得的闭包创建一个新的 Job 实例后,我们将该任务通过通道的发送端发送出去。我们在 send 上调用 unwrap 以处理发送失败的情况。例如,如果我们停止所有线程的执行,接收端停止接收新消息时,这种情况可能会发生。目前,我们无法停止线程的执行:只要线程池存在,我们的线程就会继续执行。我们使用 unwrap 的原因是我们知道失败的情况不会发生,但编译器不知道这一点。

但我们的工作还没有完全完成!在工作线程中,传递给 thread::spawn 的闭包仍然只是 引用 通道的接收端。相反,我们需要闭包无限循环,向通道的接收端请求任务,并在接收到任务时运行任务。让我们对 Worker::new 进行如清单 21-20 所示的更改。

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}
Listing 21-20: Receiving and executing the jobs in the worker’s thread

这里,我们首先调用 receiver 上的 lock 来获取互斥锁,然后我们调用 unwrap 来在任何错误时引发 panic。获取锁可能会失败,如果互斥锁处于 中毒 状态,这可能是因为其他线程在持有锁时引发了 panic 而不是释放锁。在这种情况下,调用 unwrap 使当前线程 panic 是正确的处理方式。你可以自由地将这个 unwrap 更改为带有对你有意义的错误消息的 expect

如果我们获得了互斥锁,我们调用recv从通道接收一个Job。最后的unwrap也会跳过这里可能出现的任何错误,这可能发生在持有发送方的线程已关闭的情况下,类似于send方法在接收方关闭时返回Err

recv 的调用是阻塞的,因此如果没有任务,当前线程将等待任务变得可用。Mutex<T> 确保一次只有一个 Worker 线程尝试请求任务。

我们的线程池现在处于工作状态!给它一个cargo run并发出一些请求:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

成功!我们现在有一个线程池,可以异步执行连接。 线程池中永远不会创建超过四个线程,因此即使服务器收到大量请求,我们的系统也不会过载。如果我们向 /sleep 发出请求,服务器将能够通过其他线程运行请求来为其他请求提供服务。

注意:如果您同时在多个浏览器窗口中打开/sleep,它们可能会每隔5秒依次加载。一些网络浏览器出于缓存原因会顺序执行同一请求的多个实例。此限制并非由我们的网络服务器引起。

这是暂停并考虑如果使用 future 而不是闭包来完成工作,清单 21-18、21-19 和 21-20 中的代码会有什么不同的好时机。哪些类型会改变?方法签名会有哪些不同,如果有的话?代码的哪些部分会保持不变?

在学习了第17章和第18章的while let循环之后,你可能会疑惑为什么我们没有像清单21-21那样编写工作线程代码。

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-21: An alternative implementation of Worker::new using while let

这段代码可以编译和运行,但不会产生预期的线程行为:一个慢请求仍然会导致其他请求等待处理。原因比较微妙:Mutex 结构体没有公共的 unlock 方法,因为锁的所有权基于 MutexGuard<T>lock 方法返回的 LockResult<MutexGuard<T>> 中的生命周期。在编译时,借用检查器可以强制执行规则,即除非我们持有锁,否则不能访问由 Mutex 保护的资源。然而,如果我们在意 MutexGuard<T> 的生命周期,这种实现也可能导致锁被持有时间超过预期。

列表 21-20 中的代码使用 let job = receiver.lock().unwrap().recv().unwrap(); 可以工作,因为使用 let 时,等号右侧表达式中使用的任何临时值在 let 语句结束时会立即被丢弃。然而,while let(以及 if letmatch)不会在关联块结束之前丢弃临时值。在列表 21-21 中,锁在整个调用 job() 的期间一直被持有,这意味着其他工作线程无法接收任务。