共享状态并发

消息传递是处理并发的一种好方法,但并不是唯一的方法。另一种方法是多个线程访问相同的共享数据。再次考虑 Go 语言文档中的这句口号:“不要通过共享内存来通信。”

共享内存进行通信会是什么样子?此外,为什么消息传递的支持者会警告不要使用内存共享?

在某种程度上,任何编程语言中的通道都类似于单一所有权,因为一旦你通过通道传递一个值,你就不再应该使用该值。共享内存并发类似于多重所有权:多个线程可以同时访问同一内存位置。正如你在第 15 章中看到的,智能指针使得多重所有权成为可能,多重所有权可以增加复杂性,因为这些不同的所有者需要管理。Rust 的类型系统和所有权规则在正确管理这些所有者方面提供了很大帮助。作为一个例子,让我们看看互斥锁,这是共享内存中最常见的并发原语之一。

使用 Mutex 允许一次只有一个线程访问数据

Mutex互斥 的缩写,也就是说,一个 mutex 只允许一个线程在任何时候访问某些数据。要访问 mutex 中的数据,线程必须首先通过请求获取 mutex 的 来表明它希望访问。锁是 mutex 的一部分,是一个跟踪谁当前拥有数据独占访问权的数据结构。因此,mutex 通过锁定系统 保护 它持有的数据。

Mutexes 有 Reputation 为难以使用,因为您必须记住两条规则:

  • 您必须在使用数据之前尝试获取锁。
  • 当你完成对互斥锁保护的数据的操作后,必须解锁数据,以便其他线程可以获取锁。

为了将互斥锁比喻成现实生活中的例子,想象一下在会议上的一个小组讨论,只有一个麦克风。在小组成员可以发言之前,他们必须请求或示意他们想要使用麦克风。当他们拿到麦克风时,他们可以随心所欲地讲话,然后将麦克风交给下一个请求发言的小组成员。如果一个小组成员在完成发言后忘记传递麦克风,其他人就无法发言。如果共享麦克风的管理出现问题,小组讨论将无法按计划进行!

管理互斥锁可能非常棘手,这就是为什么那么多人对通道充满热情。然而,得益于 Rust 的类型系统和所有权规则,你不会把加锁和解锁弄错。

互斥锁 Mutex<T> 的 API

作为如何使用互斥锁的一个示例,让我们先在一个单线程环境中使用互斥锁,如清单 16-12 所示:

Filename: src/main.rs
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {m:?}");
}
Listing 16-12: Exploring the API of Mutex<T> in a single-threaded context for simplicity

与许多类型一样,我们使用关联函数 new 创建一个 Mutex<T>。 要访问互斥锁中的数据,我们使用 lock 方法来获取锁。此调用将阻塞当前线程,直到轮到我们拥有锁为止。

调用 lock 会在另一个线程持有锁时发生恐慌而失败。在这种情况下,将没有人能够获取锁,所以我们选择 unwrap,如果我们在那种情况下,让这个线程发生恐慌。

在获取锁之后,我们可以将返回值(在这个例子中命名为num)视为对内部数据的可变引用。类型系统确保我们在使用m中的值之前获取锁。m的类型是Mutex<i32>,而不是i32,所以我们必须调用lock才能使用i32值。我们不能忘记;类型系统不会让我们以其他方式访问内部的i32

正如你可能猜到的,Mutex<T> 是一个智能指针。更准确地说,对 lock 的调用 返回 一个名为 MutexGuard 的智能指针,该指针被包装在一个我们通过调用 unwrap 处理的 LockResult 中。MutexGuard 智能指针实现了 Deref 以指向我们的内部数据;该智能指针还具有一个 Drop 实现,当 MutexGuard 超出作用域时会自动释放锁,这发生在内部作用域结束时。因此,我们不会忘记释放锁,从而阻止其他线程使用互斥锁,因为锁的释放是自动进行的。

释放锁后,我们可以打印互斥锁的值并看到我们能够将内部i32更改为6。

在多个线程之间共享 Mutex<T>

现在,让我们尝试使用Mutex<T>在多个线程之间共享一个值。 我们将启动10个线程,让它们各自将计数器值加1,从而使计数器从0增加到10。列表16-13中的下一个示例将出现编译器错误,我们将利用该错误来更多地了解使用Mutex<T>以及Rust如何帮助我们正确使用它。

Filename: src/main.rs
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Listing 16-13: Ten threads each increment a counter guarded by a Mutex<T>

我们创建一个 counter 变量来在 Mutex<T> 中保存一个 i32,就像我们在清单 16-12 中所做的那样。接下来,我们通过遍历一个数字范围来创建 10 个线程。我们使用 thread::spawn 并给所有线程相同的闭包:一个将计数器移动到线程中,通过调用 lock 方法获取 Mutex<T> 的锁,然后将互斥锁中的值加 1。当线程完成运行其闭包时,num 将超出作用域并释放锁,以便其他线程可以获取它。

在主线程中,我们收集所有的 join 句柄。然后,像我们在清单 16-2 中所做的那样,我们对每个句柄调用 join 以确保所有线程都完成。在那之后,主线程将获取锁并打印此程序的结果。

我们暗示过这个例子无法编译。现在让我们找出原因!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8  |     for _ in 0..10 {
   |     -------------- inside of this loop
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |
help: consider moving the expression out of the loop so it is only moved once
   |
8  ~     let mut value = counter.lock();
9  ~     for _ in 0..10 {
10 |         let handle = thread::spawn(move || {
11 ~             let mut num = value.unwrap();
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

错误消息指出 counter 的值在循环的前一次迭代中被移动了。Rust 告诉我们不能将 counter 的所有权移动到多个线程中。让我们用我们在第 15 章讨论的多所有权方法来修复编译器错误。

多线程中的多重所有权

在第 15 章中,我们通过使用智能指针 Rc<T> 创建引用计数的值来为一个值赋予多个所有者。让我们在这里做同样的事情,看看会发生什么。我们将在清单 16-14 中将 Mutex<T> 包装在 Rc<T> 中,并在将所有权移动到线程之前克隆 Rc<T>

Filename: src/main.rs
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Listing 16-14: Attempting to use Rc<T> to allow multiple threads to own the Mutex<T>

再一次,我们编译并得到……不同的错误!编译器正在教我们很多。

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
   --> src/main.rs:11:36
    |
11  |           let handle = thread::spawn(move || {
    |                        ------------- ^------
    |                        |             |
    |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
    | |                      |
    | |                      required by a bound introduced by this call
12  | |             let mut num = counter.lock().unwrap();
13  | |
14  | |             *num += 1;
15  | |         });
    | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
    |
    = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`, which is required by `{closure@src/main.rs:11:36: 11:43}: Send`
note: required because it's used within this closure
   --> src/main.rs:11:36
    |
11  |         let handle = thread::spawn(move || {
    |                                    ^^^^^^^
note: required by a bound in `spawn`
   --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/std/src/thread/mod.rs:675:8
    |
672 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    |        ----- required by a bound in this function
...
675 |     F: Send + 'static,
    |        ^^^^ required by this bound in `spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

哇,这个错误信息非常冗长!这里需要关注的重要部分是:`Rc<Mutex<i32>>` 不能在线程间安全地传递。编译器还告诉我们原因:`Rc<Mutex<i32>>` 没有实现 `Send` 特性。我们将在下一节讨论 Send:它是确保我们在线程中使用的类型适用于并发情况的特性之一。

不幸的是,Rc<T> 不能安全地在多个线程间共享。当 Rc<T> 管理引用计数时,它会在每次调用 clone 时增加计数,并在每个克隆被销毁时减少计数。但它没有使用任何 并发原语来确保计数的更改不会被另一个线程中断。这可能会导致计数错误——细微的错误可能会导致内存泄漏或在我们尚未完成使用某个值时将其销毁。我们需要一个类型,就像 Rc<T> 一样,但能够以线程安全的方式更改引用计数。

使用 Arc<T> 进行原子引用计数

Fortunately, Arc<T> 一个像 Rc<T> 一样的类型,可以在并发情况下安全使用。a 代表 原子,意味着它是一个 原子引用计数 类型。原子是另一种并发原语,我们在这里不会详细讨论:有关更多详细信息,请参阅标准库文档 std::sync::atomic。目前,你只需要知道原子像基本类型一样工作,但可以在线程间安全共享。

你可能会想知道为什么所有原始类型都不是原子的,为什么标准库类型默认没有实现使用Arc<T>。原因是线程安全会带来性能开销,你只在真正需要时才愿意承担。如果你只是在单个线程内对值进行操作,你的代码在不需要强制执行原子性提供的保证时可以运行得更快。

让我们回到我们的例子:Arc<T>Rc<T> 有相同的 API,所以我们通过更改 use 行、对 new 的调用以及对 clone 的调用来修复我们的程序。清单 16-15 中的代码最终将编译并运行:

Filename: src/main.rs
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Listing 16-15: Using an Arc<T> to wrap the Mutex<T> to be able to share ownership across multiple threads

这段代码将打印以下内容:

Result: 10

我们做到了!我们从0数到了10,这可能看起来并不太了不起,但这确实让我们学到了很多关于Mutex<T>和线程安全的知识。你也可以利用这个程序的结构来执行比简单递增计数器更复杂的操作。使用这种策略,你可以将计算分解为独立的部分,将这些部分分配到不同的线程上,然后使用Mutex<T>让每个线程用其部分结果更新最终结果。

请注意,如果您正在进行简单的数值操作,标准库的std::sync::atomic 模块提供了比Mutex<T>更简单的类型。这些类型提供了对基本类型的线程安全、并发、原子访问。我们在这个例子中选择使用带有基本类型的Mutex<T>,以便集中讲解Mutex<T>的工作原理。

RefCell/Rc 和 Mutex/Arc 之间的相似性

你可能已经注意到 counter 是不可变的,但我们能够获取其内部值的可变引用;这意味着 Mutex<T> 提供了内部可变性,就像 Cell 系列一样。同样地,我们在第 15 章中使用 RefCell<T> 允许我们修改 Rc<T> 内部的内容,我们使用 Mutex<T> 来修改 Arc<T> 内部的内容。

另一个需要注意的细节是,Rust 不能保护你免受所有类型的逻辑错误,当你使用 Mutex<T> 时。回想第 15 章,使用 Rc<T> 会带来创建引用循环的风险,即两个 Rc<T> 值相互引用,导致内存泄漏。同样,Mutex<T> 也存在创建 死锁 的风险。这些情况发生在某个操作需要锁定两个资源,而两个线程各自获取了一个锁,导致它们无限期地相互等待。如果你对死锁感兴趣,可以尝试创建一个包含死锁的 Rust 程序;然后研究任何语言中互斥锁的死锁缓解策略,并尝试在 Rust 中实现它们。Mutex<T>MutexGuard 的标准库 API 文档提供了有用的信息。

我们将通过讨论 SendSync 特性以及如何将它们与自定义类型一起使用来结束本章。