spawning
内容出处:https://tokio.rs/tokio/tutorial/spawning
我们将换个角度,开始在Redis服务器上工作。
首先,把上一节中的客户端SET/GET代码移到一个示例文件中。这样,我们就可以针对我们的服务器运行它:
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
然后创建一个新的、空的 src/main.rs
并继续。
接受套接字
我们的Redis服务器需要做的第一件事是接受入站的TCP套接字。这是用 tokio::net::TcpListener
完成的。
Tokio的许多类型与Rust标准库中的同步类型命名相同。在合理的情况下,Tokio暴露了与std相同的API,但使用了async fn。
TcpListener被绑定到6379端口,然后在一个循环中接受socket。每个套接字都被处理,然后关闭。现在,我们将读取命令,将其打印到stdout,并回应一个错误。
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// The `Connection` lets us read/write redis **frames** instead of
// byte streams. The `Connection` type is defined by mini-redis.
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
现在,运行这个 accept 循环:
$ cargo run
在一个单独的终端窗口,运行 hello-redis 的例子(上一节的SET/GET命令):
$ cargo run --example hello-redis
输出应该是:
Error: "unimplemented"
在服务器终端,输出是:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
并发
我们的服务器有一个小问题(除了只回应错误)。它一次处理一个入站请求。当一个连接被接受时,服务器停留在接受循环块内,直到响应被完全写入套接字。
我们希望我们的Redis服务器能够处理许多并发的请求。要做到这一点,我们需要增加一些并发性。
并发和并行是不一样的。如果你在两个任务之间交替进行,那么你就是在同时进行两个任务,但不是并行的。要想获得并行的资格,你需要两个人,一个人专门负责每个任务。
使用Tokio的一个优点是,异步代码允许你在许多任务上并发工作,而不必使用普通线程并行工作。事实上,Tokio可以在一个单线程上并发运行许多任务
为了并发地处理连接,为每个入站连接生成一个新的任务。连接在这个任务中被处理。
接受循环变成:
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
tokio::spawn(async move {
process(socket).await;
});
}
}
任务
Tokio任务是一个异步的绿色线程。它们是通过传递一个异步块给 tokio::spawn
来创建的。tokio::spawn
函数返回 JoinHandle
,调用者可以用它来与生成的任务进行交互。该异步块可以有一个返回值。调用者可以使用 JoinHandle
上的 .await
获取返回值。
比如说:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Do some async work
"return value"
});
// Do some other work
let out = handle.await.unwrap();
println!("GOT {}", out);
}
对JoinHandle的等待会返回一个 Result
。当任务在执行过程中遇到错误时,JoinHandle将返回 Err
。这发生在任务恐慌的时候,或者任务被运行时关闭而强行取消的时候。
任务是由调度器管理的执行单位。生成的任务提交给 Tokio 调度器,然后确保该任务在有工作要做时执行。生成的任务可以在它被生成的同一线程上执行,也可以在不同的运行时线程上执行。任务在被催生后也可以在线程之间移动。
Tokio中的任务是非常轻便的。在引擎盖下,它们只需要一次分配和64字节的内存。应用程序可以自由地生成数千,甚至数百万个任务。
'static
bound
当你在Tokio运行时中催生一个任务时,它的类型必须是 'static
的。这意味着生成的任务不能包含对任务之外拥有的数据的任何引用。
一个常见的误解是,
'static
总是意味着 “永远活着”,但事实并非如此。仅仅因为一个值是'static
,并不意味着你有内存泄漏。你可以在《常见的Rust寿命误解》中阅读更多内容。
例如,以下内容将无法编译:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
试图编译时,出现了以下错误:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
发生这种情况是因为,默认情况下,变量不会被 move 到异步块中。v向量仍然归主函数所有。rust编译器很好地解释了这一点,甚至还提出了解决方法 将第7行改为 task::spawn(async move {
将指示编译器将v move 到被催生的任务中。这样,该任务拥有其所有的数据,使其成为 ‘static。
如果数据必须被多个任务同时访问,那么它必须使用同步原语(如Arc)进行共享。
请注意,错误信息提到了参数类型超过了 'static
生命周期。这个术语可能相当令人困惑,因为 'static
生命周期一直持续到程序结束,所以如果它超过了这个生命周期,你不就有内存泄漏了吗?解释是,必须超过 'static
生命周期的是类型,而不是值,而且值可能在其类型不再有效之前被销毁。
当我们说一个值是 'static
的时候,所有的意思是,把这个值永远留在身边是不对的。这一点很重要,因为编译器无法推理出一个新产生的任务会停留多长时间,所以它能确保任务不会活得太久的唯一方法就是确保它可能永远存在。
上面信息框中的链接使用了 “bounded by 'static
” 的术语,而不是 “its type outlives 'static
” 或 “the value is 'static
” for T: 'static
。这些都是同一个意思,与 &'static T
中的注解 'static
是不同的。
Send
bound
由 tokio::spawn
产生的任务必须实现 Send
。这允许Tokio运行时在线程之间 move 任务,而这些任务在一个 .await
中被暂停。
当所有跨 .await
调用的数据都是Send时,任务就是Send。这有点微妙。当 .await
被调用时,任务就回到了调度器中。下一次任务被执行时,它将从最后的让出点恢复。为了使其正常工作,所有在 .await
之后使用的状态都必须由任务保存。如果这个状态是 Send
,即可以跨线程移动,那么任务本身也可以跨线程移动。反过来说,如果这个状态不是 Send
,那么任务也不是。
例如,这样就可以了:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
这并不是:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;
println!("{}", rc);
});
}
试图编译该片段的结果是:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
我们将在下一章更深入地讨论这个错误的一个特例。
存储数值
我们现在将实现处理传入命令的 process
函数。我们将使用 HashMap 来存储值。SET命令将插入到HashMap中,GET 值将加载它们。此外,我们将使用一个循环来接受每个连接的一个以上的命令。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// A hashmap is used to store data
let mut db = HashMap::new();
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
// Use `read_frame` to receive a command from the connection.
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// The value is stored as `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` expects data to be of type `Bytes`. This
// type will be covered later in the tutorial. For now,
// `&Vec<u8>` is converted to `Bytes` using `into()`.
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
现在,启动服务器:
$ cargo run
并在一个单独的终端窗口中,运行hello-redis的例子:
$ cargo run --example hello-redis
现在,输出将是:
got value from the server; result=Some(b"world")
我们现在可以获取和设置值了,但有一个问题:这些值在连接之间是不共享的。如果另一个套接字连接并试图获取Hello键,它将不会发现任何东西。
你可以在这里找到完整的代码。
在下一节中,我们将实现对所有套接字的数据进行持久化。