共享状态

使用 Mutex 来保护共享状态

https://tokio.rs/tokio/tutorial/shared-state

到目前为止,我们有一个键值服务器在工作。然而,有一个重大的缺陷:状态没有在不同的连接中共享。我们将在这篇文章中解决这个问题。

策略

在Tokio中,共享状态有几种不同的方式:

  1. 用Mutex来保护共享状态。
  2. 生成一个任务来管理状态,并使用消息传递来操作它。

一般来说,对于简单的数据使用第一种方法,而对于需要异步工作的东西使用第二种方法,比如I/O原语。在本章中,共享状态是一个HashMap,操作是 insert 和 get。这两种操作都不是异步的,所以我们将使用Mutex。

后一种方法将在下一章中介绍。

添加 bytes 依赖

Mini-Redis板块没有使用 Vec<u8>,而是使用 bytes crate的 Bytes。Bytes的目标是为网络编程提供一个强大的字节数组结构。相比 Vec<u8> 最大的特点是浅层克隆。换句话说,在 Bytes 实例上调用 clone() 并不复制基础数据。相反,Bytes 实例是对一些底层数据的一个引用计数的句柄。Bytes类型大致是一个Arc<Vec<u8>,但有一些附加功能。

要依赖字节,请在 Cargo.toml[dependencies] 部分添加以下内容:

bytes = "1"

初始化HashMap

HashMap将在许多任务和可能的许多线程之间共享。为了支持这一点,它被包裹在 Arc<Mutex<_>> 中。

首先,为方便起见,在use语句后添加以下类型别名:

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

然后,更新主函数以初始化HashMap,并将一个Arc句柄传递给 process 函数。使用Arc允许从许多任务中并发地引用HashMap,可能在许多线程上运行。在整个Tokio中,术语 handle 被用来引用一个提供对一些共享状态的访问的值。

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

关于 std::sync::Mutex 的使用

注意,使用 std::sync::Mutex 而不是 tokio::Mutex 来保护 HashMap。一个常见的错误是在异步代码中无条件地使用 tokio::sync::Mutex。异步Mutex是一个跨调用 .await 而被锁定的Mutex。

同步的mutex在等待获得锁的时候会阻塞当前线程。这反过来又会阻塞其他任务的处理。然而,切换到 tokio::sync::Mutex 通常没有帮助,因为异步mutex内部使用同步mutex。

作为一个经验法则,在异步代码中使用同步的mutex是可以的,只要竞争保持在较低的水平,并且在调用 .await 时不保持锁。此外,可以考虑使用parking_lot::Mutex 作为 std::sync::Mutex 的更快的替代品。

更新process()

process函数不再初始化一个HashMap。相反,它将 HashMap 的共享句柄作为一个参数。它还需要在使用 HashMap 之前锁定它。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

任务、线程和争用

当争夺最小的时候,使用一个阻塞的mutex来保护简短的关键部分是一个可以接受的策略。当锁被争夺时,执行任务的线程必须阻塞并等待mutex。这不仅会阻塞当前的任务,也会阻塞当前线程上安排的所有其他任务。

默认情况下,Tokio运行时使用一个多线程调度器。任务被安排在由运行时管理的任何数量的线程上。如果大量的任务被安排执行,并且它们都需要访问mutex,那么就会出现争夺。另一方面,如果使用 current_thread 运行时风味,那么mutex将永远不会被争夺。

current_thread 运行时是一个轻量级的、单线程的运行时。当只生成几个任务并打开少量的套接字时,它是一个很好的选择。例如,当在异步客户端库之上提供一个同步API桥接时,这个选项很好用。

如果同步 mutex 的争夺成为问题,最好的解决办法很少是切换到Tokio mutex。相反,要考虑的选项是。

  • 切换到一个专门的任务来管理状态并使用消息传递。
  • 分片 mutex。
  • 重组代码以避免使用mutex。

在我们的案例中,由于每个 key 都是独立的,mutex分片将很好地工作。为了做到这一点,我们将引入N个不同的实例,而不是一个Mutex<HashMap<_, _»实例。

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

然后,为任何给定的密钥寻找单元成为一个两步过程。首先,key 被用来识别它属于哪个分片。然后,在HashMap中查找该 key。

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

dashmap crate提供了一个分片哈希图的实现。

.await中持有MutexGuard

你可能会写这样的代码:

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

当你试图生成调用该函数的东西时,你会遇到以下错误信息:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

发生这种情况是因为 std::sync::MutexGuard 类型不是 Send。这意味着你不能把一个mutex锁发送到另一个线程,而错误的发生是因为Tokio运行时可以在每个 .await 的线程之间移动一个任务。为了避免这种情况,你应该重组你的代码,使互斥锁的析构器在.await之前运行。

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

请注意,这不起作用:

use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

这是因为编译器目前只根据作用域信息来计算一个future是否是Send。编译器有望在将来更新以支持显式丢弃,但现在,你必须显式地使用一个范围。

你不应该试图通过以不需要 Send 的方式催生任务来规避这个问题,因为如果Tokio在任务持有锁的时候将你的任务暂停在一个.await,一些其他的任务可能会被安排在同一个线程上运行,而这个其他的任务也可能试图锁定那个突变体,这将导致一个死锁,因为等待锁定突变体的任务会阻止持有突变体的任务释放突变体。

我们将在下面讨论一些方法来修复这个错误信息。

重组代码,使其不在一个.await中保持锁

我们已经在上面的片段中看到了一个例子,但还有一些更强大的方法可以做到这一点。例如,你可以将mutex包裹在一个结构中,并且只在该结构的非同步方法中锁定mutex。

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

这种模式保证你不会遇到Send错误,因为mutex guard不会出现在异步函数的任何地方。

生成任务来管理状态,并使用消息传递来操作它

这是本章开头提到的第二种方法,通常在共享资源是I/O资源时使用。更多细节见下一章。

使用Tokio的异步mutex

也可以使用 Tokio 提供的 tokio::sync::Mutex 类型。Tokio mutex的主要特点是,它可以跨 .await 持有,而没有任何问题。也就是说,异步的mutex比普通的mutex更昂贵,通常使用其他两种方法会更好。

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here