Tokio教程
- 1: Tokio教程概况
- 2: 教程准备工作
- 3: Hello Tokio
- 4: spawning
- 5: 共享状态
- 6: 通道
- 7: I/O
- 8: 分帧
- 9: 深入异步
- 10: select
- 11: stream
- 12: 桥接同步代码
1 - Tokio教程概况
Tokio是Rust编程语言的一个异步运行时。它提供了编写网络应用所需的构建模块。它提供了针对各种系统的灵活性,从有几十个内核的大型服务器到小型嵌入式设备。
在高层次上,Tokio提供了几个主要组件:
- 一个用于执行异步代码的多线程运行时。
- 一个标准库的异步版本。
- 一个庞大的库生态系统。
Tokio 在项目中的作用
当你以异步方式编写你的应用程序时,你可以通过减少在同一时间做许多事情的成本,使它能够更好地扩展。然而,异步的Rust代码不会自己运行,所以你必须选择一个运行时来执行它。Tokio库是使用最广泛的运行时,在使用量上超过了所有其他运行时的总和。
此外,Tokio提供了许多有用的工具。在编写异步代码时,你不能使用Rust标准库提供的普通阻塞API,而必须使用它们的异步版本。这些替代版本是由Tokio提供的,在有意义的地方反映了Rust标准库的API。
Tokio的优势
本节将概述Tokio的一些优势。
快速
Tokio 是快速的,它建立在 Rust 编程语言之上,而 Rust 编程语言本身是快速的。这是按照 Rust 的精神来做的,目标是你不应该通过手工编写同等的代码来提高性能。
Tokio是可扩展的,建立在 async/await 语言特性之上,而这本身就是可扩展的。当处理网络时,由于延迟的原因,你能处理一个连接的速度是有限的,所以唯一的扩展方式是一次处理许多连接。有了 async/await 语言功能,增加并发操作的数量变得异常便宜,使你可以扩展到大量的并发任务。
可靠的
Tokio 是用 Rust 构建的,Rust 是一种使每个人都能构建可靠和高效软件的语言。一些研究发现,大约有70%的高严重度安全漏洞是由内存不安全造成的。使用Rust可以在你的应用程序中消除这整类错误。
Tokio也非常注重提供一致的行为,没有任何意外。Tokio的主要目标是让用户部署可预测的软件,使其每天都有相同的表现,有可靠的响应时间,没有不可预知的延迟峰值。
简单
有了Rust的 async/await 功能,编写异步应用程序的复杂性就大大降低了。与Tokio的实用程序和充满活力的生态系统搭配,编写应用程序是一件轻而易举的事。
Tokio在合理的情况下遵循标准库的命名惯例。这使得只用标准库编写的代码很容易转换为用Tokio编写的代码。有了Rust强大的类型系统,轻松提供正确代码的能力是无可比拟的。
灵活
Tokio提供了多种运行时的变化。从多线程的、work-stealing 的运行时到轻量级的、单线程的运行时都有。每个运行时都有许多旋钮,允许用户根据自己的需要进行调整。
什么时候不使用Tokio
虽然Tokio对许多需要同时做很多事情的项目很有用,但也有一些Tokio不适合的使用情况。
-
通过在几个线程上并行运行来加速由CPU控制的计算。Tokio是为IO绑定的应用而设计的,在这种情况下,每个单独的任务大部分时间都在等待IO。如果你的应用程序唯一做的事情是并行运行计算,你应该使用rayon。也就是说,如果你需要同时做这两件事,还是可以 “混合搭配” 的。
-
读取大量的文件。虽然看起来Tokio对那些仅仅需要读取大量文件的项目很有用,但与普通线程池相比,Tokio在这里没有提供任何优势。这是因为操作系统一般不提供异步文件API。
-
发送单个网络请求。Tokio给你带来优势的地方是当你需要同时做很多事情时。如果你需要使用一个用于异步Rust的库,如reqwest,但你不需要同时做很多事情,你应该选择该库的阻塞版本,因为它将使你的项目更简单。当然,使用Tokio仍然可以工作,但与阻塞式API相比,没有提供真正的优势。如果该库没有提供阻塞式的API,请看关于用同步代码桥接的章节。
2 - 教程准备工作
内容出处:https://tokio.rs/tokio/tutorial/setup
本教程将带领你一步一步地完成构建Redis客户端和服务器的过程。我们将从Rust的异步编程的基础知识开始,并在此基础上建立起来。我们将实现Redis命令的一个子集,但会对Tokio进行全面考察。
Mini-Redis
你将在本教程中构建的项目在 GitHub 上以 Mini-Redis 的形式提供。Mini-Redis是以学习Tokio为主要目的而设计的,因此注释得非常好,但这也意味着Mini-Redis缺少一些你希望在真正的Redis库中实现的功能。你可以在 crates.io 上找到可用于生产的 Redis 库。
我们将在本教程中直接使用Mini-Redis。这允许我们在教程中使用Mini-Redis的部分功能,然后再在后面的教程中实现它们。
获得帮助
在任何时候,如果你遇到困难,你都可以在Discord或GitHub的讨论中得到帮助。不要担心问 “初学者” 的问题。我们都是从某处开始的,并且很乐意提供帮助。
前提条件
读者应该已经熟悉了Rust。Rust-book 是一个很好的入门资源。
虽然不是必须的,但使用Rust标准库或其他语言编写网络代码的一些经验可能会有所帮助。
不需要对Redis有任何预先了解。
rust
在开始之前,你应该确保你已经安装了Rust工具链并准备好了。如果你没有,最简单的方法是使用rustup来安装它。
本教程要求至少有1.45.0版本的Rust,但建议使用最新的稳定版本的Rust。
要检查你的电脑上是否安装了Rust,请运行以下程序。
$ rustc --version
Mini-Redis服务器
接下来,安装Mini-Redis服务器。这将被用来测试我们的客户端,因为我们正在构建它。
cargo install mini-redis
3 - Hello Tokio
我们将通过编写一个非常基本的Tokio应用程序开始。它将连接到Mini-Redis服务器,将 key hello的值设置为world。然后它将读回key。这将使用Mini-Redis客户端库来完成。
代码
生成一个新的crate
让我们从生成一个新的Rust应用程序开始:
$ cargo new my-redis
$ cd my-redis
添加依赖项
接下来,打开Cargo.toml
,在 [dependencies]
下面添加以下内容:
tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"
编写代码
然后,打开main.rs,将该文件的内容替换为:
use mini_redis::{client, Result};
#[tokio::main]
async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;
// Set the key "hello" with value "world"
client.set("hello", "world".into()).await?;
// Get key "hello"
let result = client.get("hello").await?;
println!("got value from the server; result={:?}", result);
Ok(())
}
确保Mini-Redis服务器正在运行。在一个单独的终端窗口,运行:
$ mini-redis-server
如果你还没有安装mini-redis,你可以用
$ cargo install mini-redis
现在,运行my-redis应用程序:
$ cargo run
got value from the server; result=Some(b"world")
代码分解
让我们花点时间来看看我们刚刚做了什么。没有太多的代码,但有很多事情正在发生。
let mut client = client::connect("127.0.0.1:6379").await?;
client::connect
函数是由 mini-redis crate提供的。它异步地与指定的远程地址建立了一个TCP连接。一旦连接建立起来,就会返回一个 client
句柄。尽管操作是异步进行的,但我们写的代码看起来是同步的。唯一表明该操作是异步的是 .await
操作符。
什么是异步编程?
大多数计算机程序的执行顺序与它的编写顺序相同。第一行执行,然后是下一行,以此类推。在同步编程中,当程序遇到不能立即完成的操作时,它就会阻塞,直到操作完成。例如,建立一个TCP连接需要在网络上与一个对等体进行交换,这可能需要相当长的时间。在这段时间内,线程会被阻塞。
通过异步编程,不能立即完成的操作被暂停到后台。线程没有被阻塞,可以继续运行其他事情。一旦操作完成,任务就会被取消暂停,并继续从它离开的地方处理。我们之前的例子中只有一个任务,所以在它被暂停的时候什么都没有发生,但异步程序通常有许多这样的任务。
尽管异步编程可以带来更快的应用,但它往往导致更复杂的程序。程序员需要跟踪所有必要的状态,以便在异步操作完成后恢复工作。从历史上看,这是一项繁琐且容易出错的任务。
编译时绿色线程
Rust使用一个叫做 async/await
的功能实现了异步编程。执行异步操作的函数都标有 async
关键字。在我们的例子中,connect函数是这样定义的:
use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
// ...
}
async fn
的定义看起来像一个普通的同步函数,但却以异步方式运行。Rust在编译时将 async fn 转化为一个异步运行的routine。在 async fn
中对 .await
的任何调用都会将控制权交还给线程。当操作在后台进行时,线程可以做其他工作。
尽管其他语言也实现了async/await,但Rust采取了一种独特的方法。主要是,Rust的异步操作是 lazy 的。这导致了与其他语言不同的运行时语义。
如果这还不是很有意义,不要担心。我们将在本指南中更多地探讨async/await。
使用 async/await
异步函数的调用与其他Rust函数一样。然而,调用这些函数并不会导致函数主体的执行。相反,调用 async fn
会返回一个代表操作的值。这在概念上类似于一个零参数闭包。要实际运行该操作,你应该在返回值上使用 .await
操作符。
例如,给定的程序:
async fn say_world() {
println!("world");
}
#[tokio::main]
async fn main() {
// Calling `say_world()` does not execute the body of `say_world()`.
let op = say_world();
// This println! comes first
println!("hello");
// Calling `.await` on `op` starts executing `say_world`.
op.await;
}
输出为:
hello
world
async fn
的返回值是一个匿名类型,它实现了 Future trait。
异步main函数
用于启动应用程序的main函数与大多数Rust工具箱中的常见函数不同。
- 它是
async fn
- 它被注解为
#[tokio::main]
。
使用 async fn
是因为我们想进入一个异步上下文。然而,异步函数必须由一个运行时来执行。运行时包含异步任务调度器,提供事件化I/O、计时器等。运行时不会自动启动,所以主函数需要启动它。
#[tokio::main]
函数是一个宏。它将 async fn main()
转换为同步 fn main()
,初始化一个运行时实例并执行异步main函数。
例如,下面的例子:
#[tokio::main]
async fn main() {
println!("hello");
}
被转化为:
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
Tokio运行时的细节将在后面介绍。
Cargo features
在本教程中依赖Tokio时,启用了 full
的功能标志:
tokio = { version = "1", features = ["full"] }
Tokio 有很多功能(TCP、UDP、Unix 套接字、定时器、同步工具、多种调度器类型等)。不是所有的应用程序都需要所有的功能。当试图优化编译时间或最终应用程序的足迹时,应用程序可以决定只选择进入它所使用的功能。
目前,在依赖 tokio 时,请使用 “full” feature。
4 - spawning
内容出处:https://tokio.rs/tokio/tutorial/spawning
我们将换个角度,开始在Redis服务器上工作。
首先,把上一节中的客户端SET/GET代码移到一个示例文件中。这样,我们就可以针对我们的服务器运行它:
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
然后创建一个新的、空的 src/main.rs
并继续。
接受套接字
我们的Redis服务器需要做的第一件事是接受入站的TCP套接字。这是用 tokio::net::TcpListener
完成的。
Tokio的许多类型与Rust标准库中的同步类型命名相同。在合理的情况下,Tokio暴露了与std相同的API,但使用了async fn。
TcpListener被绑定到6379端口,然后在一个循环中接受socket。每个套接字都被处理,然后关闭。现在,我们将读取命令,将其打印到stdout,并回应一个错误。
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// The `Connection` lets us read/write redis **frames** instead of
// byte streams. The `Connection` type is defined by mini-redis.
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
现在,运行这个 accept 循环:
$ cargo run
在一个单独的终端窗口,运行 hello-redis 的例子(上一节的SET/GET命令):
$ cargo run --example hello-redis
输出应该是:
Error: "unimplemented"
在服务器终端,输出是:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
并发
我们的服务器有一个小问题(除了只回应错误)。它一次处理一个入站请求。当一个连接被接受时,服务器停留在接受循环块内,直到响应被完全写入套接字。
我们希望我们的Redis服务器能够处理许多并发的请求。要做到这一点,我们需要增加一些并发性。
并发和并行是不一样的。如果你在两个任务之间交替进行,那么你就是在同时进行两个任务,但不是并行的。要想获得并行的资格,你需要两个人,一个人专门负责每个任务。
使用Tokio的一个优点是,异步代码允许你在许多任务上并发工作,而不必使用普通线程并行工作。事实上,Tokio可以在一个单线程上并发运行许多任务
为了并发地处理连接,为每个入站连接生成一个新的任务。连接在这个任务中被处理。
接受循环变成:
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
tokio::spawn(async move {
process(socket).await;
});
}
}
任务
Tokio任务是一个异步的绿色线程。它们是通过传递一个异步块给 tokio::spawn
来创建的。tokio::spawn
函数返回 JoinHandle
,调用者可以用它来与生成的任务进行交互。该异步块可以有一个返回值。调用者可以使用 JoinHandle
上的 .await
获取返回值。
比如说:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Do some async work
"return value"
});
// Do some other work
let out = handle.await.unwrap();
println!("GOT {}", out);
}
对JoinHandle的等待会返回一个 Result
。当任务在执行过程中遇到错误时,JoinHandle将返回 Err
。这发生在任务恐慌的时候,或者任务被运行时关闭而强行取消的时候。
任务是由调度器管理的执行单位。生成的任务提交给 Tokio 调度器,然后确保该任务在有工作要做时执行。生成的任务可以在它被生成的同一线程上执行,也可以在不同的运行时线程上执行。任务在被催生后也可以在线程之间移动。
Tokio中的任务是非常轻便的。在引擎盖下,它们只需要一次分配和64字节的内存。应用程序可以自由地生成数千,甚至数百万个任务。
'static
bound
当你在Tokio运行时中催生一个任务时,它的类型必须是 'static
的。这意味着生成的任务不能包含对任务之外拥有的数据的任何引用。
一个常见的误解是,
'static
总是意味着 “永远活着”,但事实并非如此。仅仅因为一个值是'static
,并不意味着你有内存泄漏。你可以在《常见的Rust寿命误解》中阅读更多内容。
例如,以下内容将无法编译:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
试图编译时,出现了以下错误:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
发生这种情况是因为,默认情况下,变量不会被 move 到异步块中。v向量仍然归主函数所有。rust编译器很好地解释了这一点,甚至还提出了解决方法 将第7行改为 task::spawn(async move {
将指示编译器将v move 到被催生的任务中。这样,该任务拥有其所有的数据,使其成为 ‘static。
如果数据必须被多个任务同时访问,那么它必须使用同步原语(如Arc)进行共享。
请注意,错误信息提到了参数类型超过了 'static
生命周期。这个术语可能相当令人困惑,因为 'static
生命周期一直持续到程序结束,所以如果它超过了这个生命周期,你不就有内存泄漏了吗?解释是,必须超过 'static
生命周期的是类型,而不是值,而且值可能在其类型不再有效之前被销毁。
当我们说一个值是 'static
的时候,所有的意思是,把这个值永远留在身边是不对的。这一点很重要,因为编译器无法推理出一个新产生的任务会停留多长时间,所以它能确保任务不会活得太久的唯一方法就是确保它可能永远存在。
上面信息框中的链接使用了 “bounded by 'static
” 的术语,而不是 “its type outlives 'static
” 或 “the value is 'static
” for T: 'static
。这些都是同一个意思,与 &'static T
中的注解 'static
是不同的。
Send
bound
由 tokio::spawn
产生的任务必须实现 Send
。这允许Tokio运行时在线程之间 move 任务,而这些任务在一个 .await
中被暂停。
当所有跨 .await
调用的数据都是Send时,任务就是Send。这有点微妙。当 .await
被调用时,任务就回到了调度器中。下一次任务被执行时,它将从最后的让出点恢复。为了使其正常工作,所有在 .await
之后使用的状态都必须由任务保存。如果这个状态是 Send
,即可以跨线程移动,那么任务本身也可以跨线程移动。反过来说,如果这个状态不是 Send
,那么任务也不是。
例如,这样就可以了:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
这并不是:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;
println!("{}", rc);
});
}
试图编译该片段的结果是:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
我们将在下一章更深入地讨论这个错误的一个特例。
存储数值
我们现在将实现处理传入命令的 process
函数。我们将使用 HashMap 来存储值。SET命令将插入到HashMap中,GET 值将加载它们。此外,我们将使用一个循环来接受每个连接的一个以上的命令。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// A hashmap is used to store data
let mut db = HashMap::new();
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
// Use `read_frame` to receive a command from the connection.
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// The value is stored as `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` expects data to be of type `Bytes`. This
// type will be covered later in the tutorial. For now,
// `&Vec<u8>` is converted to `Bytes` using `into()`.
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
现在,启动服务器:
$ cargo run
并在一个单独的终端窗口中,运行hello-redis的例子:
$ cargo run --example hello-redis
现在,输出将是:
got value from the server; result=Some(b"world")
我们现在可以获取和设置值了,但有一个问题:这些值在连接之间是不共享的。如果另一个套接字连接并试图获取Hello键,它将不会发现任何东西。
你可以在这里找到完整的代码。
在下一节中,我们将实现对所有套接字的数据进行持久化。
5 - 共享状态
到目前为止,我们有一个键值服务器在工作。然而,有一个重大的缺陷:状态没有在不同的连接中共享。我们将在这篇文章中解决这个问题。
策略
在Tokio中,共享状态有几种不同的方式:
- 用Mutex来保护共享状态。
- 生成一个任务来管理状态,并使用消息传递来操作它。
一般来说,对于简单的数据使用第一种方法,而对于需要异步工作的东西使用第二种方法,比如I/O原语。在本章中,共享状态是一个HashMap,操作是 insert 和 get。这两种操作都不是异步的,所以我们将使用Mutex。
后一种方法将在下一章中介绍。
添加 bytes 依赖
Mini-Redis板块没有使用 Vec<u8>
,而是使用 bytes
crate的 Bytes
。Bytes的目标是为网络编程提供一个强大的字节数组结构。相比 Vec<u8>
最大的特点是浅层克隆。换句话说,在 Bytes 实例上调用 clone() 并不复制基础数据。相反,Bytes 实例是对一些底层数据的一个引用计数的句柄。Bytes类型大致是一个Arc<Vec<u8>
,但有一些附加功能。
要依赖字节,请在 Cargo.toml
的 [dependencies]
部分添加以下内容:
bytes = "1"
初始化HashMap
HashMap将在许多任务和可能的许多线程之间共享。为了支持这一点,它被包裹在 Arc<Mutex<_>>
中。
首先,为方便起见,在use语句后添加以下类型别名:
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
然后,更新主函数以初始化HashMap,并将一个Arc句柄传递给 process 函数。使用Arc允许从许多任务中并发地引用HashMap,可能在许多线程上运行。在整个Tokio中,术语 handle 被用来引用一个提供对一些共享状态的访问的值。
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
关于 std::sync::Mutex
的使用
注意,使用 std::sync::Mutex
而不是 tokio::Mutex
来保护 HashMap。一个常见的错误是在异步代码中无条件地使用 tokio::sync::Mutex
。异步Mutex是一个跨调用 .await
而被锁定的Mutex。
同步的mutex在等待获得锁的时候会阻塞当前线程。这反过来又会阻塞其他任务的处理。然而,切换到 tokio::sync::Mutex
通常没有帮助,因为异步mutex内部使用同步mutex。
作为一个经验法则,在异步代码中使用同步的mutex是可以的,只要竞争保持在较低的水平,并且在调用 .await
时不保持锁。此外,可以考虑使用parking_lot::Mutex
作为 std::sync::Mutex
的更快的替代品。
更新process()
process函数不再初始化一个HashMap。相反,它将 HashMap 的共享句柄作为一个参数。它还需要在使用 HashMap 之前锁定它。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
任务、线程和争用
当争夺最小的时候,使用一个阻塞的mutex来保护简短的关键部分是一个可以接受的策略。当锁被争夺时,执行任务的线程必须阻塞并等待mutex。这不仅会阻塞当前的任务,也会阻塞当前线程上安排的所有其他任务。
默认情况下,Tokio运行时使用一个多线程调度器。任务被安排在由运行时管理的任何数量的线程上。如果大量的任务被安排执行,并且它们都需要访问mutex,那么就会出现争夺。另一方面,如果使用 current_thread 运行时风味,那么mutex将永远不会被争夺。
current_thread
运行时是一个轻量级的、单线程的运行时。当只生成几个任务并打开少量的套接字时,它是一个很好的选择。例如,当在异步客户端库之上提供一个同步API桥接时,这个选项很好用。
如果同步 mutex 的争夺成为问题,最好的解决办法很少是切换到Tokio mutex。相反,要考虑的选项是。
- 切换到一个专门的任务来管理状态并使用消息传递。
- 分片 mutex。
- 重组代码以避免使用mutex。
在我们的案例中,由于每个 key 都是独立的,mutex分片将很好地工作。为了做到这一点,我们将引入N个不同的实例,而不是一个Mutex<HashMap<_, _»实例。
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
然后,为任何给定的密钥寻找单元成为一个两步过程。首先,key 被用来识别它属于哪个分片。然后,在HashMap中查找该 key。
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
dashmap crate提供了一个分片哈希图的实现。
在.await
中持有MutexGuard
你可能会写这样的代码:
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
当你试图生成调用该函数的东西时,你会遇到以下错误信息:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
发生这种情况是因为 std::sync::MutexGuard
类型不是 Send
。这意味着你不能把一个mutex锁发送到另一个线程,而错误的发生是因为Tokio运行时可以在每个 .await
的线程之间移动一个任务。为了避免这种情况,你应该重组你的代码,使互斥锁的析构器在.await
之前运行。
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
请注意,这不起作用:
use std::sync::{Mutex, MutexGuard};
// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
这是因为编译器目前只根据作用域信息来计算一个future是否是Send。编译器有望在将来更新以支持显式丢弃,但现在,你必须显式地使用一个范围。
你不应该试图通过以不需要 Send 的方式催生任务来规避这个问题,因为如果Tokio在任务持有锁的时候将你的任务暂停在一个.await
,一些其他的任务可能会被安排在同一个线程上运行,而这个其他的任务也可能试图锁定那个突变体,这将导致一个死锁,因为等待锁定突变体的任务会阻止持有突变体的任务释放突变体。
我们将在下面讨论一些方法来修复这个错误信息。
重组代码,使其不在一个.await中保持锁
我们已经在上面的片段中看到了一个例子,但还有一些更强大的方法可以做到这一点。例如,你可以将mutex包裹在一个结构中,并且只在该结构的非同步方法中锁定mutex。
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
这种模式保证你不会遇到Send错误,因为mutex guard不会出现在异步函数的任何地方。
生成任务来管理状态,并使用消息传递来操作它
这是本章开头提到的第二种方法,通常在共享资源是I/O资源时使用。更多细节见下一章。
使用Tokio的异步mutex
也可以使用 Tokio 提供的 tokio::sync::Mutex
类型。Tokio mutex的主要特点是,它可以跨 .await
持有,而没有任何问题。也就是说,异步的mutex比普通的mutex更昂贵,通常使用其他两种方法会更好。
use tokio::sync::Mutex; // note! This uses the Tokio mutex
// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
6 - 通道
现在我们已经学习了一些关于Tokio并发的知识,让我们在客户端应用这些知识。把我们之前写的服务器代码放到一个明确的二进制文件中:
mkdir src/bin
mv src/main.rs src/bin/server.rs
然后创建一个包含客户端代码的二进制文件:
touch src/bin/client.rs
本节中的代码将被写入到这个文件中。需要运行时,首先要在另外一个终端窗口中启动服务:
cargo run --bin server
然后在启动客户端,注意是分别启动在不同的终端中:
cargo run --bin client
好了,让我们开始编码。
假设我们想运行两个并发的Redis命令。我们可以为每个命令生成一个任务。那么这两条命令就会同时发生。
起初,我们可能会尝试类似的做法:
use mini_redis::client;
#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
这不会被编译,因为两个任务都需要以某种方式访问 client
。由于Client没有实现Copy,如果没有一些代码来促进这种共享,它将不会被编译。此外,Client::set
需要 &mut self
,这意味着调用它需要独占访问。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex
,因为 .await
需要在持有锁的情况下被调用。我们可以使用 tokio::sync::Mutex
,但这只允许一个飞行中的请求。如果客户端实现了管道化,那么异步Mutex会导致连接的利用率不足。
消息传递
答案是使用消息传递。这种模式包括产生一个专门的任务来管理 client 资源。任何希望发出请求的任务都会向 client 任务发送一个消息。client 任务代表发送者发出请求,并将响应发回给发送者。
使用这种策略,就可以建立一个单一的连接。管理 client 的任务能够获得排他性的访问,以便调用get和set。此外,通道作为一个缓冲区工作。在 client 任务忙碌的时候,操作可以被发送到 client 任务。一旦 client 任务可以处理新的请求,它就从通道中拉出下一个请求。这可以带来更好的吞吐量,并且可以扩展到支持连接池。
Tokio的通道原语
Tokio提供一些通道(channel),每个通道都有不同的用途:
- mpsc:多生产者,单消费者通道。可以发送许多数值。
- oneshot:单生产者,单消费者通道。可以发送一个单一的值。
- broadcast: 多生产者,多消费者。可以发送许多值。每个接收者看到每个值。
- watch:单生产者,多消费者。可以发送许多值,但不保留历史。接收者只看到最近的值。
如果你需要一个多生产者多消费者的通道,只有一个消费者看到每个消息,你可以使用 async-channel
crate。也有一些通道用于异步Rust之外,比如 std::sync::mpsc
和crossbeam::channel
。这些通道通过阻塞线程来等待消息,这在异步代码中是不允许的。
在本节中,我们将使用 mpsc
和 oneshot
。其他类型的消息传递通道将在后面的章节中进行探讨。本节的完整代码可在此找到。
定义消息类型
在大多数情况下,当使用消息传递时,接收消息的任务会对一个以上的命令做出响应。在我们的例子中,该任务将对GET和SET命令做出响应。为了对此进行建模,我们首先定义一个Command枚举,并为每个命令类型包含一个变量。
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
创建通道
在主函数中,创建 mpsc 通道:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Create a new channel with a capacity of at most 32.
let (tx, mut rx) = mpsc::channel(32);
// ... Rest comes here
}
mpsc 通道被用来向管理 redis 连接的任务发送命令。多生产者的能力允许从许多任务发送消息。创建通道会返回两个值,一个发送者和一个接收者。这两个句柄是单独使用的。它们可以被转移到不同的任务。
通道的创建容量为32。如果消息的发送速度比接收速度快,通道将储存这些消息。一旦通道中存储了32条消息,调用 send(...).await
将进入睡眠状态,直到有消息被接收者删除。
从多个任务中发送是通过克隆 sender 来完成的。比如说。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
两条信息都被发送到单一的 Receiver 句柄。mpsc
通道的Receiver 不能克隆。
当每个 sender 都超出了范围或被放弃时,就不再可能向通道发送更多的消息了。在这一点上,接收器上的 recv 调用将返回None,这意味着所有的发送者都消失了,通道被关闭。
在我们管理 Redis 连接的任务中,它知道一旦通道关闭,它就可以关闭 Redis 连接,因为该连接将不会再被使用。
生成管理器任务
接下来,生成处理来自通道的消息的任务。首先,建立到 Redis 客户端的连接。然后,通过 Redis 连接发出收到的命令。
use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Start receiving messages
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
现在,更新这两个任务,通过通道发送命令,而不是直接在 Redis 连接上发布。
// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
在 main 函数的底部,我们 .await
join 句柄,以确保在进程退出前完全完成命令。
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
接收响应
最后一步是接收来自管理器任务的响应。GET命令需要获得数值,SET命令需要知道操作是否成功完成。
为了传递响应,使用了一个 oneshot 通道。oneshot 通道是一个单一生产者、单一消费者的通道,为发送单一数值而优化。在我们的例子中,这个单一的值就是响应。
与mpsc类似,oneshot::channel()
返回一个 sender 和 receiver 句柄。
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
与 mpsc 不同,没有指定容量,因为容量总是1。此外,两个句柄都不能被克隆。
为了接收来自管理任务的响应,在发送命令之前,创建了一个 oneshot 通道。该通道的 Sender 部分被包含在给管理任务的命令中。receive 部分用来接收响应。
首先,更新 Command 以包括 Sender。为了方便起见,使用一个类型别名来引用 Sender。
use tokio::sync::oneshot;
use bytes::Bytes;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Vec<u8>,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
现在,更新发布命令的任务,包括 oneshot::Sender
。
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};
// Send the GET request
tx.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: b"bar".to_vec(),
resp: resp_tx,
};
// Send the SET request
tx2.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
最后,更新管理任务,通过 oneshot 的通道发送响应:
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val.into()).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
在 oneshot::Sender
上调用 send 会立即完成,不需要 .await
。这是因为在 oneshot 通道上的发送将总是立即失败或成功,而不需要任何形式的等待。
在 oneshot 通道上发送一个值,当接收方的一半已经放弃时,返回 Err。这表明接收方对响应不再感兴趣了。在我们的方案中,接收方取消兴趣是一个可接受的事件。resp.send(...)
返回的Err不需要被处理。
背压和有界通道
无论何时引入并发或队列,都必须确保队列是有界的,系统将优雅地处理负载。无界的队列最终会占用所有可用的内存,导致系统以不可预测的方式失败。
Tokio 小心避免隐性队列。这其中很大一部分是由于异步操作是 lazy 的。请考虑以下情况:
loop {
async_op();
}
如果异步操作急切地运行,循环将重复排队运行一个新的 async_op
,而不确保之前的操作完成。这就导致了隐性的无边界队列。基于回调的系统和基于急切的 future 的系统特别容易受此影响。
然而,在Tokio和异步Rust中,上述片段根本不会导致 async_op
的运行。这是因为 .await
从未被调用。如果该片段被更新为使用 .await
,那么循环会等待操作完成后再重新开始。
loop {
// Will not repeat until `async_op` completes
async_op().await;
}
必须明确地引入并发和队列。做到这一点的方法包括:
tokio::spawn
select!
join!
mpsc::channel
在这样做的时候,要注意确保并发的总量是有界限的。例如,当编写一个TCP接受循环时,要确保打开的套接字的总数是有限制的。当使用 mpsc::channel
时,选择一个可管理的通道容量。具体的约束值将是特定于应用的。
注意和挑选好的界限是编写可靠的 tokio 应用程序的一个重要部分。
7 - I/O
Tokio中的 I/O 操作方式与 std 中大致相同,但是是异步的。有一个特质用于读取(AsyncRead)和一个特质用于写入(AsyncWrite)。特定的类型根据情况实现这些特性(TcpStream、File、Stdout)。AsyncRead 和 AsyncWrite 也由一些数据结构实现,如 Vec<u8>
和 &[u8]
。这允许在期望有读写器的地方使用字节数组。
本页将介绍 Tokio 的基本 I/O 读取和写入,并通过几个例子进行说明。下一页将介绍一个更高级的 I/O 例子。
AsyncRead 和 AsyncWrite
这两个特性提供了异步读取和写入字节流的设施。这些特质上的方法通常不被直接调用,类似于你不手动调用 Future 特质的 poll 方法。相反,你将通过AsyncReadExt
和 AsyncWriteExt
提供的实用方法使用它们。
让我们简单地看看这些方法中的几个。所有这些函数都是异步的,必须与 .await
一起使用。
async fn read()
AsyncReadExt::read
提供了一个向缓冲区读取数据的异步方法,返回读取的字节数。
注意:当 read() 返回 Ok(0) 时,这表示流已经关闭。任何对 read() 的进一步调用将立即以 Ok(0) 完成。对于TcpStream实例,这意味着套接字的读取部分被关闭。
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
async fn read_to_end()
AsyncReadExt::read_to_end
从流中读取所有字节直到EOF。
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
}
async fn write()
AsyncWriteExt::write 将缓冲区写入写入器,返回写入的字节数。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
async fn write_all()
AsyncWriteExt::write_all 将整个缓冲区写入写入器。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = File::create("foo.txt").await?;
buffer.write_all(b"some bytes").await?;
Ok(())
}
这两个特性都包括许多其他有用的方法。请参阅API文档以获得一个全面的列表。
辅助函数
此外,就像std一样,tokio::io模块包含一些有用的实用函数,以及用于处理标准输入、标准输出和标准错误的API。例如,tokio::io::copy 异步地将一个 reader 的全部内容复制到一个 writer。
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
注意,这使用了字节数组也实现了 AsyncRead 这一事实。
Echo server
让我们练习做一些异步I/O。我们将编写一个 echo 服务器。
echo服务器绑定了一个 TcpListener 并在一个循环中接受入站连接。对于每个入站连接,数据被从套接字中读取并立即写回套接字。客户端向服务器发送数据,并接收完全相同的数据回来。
我们将使用稍微不同的策略实现两次 echo 服务器。
使用io::copy()
这是一个TCP服务器,需要一个接受循环。一个新的任务被生成以处理每个被接受的套接字。
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Copy data here
});
}
}
正如前面所看到的,这个实用程序需要一个 reader 和一个 writer,并将数据从一个复制到另一个。然而,我们只有一个单一的TcpStream。这个单一的值同时实现了 AsyncRead 和 AsyncWrite。因为 io::copy
对 reader 和 writer 两者都要求&mut,所以socket不能用于两个参数。
// This fails to compile
io::copy(&mut socket, &mut socket).await
拆分reader + writer
为了解决这个问题,我们必须把套接字分成一个reader句柄和一个writer句柄。拆分读写器组合的最佳方法取决于具体的类型。
任何读写器类型都可以使用 io::split
函数进行分割。这个函数接受一个单一的值,并返回单独的 reader 和 writer 句柄。这两个句柄可以独立使用,包括来自不同的任务。
例如,echo客户端可以这样处理并发的读和写。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// Write data in the background
let write_task = tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Sometimes, the rust type inferencer needs
// a little help
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
因为 io::split
支持任何实现 AsyncRead + AsyncWrite
并返回独立句柄的值,在内部 io::split
使用一个Arc和一个Mutex。这种开销可以通过TcpStream来避免。TcpStream提供了两个专门的分割函数。
TcpStream::split
接收流的 reference ,并返回 reader 和 writer 句柄。因为使用了reference,所以这两个句柄必须留在 split() 被调用的同一个任务上。这种专门的 split 是零成本的。不需要Arc或Mutex。TcpStream 还提供了 into_split,它支持可以跨任务移动的句柄,但只需要一个Arc。
因为 io::copy()
是在拥有TcpStream的同一个任务中调用的,我们可以使用 TcpStream::split
。处理 echo 逻辑的任务变成了。
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("failed to copy");
}
});
手动复制
现在让我们来看看我们如何通过手动复制数据来编写 echo 服务器。为了做到这一点,我们使用 AsyncReadExt::read
和 AsyncWriteExt::write_all
。
完整的 echo 服务器如下:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
Ok(n) => {
// Copy the data back to socket
if socket.write_all(&buf[..n]).await.is_err() {
// Unexpected socket error. There isn't much we can
// do here so just stop processing.
return;
}
}
Err(_) => {
// Unexpected socket error. There isn't much we can do
// here so just stop processing.
return;
}
}
}
});
}
}
让我们把它分解一下。首先,由于使用了AsyncRead 和 AsyncWrite 工具,扩展特性必须被带入范围。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
分配缓冲区
该策略是将一些数据从套接字中读入一个缓冲区,然后将缓冲区的内容写回套接字。
let mut buf = vec![0; 1024];
堆栈缓冲区被明确地避免了。回顾前面,我们注意到,所有跨调用 .await
的任务数据都必须由任务存储。在这种情况下,buf 被用于跨越 .await
的调用。所有的任务数据被存储在一个单一的分配中。你可以把它看作是一个枚举,每个变量都是需要为特定的 .await
调用存储的数据。
如果缓冲区由堆栈数组表示,每个接受的套接字所产生的任务的内部结构可能看起来像。
struct Task {
// internal task fields here
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}
}
}
如果使用堆栈数组作为缓冲区类型,它将被内联存储在任务结构中。这将使任务结构变得非常大。此外,缓冲区的大小通常是以页为单位的。这将反过来使任务成为一个尴尬的大小:$page-size + a-few-bytes。
编译器对异步块的布局的优化比基本枚举更进一步。在实践中,变量不会像枚举所要求的那样在变体之间移动。然而,任务结构的大小至少和最大的变量一样大。
正因为如此,为缓冲区使用专门的分配通常更有效率。
处理EOF
当TCP流的读取部分被关闭时,对 read() 的调用返回 Ok(0) 。在这一点上退出读循环是很重要的。忘记在EOF时退出读循环是一个常见的错误来源。
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
// ... other cases handled here
}
}
忘记从读循环中断开,通常会导致100%的CPU无限循环情况。由于套接字被关闭,socket.read()立即返回。循环就会永远重复下去。
8 - 分帧
我们现在将应用我们刚刚学到的关于I/O的知识,实现 Mini-Redis 的分帧层。成帧是将一个字节流转换为一个分帧流的过程。一个帧是两个对等体之间传输的数据单位。Redis协议的帧定义如下。
use bytes::Bytes;
enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Frame>),
}
请注意,该分帧只由数据组成,没有任何语义。命令的解析和执行发生在更高的层次。
对于HTTP,分帧可能看起来像:
enum HttpFrame {
RequestHead {
method: Method,
uri: Uri,
version: Version,
headers: HeaderMap,
},
ResponseHead {
status: StatusCode,
version: Version,
headers: HeaderMap,
},
BodyChunk {
chunk: Bytes,
},
}
为了实现 Mini-Redis 的分帧,我们将实现一个 Connection 结构,它包裹着一个TcpStream并读写 mini_redis::Frame
值。
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};
struct Connection {
stream: TcpStream,
// ... other fields here
}
impl Connection {
/// Read a frame from the connection.
///
/// Returns `None` if EOF is reached
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
// implementation here
}
/// Write a frame to the connection.
pub async fn write_frame(&mut self, frame: &Frame)
-> Result<()>
{
// implementation here
}
}
带缓冲的读取
read_frame 方法在返回之前会等待一整帧的接收。对 TcpStream::read() 的一次调用可以返回一个任意数量的数据。它可能包含一整个帧,一个部分帧,或者多个帧。如果收到一个部分帧,数据被缓冲,并从套接字中读取更多数据。如果收到多个帧,则返回第一个帧,其余的数据被缓冲,直到下次调用 read_frame。
为了实现这一点,Connection需要一个读取缓冲区字段。数据从套接字中读入读取缓冲区。当一个帧被解析后,相应的数据就会从缓冲区中移除。
我们将使用BytesMut作为缓冲区类型。这是Bytes的一个可变版本。
use bytes::BytesMut;
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: BytesMut::with_capacity(4096),
}
}
}
以及Connection上的read_frame()函数:
use mini_redis::{Frame, Result};
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// Ensure the buffer has capacity
if self.buffer.len() == self.cursor {
// Grow the buffer
self.buffer.resize(self.cursor * 2, 0);
}
// Read into the buffer, tracking the number
// of bytes read
let n = self.stream.read(
&mut self.buffer[self.cursor..]).await?;
if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
} else {
// Update our cursor
self.cursor += n;
}
}
}
在处理字节数组和 read 时,我们还必须维护一个游标,跟踪有多少数据被缓冲。我们必须确保将缓冲区的空部分传递给 read()。否则,我们会覆盖缓冲区的数据。如果我们的缓冲区被填满了,我们必须增加缓冲区,以便继续读取。在 parse_frame() 中(不包括),我们需要解析 self.buffer[..self.cursor]
所包含的数据。
因为将字节数组与游标配对是非常常见的,bytes
crate提供了一个代表字节数组和游标的抽象概念。Buf
特性是由可以读取数据的类型实现的。BufMut
特性是由可以写入数据的类型实现的。当把一个 T: BufMut
传递给 read_buf()
时,缓冲区的内部游标会被 read_buf
自动更新。正因为如此,在我们版本的read_frame中,我们不需要管理我们自己的游标。
此外,当使用 Vec<u8>
时,缓冲区必须被初始化。 vec![0; 4096]
分配了一个4096字节的数组,并将0写入每个条目。当调整缓冲区的大小时,新的容量也必须用零来初始化。初始化过程是不没有开销的。当使用 BytesMut
和 BufMut
时,容量是不初始化的。BytesMut
的抽象防止我们读取未初始化的内存。这让我们避免了初始化的步骤。
解析
现在,让我们看一下parse_frame()函数。解析工作分两步进行。
- 确保一个完整的帧被缓冲,并找到该帧的结束索引。
- 解析该帧。
mini-redis crate 为我们提供了一个用于这两个步骤的函数:
Frame::check
2Frame::parse
我们还将重用 Buf
的抽象来帮助。 Buf
被传入 Frame::check
。当检查函数遍历传入的缓冲区时,内部游标将被推进。当check返回时,缓冲区的内部游标指向帧的末端。
对于Buf类型,我们将使用 std::io::Cursor<&[u8]>
。
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;
fn parse_frame(&mut self)
-> Result<Option<Frame>>
{
// Create the `T: Buf` type.
let mut buf = Cursor::new(&self.buffer[..]);
// Check whether a full frame is available
match Frame::check(&mut buf) {
Ok(_) => {
// Get the byte length of the frame
let len = buf.position() as usize;
// Reset the internal cursor for the
// call to `parse`.
buf.set_position(0);
// Parse the frame
let frame = Frame::parse(&mut buf)?;
// Discard the frame from the buffer
self.buffer.advance(len);
// Return the frame to the caller.
Ok(Some(frame))
}
// Not enough data has been buffered
Err(Incomplete) => Ok(None),
// An error was encountered
Err(e) => Err(e.into()),
}
}
完整的 Frame::check
函数可以在这里找到。我们将不涉及它的全部内容。
需要注意的是,Buf的 “字节迭代器” 风格的API被使用。这些获取数据并推进内部游标。例如,为了解析一个帧,第一个字节被检查以确定该帧的类型。使用的函数是 Buf::get_u8
。这个函数获取当前光标位置的字节并使光标前进一个。
在Buf特性上还有更多有用的方法。查看API文档以了解更多细节。
带缓冲的写入
分帧API的另一半是 write_frame(frame)
函数。这个函数将整个帧写到套接字中。为了尽量减少 write
系统调用,写将被缓冲。一个写缓冲区被维护,帧在被写入套接字之前被编码到这个缓冲区。然而,与 read_frame()
不同的是,在写入套接字之前,整个帧并不总是被缓冲到一个字节数组中。
考虑一个批量流帧。被写入的值是 Frame::Bulk(Bytes)
。散装帧的线格式是一个帧头,它由$字符和以字节为单位的数据长度组成。帧的大部分是Bytes值的内容。如果数据很大,把它复制到一个中间缓冲区将是很昂贵的。
为了实现缓冲写入,我们将使用 BufWriter
结构。这个结构被初始化为一个 T: AsyncWrite
并实现 AsyncWrite
本身。当在 BufWriter
上调用写时,写不会直接进入内部写器,而是进入一个缓冲区。当缓冲区满的时候,内容会被刷到内部写入器上,内部缓冲区被清空。还有一些优化措施,允许在某些情况下绕过缓冲区。
作为教程的一部分,我们将不尝试完整地实现 write_frame()
。请看这里的完整实现。
首先,Connection结构被更新。
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4096),
}
}
}
接下来,write_frame()被实现:
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;
async fn write_frame(&mut self, frame: &Frame)
-> io::Result<()>
{
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();
self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unimplemented!(),
}
self.stream.flush().await;
Ok(())
}
这里使用的函数是由 AsyncWriteExt
提供的。 它们在 TcpStream
上也是可用的,但是在没有中间缓冲区的情况下发出单字节的写入是不可取的。
write_u8
向写程序写一个字节。write_all
将整个片断写入写入器。write_decimal
是由 mini-redis 实现的。
该函数以调用 self.stream.flush().await
结束。因为 BufWriter
将写入的数据存储在一个中间缓冲区中,对写入的调用并不能保证数据被写入套接字。在返回之前,我们希望帧被写到套接字中。对 flush()
的调用将缓冲区中的任何数据写到套接字。
另一个选择是不在 write_frame()
中调用 flush()
。相反,在 Connection
上提供一个 flush()
函数。这将允许调用者在写缓冲区中写入多个小帧的队列,然后用一个写系统调用将它们全部写入套接字。这样做会使 Connection API
复杂化。简化是 Mini-Redis
的目标之一,所以我们决定将 flush().await
调用包含在 fn write_frame()
中。
9 - 深入异步
在这一点上,我们已经完成了对异步 Rust 和 Tokio 的相当全面的考察。现在我们将深入挖掘Rust的异步运行时模型。在教程的一开始,我们就暗示过,异步Rust采取了一种独特的方法。现在,我们解释一下这意味着什么。
Futures
作为快速回顾,让我们采取一个非常基本的异步函数。与本教程到目前为止所涉及的内容相比,这并不新鲜。
use tokio::net::TcpStream;
async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}
我们调用这个函数,它返回一些值。我们在这个值上调用.await。
#[tokio::main]
async fn main() {
let what_is_this = my_async_fn();
// Nothing has been printed yet.
what_is_this.await;
// Text has been printed and socket has been
// established and closed.
}
my_async_fn()
返回的值是一个future。future是一个实现了标准库所提供的 std::future::Future
特性的值。它们是包含正在进行的异步计算的值。
std::future::Future
trait的定义是:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}
相关类型 Output
是 future 完成后产生的类型。Pin 类型是Rust能够支持异步函数中的借用的方式。更多细节请参见标准库文档。
与其他语言实现 future 的方式不同,Rust future 并不代表在后台发生的计算,相反,Rust future就是计算本身。Future的所有者负责通过轮询Future来推进计算。这可以通过调用 Future::poll
来实现。
实现future
让我们来实现一个非常简单的future。这个future将:
- 等待到一个特定的时间点。
- 输出一些文本到STDOUT。
- 产生一个字符串。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}
作为Future的Async fn
在main函数中,我们实例化了future并对其调用 .await
。从异步函数中,我们可以对任何实现Future的值调用 .await
。反过来,调用一个异步函数会返回一个实现Future的匿名类型。在 async fn main()
的例子中,生成的future大致是这样的。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}
Rust futures是一种状态机。在这里,MainFuture
被表示为一个 future 的可能状态的枚举。future 在 State0 状态下开始。当 poll
被调用时,future 试图尽可能地推进其内部状态。如果 future 能够完成,Poll::Ready
将被返回,其中包含异步计算的输出。
如果future不能完成,通常是由于它所等待的资源没有准备好,那么就会返回 Poll::Pending
。收到 Poll::Pending
是向调用者表明,future 将在稍后的时间完成,调用者应该在稍后再次调用poll。
我们还看到,future 是由其他 future 组成的。在外层 future 上调用 poll 的结果是调用内部 future 的 poll
函数。
executors
异步的Rust函数返回future。future必须被调用 poll 以推进其状态。future是由其他 future 组成的。那么,问题来了,是什么在最外层的 future 上调用poll?
回想一下前面的内容,要运行异步函数,它们必须被传递给 tokio::spawn
或者是被 #[tokio::main]
注释的主函数。这样做的结果是将生成的外层 future 提交给 Tokio执行器。执行器负责在外部 future 上调用 Future::poll
,推动异步计算的完成。
mini Tokio
为了更好地理解这一切是如何结合在一起的,让我们实现我们自己的最小版本的Tokio! 完整的代码可以在这里找到。
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
tasks: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}
/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}
fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}
这将运行异步块。一个具有所要求的延迟的 Delay
实例被创建并被等待。然而,到目前为止,我们的实现有一个重大缺陷。我们的执行器从未进入睡眠状态。执行器不断地循环所有被催生的 future,并对它们进行 poll 。大多数时候,这些 future 还没有准备好执行更多的工作,并会再次返回 Poll::Pending
。这个过程会消耗CPU,一般来说效率不高。
理想情况下,我们希望 mini-tokio 只在 future 能够取得进展时 poll future。这发生在任务被阻塞的资源准备好执行请求的操作时。如果任务想从一个TCP套接字中读取数据,那么我们只想在TCP套接字收到数据时 poll 任务。在我们的例子中,任务在达到给定的瞬间被阻断。理想情况下,mini-tokio只会在那个瞬间过去后 poll 任务。
为了实现这一点,当一个资源被 poll 而该资源又还没有准备好时,一旦它过渡到 ready 的状态,该资源将发送一个通知。
Wakers
Waker 是缺失的那部分。这是一个系统,通过这个系统,资源能够通知等待的任务,资源已经准备好继续某些操作。
让我们再看一下Future::poll的定义:
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
Poll 的 Context 参数有一个 waker() 方法。该方法返回一个与当前任务绑定的Waker。该Waker有一个wake()方法。调用该方法向执行器发出信号,相关任务应该被安排执行。当资源过渡到准备好的状态时调用wake(),通知执行者,poll 任务将能够取得进展。
更新 Delay
我们可以更新 Delay 来使用 wakers。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
现在,一旦请求的持续时间过了,调用的任务就会被通知,执行者可以确保任务被再次安排。下一步是更新mini-tokio以监听唤醒通知。
我们的 Delay 实现还有一些剩余的问题。我们将在以后修复它们。
当一个 future 返回
Poll::Pending
时,它必须确保在某个时间点对 waker 发出信号。忘记这样做会导致任务无限期地挂起。在返回
Poll::Pending
后忘记唤醒一个任务是一个常见的错误来源。
回顾一下 “Delay"的第一次迭代。这里是 future 的实现。
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
在返回 Poll::Pending
之前,我们调用 cx.waker().wake_by_ref()
。这是为了满足 future 契约。通过返回 Poll::Pending
,我们负责给唤醒者发信号。因为我们还没有实现定时器线程,所以我们在内联中给唤醒者发信号。这样做的结果是,future 将立即被重新安排,再次执行,而且可能还没有准备好完成。
请注意,允许对 waker 发出超过必要次数的信号。在这个特殊的例子中,即使我们根本没有准备好继续操作,我们还是向唤醒者发出信号。除了浪费一些CPU周期外,这样做并没有什么问题。然而,这种特殊的实现方式会导致一个繁忙的循环。
更新Mini Tokio
下一步是更新 Mini Tokio 以接收 waker 的通知。我们希望执行器只在被唤醒时运行任务,为了做到这一点,Mini Tokio将提供它自己的唤醒器。当唤醒者被调用时,其相关的任务将被排队执行。Mini Tokio在 poll future 时将这个 waker 传递给 future。
更新后的 Mini Tokio 将使用一个通道来存储预定任务。通道允许任务从任何线程被排队执行。Wakers 必须是 Send 和 Sync,所以我们使用来自crossbeam crate的通道,因为标准库的通道不是Sync。
Send和Sync特性是Rust提供的与并发性有关的标记特性。可以被发送到不同线程的类型是Send。大多数类型都是Send,但像Rc这样的类型则不是。可以通过不可变的引用并发访问的类型是Sync。一个类型可以是Send,但不是Sync–一个很好的例子是Cell,它可以通过不可变的引用被修改,因此并发访问是不安全的。
更多细节请参见Rust书中的相关章节。
然后,更新MiniTokio的结构。
use crossbeam::channel;
use std::sync::Arc;
struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}
struct Task {
// This will be filled in soon.
}
Wakers 是 sync,并且可以被克隆。当 wake 被调用时,任务必须被安排执行。为了实现这一点,我们有一个通道。当 wake()
被调用时,任务被推到通道的发送部分。我们的 task 结构将实现唤醒逻辑。要做到这一点,它需要同时包含催生的future 和通道的发送部分。
use std::sync::{Arc, Mutex};
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
为了安排任务,Arc被克隆并通过通道发送。现在,我们需要将我们的 schedule 函数与 std::task::Waker
挂钩。标准库提供了一个低级别的API,通过手动构建vtable来完成这个任务。这种策略为实现者提供了最大的灵活性,但需要一堆不安全的模板代码。我们不直接使用 RawWakerVTable
,而是使用由futures crate提供的ArcWake工具。这使得我们可以实现一个简单的特质,将我们的任务结构暴露为一个waker。
在你的Cargo.toml中添加以下依赖,以拉入future。
futures = "0.3"
然后实现 futures::task::ArcWake
。
use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
当上面的定时器线程调用waker.wake()时,任务被推送到通道中。接下来,我们在MiniTokio::run()函数中实现接收和执行任务。
impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();
// Poll the future
let _ = future.as_mut().poll(&mut cx);
}
// Spawns a new taks with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}
这里发生了多件事情。首先,MiniTokio::run()
被实现。该函数在一个循环中运行,接收来自通道的预定任务。由于任务在被唤醒时被推入通道,这些任务在执行时能够取得进展。
此外,MiniTokio::new()
和 MiniTokio::spwn()
函数被调整为使用通道而不是 VecDeque
。当新的任务被催生时,它们会被赋予一个通道的发送者部分的克隆,任务可以用它来在运行时安排自己。
Task::poll()
函数使用来自 futures crate 的 ArcWake
工具创建waker。waker被用来创建一个 task::Context
。该 task::Context
被传递给 poll。
摘要
我们现在已经看到了一个端到端的例子,说明异步Rust是如何工作的。Rust的 async/await
功能是由traits支持的。这允许第三方crate,如Tokio,提供执行细节。
- Rust的异步操作是 lazy 的,需要调用者来 poll 它们。
- Wakers被传递给futures,以将一个future与调用它的任务联系起来。
- 当一个资源没有准备好完成一个操作时,
Poll::Pending
被返回,任务的waker被记录。 - 当资源准备好时,任务的 waker 会被通知。
- 执行者收到通知并安排任务的执行。
- 任务再次被 poll ,这次资源已经准备好了,任务取得了进展。
某些未尽事宜
记得我们在实现 Delay future 的时候,说过还有一些事情要解决。Rust的异步模型允许单个future在执行时跨任务迁移。考虑一下下面的情况。
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});
Poll::Ready(())
}).await;
}
poll_fn
函数使用闭包创建Future实例。上面的片段创建了一个Delay实例,对其进行了一次轮询,然后将Delay实例发送到一个新的任务中等待。在这个例子中,Delay::poll在不同的Waker实例中被调用了不止一次。当这种情况发生时,你必须确保在最近一次调用 poll 时所传递的 Waker上调用 wake。
当实现 future 时,关键是要假设每一次对poll的调用都可能提供一个不同的Waker实例。poll 函数必须用新的唤醒者来更新任何先前记录的唤醒者。
我们早期实现的Delay在每次 poll 时都会产生一个新的线程。这很好,但是如果 poll 太频繁的话,效率就会很低(例如,如果你 select! 这个future和其他的future,只要其中一个有事件,这两个都会被poll)。一种方法是记住你是否已经产生了一个线程,如果你还没有产生一个线程,就只产生一个新的线程。然而,如果你这样做,你必须确保线程的Waker在以后调用 poll 时被更新,否则你就不能唤醒最近的Waker。
为了修复我们之前的实现,我们可以这样做。
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
// This Some when we have spawned a thread, and None otherwise.
waker: Option<Arc<Mutex<Waker>>>,
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// First, if this is the first time the future is called, spawn the
// timer thread. If the timer thread is already running, ensure the
// stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();
// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());
// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}
// Once the waker is stored and the timer thread is started, it is
// time to check if the delay has completed. This is done by
// checking the current instant. If the duration has elapsed, then
// the future has completed and `Poll::Ready` is returned.
if Instant::now() >= self.when {
Poll::Ready(())
} else {
// The duration has not elapsed, the future has not completed so
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}
}
}
这有点复杂,但想法是,在每次调用 poll 时,future 会检查所提供的 waker 是否与之前记录的 waker 相匹配。如果两个 waker 匹配,那么就没有其他事情要做。如果不匹配,则必须更新记录的 waker。
Notify 工具
我们演示了如何使用wakers手工实现一个 Delay future。Wakers是异步Rust工作方式的基础。通常情况下,没有必要深入到这个水平。例如,在Delay的情况下,我们可以通过使用 tokio::sync::Notify
工具,完全用 async/await
实现它。这个工具提供了一个基本的任务通知机制。它处理了waker的细节,包括确保记录的waker与当前任务相匹配。
使用Notify,我们可以像这样用 async/await
实现一个 Delay 函数。
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify2.notify_one();
});
notify.notified().await;
}
10 - select
到目前为止,当我们想给系统添加并发性时,我们会生成一个新的任务。现在我们将介绍一些额外的方法,用Tokio并发执行异步代码。
tokio::select!
tokio::select!
宏允许在多个异步计算中等待,并在单个计算完成后返回。
比如说:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
let _ = tx1.send("one");
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
使用了两个 oneshot 通道。任何一个通道都可以先完成。select!
语句在两个通道上等待,并将 val 与任务返回的值绑定。当 tx1 或 tx2 完成时,相关的块被执行。
没有完成的分支被放弃。在这个例子中,计算正在等待每个通道的 oneshot::Receiver
。尚未完成的通道的 oneshot::Receiver
被放弃。
取消
在异步Rust中,取消操作是通过丢弃一个 future 来实现的。回顾 “Async in depth”,异步Rust操作是使用 futures 实现的,而 futures 是 lazy 的。只有当期货被 poll 时,操作才会继续进行。如果future被丢弃,操作就不能进行,因为所有相关的状态都被丢弃了。
也就是说,有时一个异步操作会催生后台任务或启动其他在后台运行的操作。例如,在上面的例子中,一个任务被催生出来,以发送一个消息回来。通常情况下,该任务会进行一些计算来生成数值。
Futures或其他类型可以实现 Drop
来清理后台资源。Tokio 的 oneshot::Receiver
通过向 Sender
half 发送一个关闭的通知来实现 Drop
。sender 部分可以收到这个通知,并通过丢弃来中止正在进行的操作。
use tokio::sync::oneshot;
async fn some_operation() -> String {
// Compute value here
}
#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
// Select on the operation and the oneshot's
// `closed()` notification.
tokio::select! {
val = some_operation() => {
let _ = tx1.send(val);
}
_ = tx1.closed() => {
// `some_operation()` is canceled, the
// task completes and `tx1` is dropped.
}
}
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
Future 实现
为了帮助更好地理解 select!
的工作原理,让我们看看一个假想的Future实现是什么样子的。这是一个简化版本。在实践中,select!
包括额外的功能,如随机选择要先 poll 的分支。
use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct MySelect {
rx1: oneshot::Receiver<&'static str>,
rx2: oneshot::Receiver<&'static str>,
}
impl Future for MySelect {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
println!("rx1 completed first with {:?}", val);
return Poll::Ready(());
}
if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
println!("rx2 completed first with {:?}", val);
return Poll::Ready(());
}
Poll::Pending
}
}
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
// use tx1 and tx2
MySelect {
rx1,
rx2,
}.await;
}
MySelect future 包含每个分支的future。当MySelect被 poll 时,第一个分支被 poll。如果它准备好了,该值被使用,MySelect完成。在 .await
收到一个future的输出后,该future被放弃。这导致两个分支的futures都被丢弃。由于有一个分支没有完成,所以该操作实际上被取消了。
请记住上一节的内容:
当一个 future 返回
Poll::Pending
时,它必须确保在未来的某个时间点上对 waker 发出信号。如果忘记这样做,任务就会被无限期地挂起。
在 MySelect
的实现中,没有明确使用 Context
参数。相应的是,waker的要求是通过传递 cx 给内部 future 来满足的。由于内部 future 也必须满足waker的要求,通过只在收到内部 future 的 Poll::Pending
时返回 Poll::Pending
,MySelect
也满足 waker 的要求。
语法
选择 select!
宏可以处理两个以上的分支。目前的限制是64个分支。每个分支的结构为:
<pattern> = <async expression> => <handler>,
当 select 宏被评估时,所有的 <async expression>
被聚集起来并同时执行。当一个表达式完成时,其结果与 <pattern>
匹配。如果结果与模式匹配,那么所有剩余的异步表达式被放弃,<handler>
被执行。<handler>
表达式可以访问由 <pattern>
建立的任何绑定关系。
基本情况是 <pattern>
是一个变量名,异步表达式的结果被绑定到该变量名,<handler>
可以访问该变量。这就是为什么在最初的例子中,val
被用于<pattern>
,而 <handler>
能够访问 val
。
如果 <pattern>
与异步计算的结果不匹配,那么剩下的异步表达式继续并发地执行,直到下一个表达式完成。这时,同样的逻辑被应用于该结果。
因为 select!
可以接受任何异步表达式,所以可以定义更复杂的计算来进行选择。
在这里,我们在一个 oneshot 通道和一个TCP连接的输出上进行选择。
use tokio::net::TcpStream;
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
// Spawn a task that sends a message over the oneshot
tokio::spawn(async move {
tx.send("done").unwrap();
});
tokio::select! {
socket = TcpStream::connect("localhost:3465") => {
println!("Socket connected {:?}", socket);
}
msg = rx => {
println!("received message first {:?}", msg);
}
}
}
在这里,我们选择了一个 onehot 并接受来自 TcpListener 的套接字。
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send(()).unwrap();
});
let mut listener = TcpListener::bind("localhost:3465").await?;
tokio::select! {
_ = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
accept 循环一直运行到遇到错误或 rx
收到一个值。_模式表示我们对异步计算的返回值不感兴趣。
返回值
tokio::select!
宏返回被评估的 <handler>
表达式的结果。
async fn computation1() -> String {
// .. computation
}
async fn computation2() -> String {
// .. computation
}
#[tokio::main]
async fn main() {
let out = tokio::select! {
res1 = computation1() => res1,
res2 = computation2() => res2,
};
println!("Got = {}", out);
}
正因为如此,要求每个分支的 <handler>
表达式求值为同一类型。如果不需要 select!
表达式的输出,让表达式求值为()是很好的做法。
错误
使用?
操作符会从表达式中传播错误。这取决于是在异步表达式中还是在处理程序中使用?
在一个异步表达式中使用?
操作符会将错误从异步表达式中传播出去。这使得异步表达式的输出成为一个结果。在 handler 中使用?
会立即将错误从 select!
表达式中传播出去。让我们再看一下接受循环的例子。
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
// [setup `rx` oneshot channel]
let listener = TcpListener::bind("localhost:3465").await?;
tokio::select! {
res = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {
res?;
}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
注意 listener.accept().await?
操作符将错误从该表达式中传播出来,并传播到 res
绑定中。在发生错误时, res 将被设置为 Err(_)
。然后,在处理程序中,再次使用?
操作符。res?
语句将把一个错误从主函数中传播出去。
模式匹配
回顾一下,select!
宏分支语法被定义为:
<pattern> = <async expression> => <handler>,
到目前为止,我们只使用了 <pattern>
的变量绑定。然而,任何Rust模式都可以被使用。例如,假设我们从多个MPSC通道接收信息,我们可以这样做。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut tx1, mut rx1) = mpsc::channel(128);
let (mut tx2, mut rx2) = mpsc::channel(128);
tokio::spawn(async move {
// Do something w/ `tx1` and `tx2`
});
tokio::select! {
Some(v) = rx1.recv() => {
println!("Got {:?} from rx1", v);
}
Some(v) = rx2.recv() => {
println!("Got {:?} from rx2", v);
}
else => {
println!("Both channels closed");
}
}
}
借用
当催生任务时,被催生的异步表达式必须拥有其所有的数据。select!
宏没有这个限制。每个分支的异步表达式都可以借用数据并同时操作。按照Rust的借用规则,多个异步表达式可以不变地借用一个数据,或者一个异步表达式可以可变地借用一个数据。
我们来看看一些例子。在这里,我们同时向两个不同的TCP目的地发送相同的数据。
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;
async fn race(
data: &[u8],
addr1: SocketAddr,
addr2: SocketAddr
) -> io::Result<()> {
tokio::select! {
Ok(_) = async {
let mut socket = TcpStream::connect(addr1).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
Ok(_) = async {
let mut socket = TcpStream::connect(addr2).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
else => {}
};
Ok(())
}
data 变量被从两个异步表达式中不可变地借用。当其中一个操作成功完成时,另一个就会被放弃。因为我们在 Ok(_)
上进行模式匹配,如果一个表达式失败,另一个表达式继续执行。
当涉及到每个分支的 <handler>
时,select!
保证只运行一个 <handler>
。正因为如此,每个<handler>
都可以相互借用相同的数据。
例如,这在两个处理程序中都修改了out:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut out = String::new();
tokio::spawn(async move {
// Send values on `tx1` and `tx2`.
});
tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}
println!("{}", out);
}
循环
select!
宏经常在循环中使用。本节将通过一些例子来说明在循环中使用 select!
宏的常见方法。我们首先在多个通道上进行选择。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel(128);
let (tx2, mut rx2) = mpsc::channel(128);
let (tx3, mut rx3) = mpsc::channel(128);
loop {
let msg = tokio::select! {
Some(msg) = rx1.recv() => msg,
Some(msg) = rx2.recv() => msg,
Some(msg) = rx3.recv() => msg,
else => { break }
};
println!("Got {}", msg);
}
println!("All channels have been closed.");
}
这个例子在三个通道的接收器上进行 select。当在任何通道上收到消息时,它被写入STDOUT。当一个通道被关闭时,recv()
以None返回。通过使用模式匹配,select!
宏继续在其余通道上等待。当所有的通道都关闭时,else分支被评估,循环被终止。
select!
宏随机挑选分支,首先检查是否准备就绪。当多个通道有等待值时,将随机挑选一个通道来接收。这是为了处理这样的情况:接收循环处理消息的速度比推入通道的速度慢,也就是说,通道开始被填满。如果 select!
不随机挑选一个分支先检查,在循环的每个迭代中,rx1将被首先检查。如果rx1总是包含一个新的消息,其余的通道将永远不会被检查。
如果当select!被评估时,多个通道有待处理的消息,只有一个通道有一个值被弹出。所有其他的通道保持不动,它们的消息保持在这些通道中,直到下一个循环迭代。没有消息丢失。
恢复异步操作
现在我们将展示如何在多次调用 select!
时运行一个异步操作。在这个例子中,我们有一个MPSC通道,类型为i32
,还有一个异步函数。我们想运行异步函数,直到它完成或在通道上收到一个偶数整数。
async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let operation = action();
tokio::pin!(operation);
loop {
tokio::select! {
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}
请注意,不是在 select!
宏中调用 action()
,而是在循环之外调用它。action()
的返回被分配给 operation,而不调用 .await
。然后我们在 operation 上调用 tokio::pin!
在 select!
循环中,我们没有传入 operation
,而是传入 &mut operation
。operation
变量正在跟踪飞行中的异步操作。循环的每个迭代都使用相同的 operation,而不是对 action()
发出一个新的调用。
另一个 select!
分支从通道中接收消息。如果该消息是偶数,我们就完成了循环。否则,再次启动 select!
。
这是我们第一次使用 tokio::pin!
我们现在还不打算讨论 pining 的细节。需要注意的是,为了 .await
一个引用,被引用的值必须被 pin 或者实现 Unpin
。
如果我们删除 tokio::pin!
这一行,并尝试编译,我们会得到以下错误:
error[E0599]: no method named `poll` found for struct
`std::pin::Pin<&mut &mut impl std::future::Future>`
in the current scope
--> src/main.rs:16:9
|
16 | / tokio::select! {
17 | | _ = &mut operation => break,
18 | | Some(v) = rx.recv() => {
19 | | if v % 2 == 0 {
... |
22 | | }
23 | | }
| |_________^ method not found in
| `std::pin::Pin<&mut &mut impl std::future::Future>`
|
= note: the method `poll` exists but the following trait bounds
were not satisfied:
`impl std::future::Future: std::marker::Unpin`
which is required by
`&mut impl std::future::Future: std::future::Future`
虽然我们在上一章中介绍了 Future
,但这个错误仍然不是很清楚。如果你在试图对一个引用调用 .await
时遇到这样一个关于 Future
没有被实现的错误,那么这个Future可能需要被 pin
。
阅读更多关于标准库中的Pin。
修改分支
让我们来看看一个稍微复杂的循环。我们有:
- 一个
i32
值的通道。 - 一个在
i32
值上执行的异步操作。
我们要实现的逻辑是:
- 在通道上等待一个偶数。
- 使用偶数作为输入启动异步操作。
- 等待操作,但同时在通道上监听更多的偶数。
- 如果在现有的操作完成之前收到一个新的偶数,则中止现有的操作,用新的偶数重新开始操作。
async fn action(input: Option<i32>) -> Option<String> {
// If the input is `None`, return `None`.
// This could also be written as `let i = input?;`
let i = match input {
Some(input) => input,
None => return None,
};
// async logic here
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let mut done = false;
let operation = action(None);
tokio::pin!(operation);
tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});
loop {
tokio::select! {
res = &mut operation, if !done => {
done = true;
if let Some(v) = res {
println!("GOT = {}", v);
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
done = false;
}
}
}
}
}
我们使用的策略与前面的例子类似。async fn
在循环之外被调用,并被分配给 operation
。operation 变量被 pin 住。循环在 operation 和通道接收器上都进行select。
注意 action 是如何将 Option<i32>
作为参数的。在我们接收第一个偶数之前,我们需要将 operation 实例化为某种东西。我们让 action
接受 Option
并返回Option
。如果传入的是 None
,就会返回 None
。在第一个循环迭代中,operation
立即以 None
完成。
这个例子使用了一些新的语法。第一个分支包括 , if !done
。这是一个分支的前提条件。在解释它是如何工作的之前,让我们看一下如果省略了这个前提条件会发生什么。省略 , if !done
并运行这个例子的结果是如下输出。
thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
这个错误发生在试图使用已经完成的 operation
时。通常情况下,当使用 .await
时,被等待的值会被消耗。在这个例子中,我们对一个引用进行 await。这意味着 operation 在完成后仍然存在。
为了避免这种 panic,我们必须注意在 operation 完成后禁用第一个分支。done 变量用于跟踪 operation 是否完成。一个 select!
分支可能包括一个 precondition。这个前提条件在 select!
分支等待之前被检查。如果该条件被评估为false,那么该分支将被禁用。done变量被初始化为false。当 operation 完成后,done被设置为true。下一个循环迭代将禁用该操作分支。当从通道收到一个偶数信息时,operation 被重置,done被设置为false。
每任务的并发性
tokio::spoon
和 select!
都可以运行并发的异步操作。然而,用于运行并发操作的策略是不同的。tokio::spoon
函数接收一个异步操作并生成一个新的任务来运行它。任务是 Tokio 运行时安排的对象。两个不同的任务是由 Tokio 独立调度的。它们可能同时运行在不同的操作系统线程上。正因为如此,一个催生的任务和一个催生的线程有同样的限制:不能借用。
select!
宏在同一个任务上同时运行所有分支。因为 select!
宏的所有分支都在同一个任务上执行,所以它们永远不会同时运行。select!
宏在一个任务上复用异步操作。
11 - stream
流是一个数值的异步系列。它是 Rust 的 std::iter::Iterator
的异步等价物,由 Stream 特性表示。流可以在 async 函数中被迭代。它们也可以使用适配器进行转换。Tokio在 StreamExt
trait上提供了许多常见的适配器。
Tokio在一个单独的 tokio-stream
crate 中提供流支持:
tokio-stream = "0.1"
目前,Tokio 的 Stream工具存在于
tokio-stream
crate中。一旦 Stream 特性在 Rust 标准库中稳定下来,Tokio 的 Stream 工具将被移到 tokio crate 中。
迭代/Iteration
目前,Rust编程语言不支持异步 for 循环。相反,流的迭代是通过与 StreamExt::next()
搭配的 while let
循环完成的。
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
let mut stream = tokio_stream::iter(&[1, 2, 3]);
while let Some(v) = stream.next().await {
println!("GOT = {:?}", v);
}
}
像迭代器一样,next()
方法返回 Option<T>
,其中T是流的值类型。接收到 None 表示流的迭代已经结束。
Mini-Redis广播
让我们来看看使用 Mini-Redis 客户端的一个稍微复杂的例子。
完整的代码可以在这里找到。
use tokio_stream::StreamExt;
use mini_redis::client;
async fn publish() -> mini_redis::Result<()> {
let mut client = client::connect("127.0.0.1:6379").await?;
// Publish some data
client.publish("numbers", "1".into()).await?;
client.publish("numbers", "two".into()).await?;
client.publish("numbers", "3".into()).await?;
client.publish("numbers", "four".into()).await?;
client.publish("numbers", "five".into()).await?;
client.publish("numbers", "6".into()).await?;
Ok(())
}
async fn subscribe() -> mini_redis::Result<()> {
let client = client::connect("127.0.0.1:6379").await?;
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber.into_stream();
tokio::pin!(messages);
while let Some(msg) = messages.next().await {
println!("got = {:?}", msg);
}
Ok(())
}
#[tokio::main]
async fn main() -> mini_redis::Result<()> {
tokio::spawn(async {
publish().await
});
subscribe().await?;
println!("DONE");
Ok(())
}
任务被派生出来,在 “number” 频道上向 Mini-Redis 服务器发布消息。然后,在主任务中,我们订阅 “number” 频道并显示收到的消息。
在订阅之后,into_stream()
被调用到返回的订阅者上。这将消耗 Subscriber ,返回一个 stream,在消息到达时产生消息。在我们开始迭代消息之前,请注意流是用 tokio::pin
pin 在栈上的。在一个流上调用 next()
需要流被 pin 住。into_stream()
函数返回的是一个没有 pin 的流,我们必须明确地 pin 它,以便对其进行遍历。
当一个 Rust 值在内存中不能再被移动时,它就被 “pin"了。被 pin 的值的一个关键属性是,指针可以被带到被 pin 的数据上,并且调用者可以确信该指针保持有效。这个特性被
async/await
用来支持跨.await
点借用数据。
如果我们忘记 pin 住流,就会出现这样的错误:
error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
--> streams/src/main.rs:29:36
|
29 | while let Some(msg) = messages.next().await {
| ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
|
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
= note: required because it appears within the type `impl Stream`
= note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
如果你遇到这样的错误信息,请尝试 pin 住该值!
在尝试运行这个之前,启动Mini-Redis服务器:
$ mini-redis-server
然后尝试运行该代码。我们将看到输出到STDOUT的信息:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })
一些早期的信息可能会被丢弃,因为订阅和发布之间存在着竞争。该程序永远不会退出。只要服务器处于活动状态,对 Mini-Redis 频道的订阅就会保持活动状态。
让我们看看我们如何用流来扩展这个程序。
适配器
接受一个 stream 并返回另一个 stream 的函数通常被称为 “stream adapter”,因为它们是 “适配器模式"的一种形式。常见的流适配器包括 map
、take
和 filter
。
让我们更新Mini-Redis,使其退出。在收到三条消息后,停止迭代消息。这是用 take
完成的。这个适配器将流限制为最多产生n条消息。
let messages = subscriber
.into_stream()
.take(3);
再次运行该程序,我们得到:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
这一次,程序结束了。
现在,让我们把信息流限制在单个字节。我们将通过检查消息的长度来检查这一点。我们使用 filter
适配器来放弃任何不符合前提条件的消息。
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.map(|msg| msg.unwrap().content)
.take(3);
现在,输出是:
got = b"1"
got = b"3"
got = b"6"
另一个选择是使用 filter_map
将 filter 和 map 步骤合并为一个单一的调用。
还有更多可用的适配器。请看这里的列表。
实现stream
stream 特征与 future 特征非常相似:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
Stream::poll_next()
函数很像 Future::poll
,只是它可以被反复调用,以便从流中接收许多值。就像我们在 Async in depth
中看到的那样,当一个流还没有准备好返回一个值时,就会返回 Poll::Pending
来代替。该任务的waker被注册。一旦流应该被再次poll,该唤醒者将被通知。
size_hint()
方法与迭代器的使用方法相同。
通常,当手动实现一个 stream 时,是通过组合 future 和其他流来完成的。作为一个例子,让我们以我们在 Async 中深入实现的 Delay future 为基础。我们将把它转换为一个stream,以10毫秒的间隔产生三次 ()
。
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct Interval {
rem: usize,
delay: Delay,
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<()>>
{
if self.rem == 0 {
// No more delays
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
}
}
}
async-stream
使用 Stream trait 手动实现流是很繁琐的。不幸的是,Rust编程语言还不支持用于定义流的 async/await 语法。这一点正在酝酿之中,但还没有准备好。
Async-stream
crate可以作为一个临时的解决方案。这个 crate 提供了一个 async_stream!
宏,将输入转化为一个流。使用这个create,上面的 interval 可以这样实现:
use async_stream::stream;
use std::time::{Duration, Instant};
stream! {
let mut when = Instant::now();
for _ in 0..3 {
let delay = Delay { when };
delay.await;
yield ();
when += Duration::from_millis(10);
}
}
12 - 桥接同步代码
在我们到目前为止看到的例子中,我们用 #[tokio::main]
标记了主函数,并使整个项目成为异步的。然而,这对所有项目来说都是不可取的。例如,一个GUI应用程序可能希望在主线程上运行GUI代码,并在另一个线程上运行Tokio运行时。
本页解释了如何将异步/等待隔离到项目的一小部分。
#[tokio::main]
扩展到什么?
#[tokio::main]
宏是一个将你的主函数替换为非同步主函数的宏,它启动一个运行时,然后调用你的代码。例如,这样的main函数
#[tokio::main]
async fn main() {
println!("Hello world");
}
通过这个宏变成了这样:
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
为了在我们自己的项目中使用 async/await
,我们可以做一些类似的事情,我们利用. block_on
方法在适当的时候进入异步环境。
到mini-redis的同步接口
在本节中,我们将讨论如何通过存储 Runtime 对象并使用其 block_on 方法来构建 mini-redis 的同步接口。在接下来的章节中,我们将讨论一些替代的方法,以及何时应该使用每种方法。
我们要封装的接口是异步的 client 类型。它有几个方法,我们将实现以下方法的阻塞版本:
为此,我们引入了一个名为 src/blocking_client.rs
的新文件,并用一个围绕 async Client 类型的封装结构来初始化它:
use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;
pub use crate::client::Message;
/// Established connection with a Redis server.
pub struct BlockingClient {
/// The asynchronous `Client`.
inner: crate::client::Client,
/// A `current_thread` runtime for executing operations on the
/// asynchronous client in a blocking manner.
rt: Runtime,
}
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
// Call the asynchronous connect method using the runtime.
let inner = rt.block_on(crate::client::connect(addr))?;
Ok(BlockingClient { inner, rt })
}
在这里,我们把构造函数作为第一个例子,说明如何在非同步上下文中执行异步方法。我们使用Tokio Runtime类型上的block_on方法来做到这一点,它执行一个异步方法并返回其结果。
一个重要的细节是对 current_thread 运行时的使用。通常在使用Tokio时,你会使用默认的 multi_thread 运行时,它将产生一堆后台线程,这样它就可以有效地同时运行许多东西。对于我们的用例,我们每次只做一件事,所以我们不会因为运行多个线程而获得任何好处。这使得 current_thread 运行时成为完美的选择,因为它不会产生任何线程。
enable_all
调用启用了 Tokio 运行时的IO和定时器驱动。如果它们没有被启用,运行时就无法执行IO或定时器。
因为 current_thread 运行时不产生线程,所以它只在 block_on 被调用时运行。一旦 block_on 返回,所有在该运行时上生成的任务将冻结,直到你再次调用 block_on。如果催生的任务在不调用 block_on 时必须继续运行,请使用 multi_threaded runtime。
一旦我们有了这个结构体,大部分的方法就很容易实现:
use bytes::Bytes;
use std::time::Duration;
impl BlockingClient {
pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
self.rt.block_on(self.inner.get(key))
}
pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
self.rt.block_on(self.inner.set(key, value))
}
pub fn set_expires(
&mut self,
key: &str,
value: Bytes,
expiration: Duration,
) -> crate::Result<()> {
self.rt.block_on(self.inner.set_expires(key, value, expiration))
}
pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
self.rt.block_on(self.inner.publish(channel, message))
}
}
Client::subscribe方法更有趣,因为它将 Client 转化为 Subscriber 对象。我们可以用以下方式实现它:
/// A client that has entered pub/sub mode.
///
/// Once clients subscribe to a channel, they may only perform
/// pub/sub related commands. The `BlockingClient` type is
/// transitioned to a `BlockingSubscriber` type in order to
/// prevent non-pub/sub methods from being called.
pub struct BlockingSubscriber {
/// The asynchronous `Subscriber`.
inner: crate::client::Subscriber,
/// A `current_thread` runtime for executing operations on the
/// asynchronous client in a blocking manner.
rt: Runtime,
}
impl BlockingClient {
pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
Ok(BlockingSubscriber {
inner: subscriber,
rt: self.rt,
})
}
}
impl BlockingSubscriber {
pub fn get_subscribed(&self) -> &[String] {
self.inner.get_subscribed()
}
pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
self.rt.block_on(self.inner.next_message())
}
pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.subscribe(channels))
}
pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.unsubscribe(channels))
}
}
因此,subscribe 方法将首先使用运行时将异步 Client 转化为异步 Subscriber。然后,它将把产生的 Subscriber 与运行时一起存储,并使用 block_on 实现各种方法。
请注意,异步 Subscriber 结构体有一个名为 get_subscribed 的非异步方法。为了处理这个问题,我们只需直接调用它而不涉及运行时。
其他方法
上节解释了实现同步包装器的最简单的方法,但这并不是唯一的方法。这些方法是:
- 创建一个 Runtime 并在异步代码上调用 block_on。
- 创建一个 Runtime 并在其上生成事物。
- 在一个单独的线程中运行 Runtime 并向其发送消息。
我们已经看到了第一种方法。其他两种方法概述如下。
在运行时上生成东西
运行时对象有一个叫做 spawn 的方法。当你调用这个方法时,你会创建一个新的在运行时上运行的后台任务。比如说
use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};
fn main() {
let runtime = Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let mut handles = Vec::with_capacity(10);
for i in 0..10 {
handles.push(runtime.spawn(my_bg_task(i)));
}
// Do something time-consuming while the background tasks execute.
std::thread::sleep(Duration::from_millis(750));
println!("Finished time-consuming task.");
// Wait for all of them to complete.
for handle in handles {
// The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
// a future, so we can wait for it using `block_on`.
runtime.block_on(handle).unwrap();
}
}
async fn my_bg_task(i: u64) {
// By subtracting, the tasks with larger values of i sleep for a
// shorter duration.
let millis = 1000 - 50 * i;
println!("Task {} sleeping for {} ms.", i, millis);
sleep(Duration::from_millis(millis)).await;
println!("Task {} stopping.", i);
}
在上面的例子中,我们在运行时催生了 10 个后台任务,然后等待所有的任务。作为一个例子,这可能是在图形应用程序中实现后台网络请求的一个好方法,因为网络请求在GUI主线程上运行太耗时了。相反,你在后台运行的 Tokio 运行时上生成请求,并让任务在请求完成后将信息送回GUI代码,如果你想要一个进度条,甚至可以递增。
在这个例子中,重要的是,运行时被配置为多线程运行时。如果你把它改为 current_thread 运行时,你会发现耗时的任务会在任何后台任务开始之前完成。这是因为在 current_thread 运行时上生成的后台任务只在调用 block_on 时执行,否则运行时就没有地方可以运行它们。
这个例子通过对调用 spawn 返回的 JoinHandle 调用 block_on 来等待生成的任务完成,但这并不是唯一的方法。这里有一些替代方法。
- 使用消息传递通道,如
tokio::sync::mpsc
。 - 修改由 Mutex 等保护的共享值。这对GUI中的进度条来说是一个很好的方法,GUI在每一帧都会读取共享值。
spawn 方法在 Handle 类型上也是可用的。Handle 类型可以被克隆,以便在一个运行时中获得许多句柄,每个 Handle 都可以用来在运行时中生成新的任务。
发送消息
第三种技术是催生一个运行时,并使用消息传递来与之通信。这比其他两种方法涉及更多的模板,但它是最灵活的方法。你可以在下面找到一个基本的例子。
use tokio::runtime::Builder;
use tokio::sync::mpsc;
pub struct Task {
name: String,
// info that describes the task
}
async fn handle_task(task: Task) {
println!("Got task {}", task.name);
}
#[derive(Clone)]
pub struct TaskSpawner {
spawn: mpsc::Sender<Task>,
}
impl TaskSpawner {
pub fn new() -> TaskSpawner {
// Set up a channel for communicating.
let (send, mut recv) = mpsc::channel(16);
// Build the runtime for the new thread.
//
// The runtime is created before spawning the thread
// to more cleanly forward errors if the `unwrap()`
// panics.
let rt = Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
std::thread::spawn(move || {
rt.block_on(async move {
while let Some(task) = recv.recv().await {
tokio::spawn(handle_task(task));
}
// Once all senders have gone out of scope,
// the `.recv()` call returns None and it will
// exit from the while loop and shut down the
// thread.
});
});
TaskSpawner {
spawn: send,
}
}
pub fn spawn_task(&self, task: Task) {
match self.spawn.blocking_send(task) {
Ok(()) => {},
Err(_) => panic!("The shared runtime has shut down."),
}
}
}
这个例子可以用很多方式来配置。例如,你可以使用一个 Semaphore 来限制活动任务的数量,或者你可以使用一个相反方向的通道来向 spawner 发送响应。当你以这种方式催生一个运行时,它是一种 actor。