共享状态
到目前为止,我们有一个键值服务器在工作。然而,有一个重大的缺陷:状态没有在不同的连接中共享。我们将在这篇文章中解决这个问题。
策略
在Tokio中,共享状态有几种不同的方式:
- 用Mutex来保护共享状态。
- 生成一个任务来管理状态,并使用消息传递来操作它。
一般来说,对于简单的数据使用第一种方法,而对于需要异步工作的东西使用第二种方法,比如I/O原语。在本章中,共享状态是一个HashMap,操作是 insert 和 get。这两种操作都不是异步的,所以我们将使用Mutex。
后一种方法将在下一章中介绍。
添加 bytes 依赖
Mini-Redis板块没有使用 Vec<u8>
,而是使用 bytes
crate的 Bytes
。Bytes的目标是为网络编程提供一个强大的字节数组结构。相比 Vec<u8>
最大的特点是浅层克隆。换句话说,在 Bytes 实例上调用 clone() 并不复制基础数据。相反,Bytes 实例是对一些底层数据的一个引用计数的句柄。Bytes类型大致是一个Arc<Vec<u8>
,但有一些附加功能。
要依赖字节,请在 Cargo.toml
的 [dependencies]
部分添加以下内容:
bytes = "1"
初始化HashMap
HashMap将在许多任务和可能的许多线程之间共享。为了支持这一点,它被包裹在 Arc<Mutex<_>>
中。
首先,为方便起见,在use语句后添加以下类型别名:
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
然后,更新主函数以初始化HashMap,并将一个Arc句柄传递给 process 函数。使用Arc允许从许多任务中并发地引用HashMap,可能在许多线程上运行。在整个Tokio中,术语 handle 被用来引用一个提供对一些共享状态的访问的值。
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
关于 std::sync::Mutex
的使用
注意,使用 std::sync::Mutex
而不是 tokio::Mutex
来保护 HashMap。一个常见的错误是在异步代码中无条件地使用 tokio::sync::Mutex
。异步Mutex是一个跨调用 .await
而被锁定的Mutex。
同步的mutex在等待获得锁的时候会阻塞当前线程。这反过来又会阻塞其他任务的处理。然而,切换到 tokio::sync::Mutex
通常没有帮助,因为异步mutex内部使用同步mutex。
作为一个经验法则,在异步代码中使用同步的mutex是可以的,只要竞争保持在较低的水平,并且在调用 .await
时不保持锁。此外,可以考虑使用parking_lot::Mutex
作为 std::sync::Mutex
的更快的替代品。
更新process()
process函数不再初始化一个HashMap。相反,它将 HashMap 的共享句柄作为一个参数。它还需要在使用 HashMap 之前锁定它。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
任务、线程和争用
当争夺最小的时候,使用一个阻塞的mutex来保护简短的关键部分是一个可以接受的策略。当锁被争夺时,执行任务的线程必须阻塞并等待mutex。这不仅会阻塞当前的任务,也会阻塞当前线程上安排的所有其他任务。
默认情况下,Tokio运行时使用一个多线程调度器。任务被安排在由运行时管理的任何数量的线程上。如果大量的任务被安排执行,并且它们都需要访问mutex,那么就会出现争夺。另一方面,如果使用 current_thread 运行时风味,那么mutex将永远不会被争夺。
current_thread
运行时是一个轻量级的、单线程的运行时。当只生成几个任务并打开少量的套接字时,它是一个很好的选择。例如,当在异步客户端库之上提供一个同步API桥接时,这个选项很好用。
如果同步 mutex 的争夺成为问题,最好的解决办法很少是切换到Tokio mutex。相反,要考虑的选项是。
- 切换到一个专门的任务来管理状态并使用消息传递。
- 分片 mutex。
- 重组代码以避免使用mutex。
在我们的案例中,由于每个 key 都是独立的,mutex分片将很好地工作。为了做到这一点,我们将引入N个不同的实例,而不是一个Mutex<HashMap<_, _»实例。
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
然后,为任何给定的密钥寻找单元成为一个两步过程。首先,key 被用来识别它属于哪个分片。然后,在HashMap中查找该 key。
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
dashmap crate提供了一个分片哈希图的实现。
在.await
中持有MutexGuard
你可能会写这样的代码:
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
当你试图生成调用该函数的东西时,你会遇到以下错误信息:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
发生这种情况是因为 std::sync::MutexGuard
类型不是 Send
。这意味着你不能把一个mutex锁发送到另一个线程,而错误的发生是因为Tokio运行时可以在每个 .await
的线程之间移动一个任务。为了避免这种情况,你应该重组你的代码,使互斥锁的析构器在.await
之前运行。
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
请注意,这不起作用:
use std::sync::{Mutex, MutexGuard};
// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
这是因为编译器目前只根据作用域信息来计算一个future是否是Send。编译器有望在将来更新以支持显式丢弃,但现在,你必须显式地使用一个范围。
你不应该试图通过以不需要 Send 的方式催生任务来规避这个问题,因为如果Tokio在任务持有锁的时候将你的任务暂停在一个.await
,一些其他的任务可能会被安排在同一个线程上运行,而这个其他的任务也可能试图锁定那个突变体,这将导致一个死锁,因为等待锁定突变体的任务会阻止持有突变体的任务释放突变体。
我们将在下面讨论一些方法来修复这个错误信息。
重组代码,使其不在一个.await中保持锁
我们已经在上面的片段中看到了一个例子,但还有一些更强大的方法可以做到这一点。例如,你可以将mutex包裹在一个结构中,并且只在该结构的非同步方法中锁定mutex。
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
这种模式保证你不会遇到Send错误,因为mutex guard不会出现在异步函数的任何地方。
生成任务来管理状态,并使用消息传递来操作它
这是本章开头提到的第二种方法,通常在共享资源是I/O资源时使用。更多细节见下一章。
使用Tokio的异步mutex
也可以使用 Tokio 提供的 tokio::sync::Mutex
类型。Tokio mutex的主要特点是,它可以跨 .await
持有,而没有任何问题。也就是说,异步的mutex比普通的mutex更昂贵,通常使用其他两种方法会更好。
use tokio::sync::Mutex; // note! This uses the Tokio mutex
// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // lock goes out of scope here