通道
现在我们已经学习了一些关于Tokio并发的知识,让我们在客户端应用这些知识。把我们之前写的服务器代码放到一个明确的二进制文件中:
mkdir src/bin
mv src/main.rs src/bin/server.rs
然后创建一个包含客户端代码的二进制文件:
touch src/bin/client.rs
本节中的代码将被写入到这个文件中。需要运行时,首先要在另外一个终端窗口中启动服务:
cargo run --bin server
然后在启动客户端,注意是分别启动在不同的终端中:
cargo run --bin client
好了,让我们开始编码。
假设我们想运行两个并发的Redis命令。我们可以为每个命令生成一个任务。那么这两条命令就会同时发生。
起初,我们可能会尝试类似的做法:
use mini_redis::client;
#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
这不会被编译,因为两个任务都需要以某种方式访问 client
。由于Client没有实现Copy,如果没有一些代码来促进这种共享,它将不会被编译。此外,Client::set
需要 &mut self
,这意味着调用它需要独占访问。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex
,因为 .await
需要在持有锁的情况下被调用。我们可以使用 tokio::sync::Mutex
,但这只允许一个飞行中的请求。如果客户端实现了管道化,那么异步Mutex会导致连接的利用率不足。
消息传递
答案是使用消息传递。这种模式包括产生一个专门的任务来管理 client 资源。任何希望发出请求的任务都会向 client 任务发送一个消息。client 任务代表发送者发出请求,并将响应发回给发送者。
使用这种策略,就可以建立一个单一的连接。管理 client 的任务能够获得排他性的访问,以便调用get和set。此外,通道作为一个缓冲区工作。在 client 任务忙碌的时候,操作可以被发送到 client 任务。一旦 client 任务可以处理新的请求,它就从通道中拉出下一个请求。这可以带来更好的吞吐量,并且可以扩展到支持连接池。
Tokio的通道原语
Tokio提供一些通道(channel),每个通道都有不同的用途:
- mpsc:多生产者,单消费者通道。可以发送许多数值。
- oneshot:单生产者,单消费者通道。可以发送一个单一的值。
- broadcast: 多生产者,多消费者。可以发送许多值。每个接收者看到每个值。
- watch:单生产者,多消费者。可以发送许多值,但不保留历史。接收者只看到最近的值。
如果你需要一个多生产者多消费者的通道,只有一个消费者看到每个消息,你可以使用 async-channel
crate。也有一些通道用于异步Rust之外,比如 std::sync::mpsc
和crossbeam::channel
。这些通道通过阻塞线程来等待消息,这在异步代码中是不允许的。
在本节中,我们将使用 mpsc
和 oneshot
。其他类型的消息传递通道将在后面的章节中进行探讨。本节的完整代码可在此找到。
定义消息类型
在大多数情况下,当使用消息传递时,接收消息的任务会对一个以上的命令做出响应。在我们的例子中,该任务将对GET和SET命令做出响应。为了对此进行建模,我们首先定义一个Command枚举,并为每个命令类型包含一个变量。
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
创建通道
在主函数中,创建 mpsc 通道:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Create a new channel with a capacity of at most 32.
let (tx, mut rx) = mpsc::channel(32);
// ... Rest comes here
}
mpsc 通道被用来向管理 redis 连接的任务发送命令。多生产者的能力允许从许多任务发送消息。创建通道会返回两个值,一个发送者和一个接收者。这两个句柄是单独使用的。它们可以被转移到不同的任务。
通道的创建容量为32。如果消息的发送速度比接收速度快,通道将储存这些消息。一旦通道中存储了32条消息,调用 send(...).await
将进入睡眠状态,直到有消息被接收者删除。
从多个任务中发送是通过克隆 sender 来完成的。比如说。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
两条信息都被发送到单一的 Receiver 句柄。mpsc
通道的Receiver 不能克隆。
当每个 sender 都超出了范围或被放弃时,就不再可能向通道发送更多的消息了。在这一点上,接收器上的 recv 调用将返回None,这意味着所有的发送者都消失了,通道被关闭。
在我们管理 Redis 连接的任务中,它知道一旦通道关闭,它就可以关闭 Redis 连接,因为该连接将不会再被使用。
生成管理器任务
接下来,生成处理来自通道的消息的任务。首先,建立到 Redis 客户端的连接。然后,通过 Redis 连接发出收到的命令。
use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Start receiving messages
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
现在,更新这两个任务,通过通道发送命令,而不是直接在 Redis 连接上发布。
// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
在 main 函数的底部,我们 .await
join 句柄,以确保在进程退出前完全完成命令。
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
接收响应
最后一步是接收来自管理器任务的响应。GET命令需要获得数值,SET命令需要知道操作是否成功完成。
为了传递响应,使用了一个 oneshot 通道。oneshot 通道是一个单一生产者、单一消费者的通道,为发送单一数值而优化。在我们的例子中,这个单一的值就是响应。
与mpsc类似,oneshot::channel()
返回一个 sender 和 receiver 句柄。
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
与 mpsc 不同,没有指定容量,因为容量总是1。此外,两个句柄都不能被克隆。
为了接收来自管理任务的响应,在发送命令之前,创建了一个 oneshot 通道。该通道的 Sender 部分被包含在给管理任务的命令中。receive 部分用来接收响应。
首先,更新 Command 以包括 Sender。为了方便起见,使用一个类型别名来引用 Sender。
use tokio::sync::oneshot;
use bytes::Bytes;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Vec<u8>,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
现在,更新发布命令的任务,包括 oneshot::Sender
。
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
// Send the GET request
tx.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: b"bar".to_vec(),
resp: resp_tx,
};
// Send the SET request
tx2.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
最后,更新管理任务,通过 oneshot 的通道发送响应:
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val.into()).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
在 oneshot::Sender
上调用 send 会立即完成,不需要 .await
。这是因为在 oneshot 通道上的发送将总是立即失败或成功,而不需要任何形式的等待。
在 oneshot 通道上发送一个值,当接收方的一半已经放弃时,返回 Err。这表明接收方对响应不再感兴趣了。在我们的方案中,接收方取消兴趣是一个可接受的事件。resp.send(...)
返回的Err不需要被处理。
背压和有界通道
无论何时引入并发或队列,都必须确保队列是有界的,系统将优雅地处理负载。无界的队列最终会占用所有可用的内存,导致系统以不可预测的方式失败。
Tokio 小心避免隐性队列。这其中很大一部分是由于异步操作是 lazy 的。请考虑以下情况:
loop {
async_op();
}
如果异步操作急切地运行,循环将重复排队运行一个新的 async_op
,而不确保之前的操作完成。这就导致了隐性的无边界队列。基于回调的系统和基于急切的 future 的系统特别容易受此影响。
然而,在Tokio和异步Rust中,上述片段根本不会导致 async_op
的运行。这是因为 .await
从未被调用。如果该片段被更新为使用 .await
,那么循环会等待操作完成后再重新开始。
loop {
// Will not repeat until `async_op` completes
async_op().await;
}
必须明确地引入并发和队列。做到这一点的方法包括:
tokio::spawn
select!
join!
mpsc::channel
在这样做的时候,要注意确保并发的总量是有界限的。例如,当编写一个TCP接受循环时,要确保打开的套接字的总数是有限制的。当使用 mpsc::channel
时,选择一个可管理的通道容量。具体的约束值将是特定于应用的。
注意和挑选好的界限是编写可靠的 tokio 应用程序的一个重要部分。