select
到目前为止,当我们想给系统添加并发性时,我们会生成一个新的任务。现在我们将介绍一些额外的方法,用Tokio并发执行异步代码。
tokio::select!
tokio::select!
宏允许在多个异步计算中等待,并在单个计算完成后返回。
比如说:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
let _ = tx1.send("one");
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
使用了两个 oneshot 通道。任何一个通道都可以先完成。select!
语句在两个通道上等待,并将 val 与任务返回的值绑定。当 tx1 或 tx2 完成时,相关的块被执行。
没有完成的分支被放弃。在这个例子中,计算正在等待每个通道的 oneshot::Receiver
。尚未完成的通道的 oneshot::Receiver
被放弃。
取消
在异步Rust中,取消操作是通过丢弃一个 future 来实现的。回顾 “Async in depth”,异步Rust操作是使用 futures 实现的,而 futures 是 lazy 的。只有当期货被 poll 时,操作才会继续进行。如果future被丢弃,操作就不能进行,因为所有相关的状态都被丢弃了。
也就是说,有时一个异步操作会催生后台任务或启动其他在后台运行的操作。例如,在上面的例子中,一个任务被催生出来,以发送一个消息回来。通常情况下,该任务会进行一些计算来生成数值。
Futures或其他类型可以实现 Drop
来清理后台资源。Tokio 的 oneshot::Receiver
通过向 Sender
half 发送一个关闭的通知来实现 Drop
。sender 部分可以收到这个通知,并通过丢弃来中止正在进行的操作。
use tokio::sync::oneshot;
async fn some_operation() -> String {
// Compute value here
}
#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
// Select on the operation and the oneshot's
// `closed()` notification.
tokio::select! {
val = some_operation() => {
let _ = tx1.send(val);
}
_ = tx1.closed() => {
// `some_operation()` is canceled, the
// task completes and `tx1` is dropped.
}
}
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
Future 实现
为了帮助更好地理解 select!
的工作原理,让我们看看一个假想的Future实现是什么样子的。这是一个简化版本。在实践中,select!
包括额外的功能,如随机选择要先 poll 的分支。
use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct MySelect {
rx1: oneshot::Receiver<&'static str>,
rx2: oneshot::Receiver<&'static str>,
}
impl Future for MySelect {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
println!("rx1 completed first with {:?}", val);
return Poll::Ready(());
}
if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
println!("rx2 completed first with {:?}", val);
return Poll::Ready(());
}
Poll::Pending
}
}
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
// use tx1 and tx2
MySelect {
rx1,
rx2,
}.await;
}
MySelect future 包含每个分支的future。当MySelect被 poll 时,第一个分支被 poll。如果它准备好了,该值被使用,MySelect完成。在 .await
收到一个future的输出后,该future被放弃。这导致两个分支的futures都被丢弃。由于有一个分支没有完成,所以该操作实际上被取消了。
请记住上一节的内容:
当一个 future 返回
Poll::Pending
时,它必须确保在未来的某个时间点上对 waker 发出信号。如果忘记这样做,任务就会被无限期地挂起。
在 MySelect
的实现中,没有明确使用 Context
参数。相应的是,waker的要求是通过传递 cx 给内部 future 来满足的。由于内部 future 也必须满足waker的要求,通过只在收到内部 future 的 Poll::Pending
时返回 Poll::Pending
,MySelect
也满足 waker 的要求。
语法
选择 select!
宏可以处理两个以上的分支。目前的限制是64个分支。每个分支的结构为:
<pattern> = <async expression> => <handler>,
当 select 宏被评估时,所有的 <async expression>
被聚集起来并同时执行。当一个表达式完成时,其结果与 <pattern>
匹配。如果结果与模式匹配,那么所有剩余的异步表达式被放弃,<handler>
被执行。<handler>
表达式可以访问由 <pattern>
建立的任何绑定关系。
基本情况是 <pattern>
是一个变量名,异步表达式的结果被绑定到该变量名,<handler>
可以访问该变量。这就是为什么在最初的例子中,val
被用于<pattern>
,而 <handler>
能够访问 val
。
如果 <pattern>
与异步计算的结果不匹配,那么剩下的异步表达式继续并发地执行,直到下一个表达式完成。这时,同样的逻辑被应用于该结果。
因为 select!
可以接受任何异步表达式,所以可以定义更复杂的计算来进行选择。
在这里,我们在一个 oneshot 通道和一个TCP连接的输出上进行选择。
use tokio::net::TcpStream;
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
// Spawn a task that sends a message over the oneshot
tokio::spawn(async move {
tx.send("done").unwrap();
});
tokio::select! {
socket = TcpStream::connect("localhost:3465") => {
println!("Socket connected {:?}", socket);
}
msg = rx => {
println!("received message first {:?}", msg);
}
}
}
在这里,我们选择了一个 onehot 并接受来自 TcpListener 的套接字。
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send(()).unwrap();
});
let mut listener = TcpListener::bind("localhost:3465").await?;
tokio::select! {
_ = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
accept 循环一直运行到遇到错误或 rx
收到一个值。_模式表示我们对异步计算的返回值不感兴趣。
返回值
tokio::select!
宏返回被评估的 <handler>
表达式的结果。
async fn computation1() -> String {
// .. computation
}
async fn computation2() -> String {
// .. computation
}
#[tokio::main]
async fn main() {
let out = tokio::select! {
res1 = computation1() => res1,
res2 = computation2() => res2,
};
println!("Got = {}", out);
}
正因为如此,要求每个分支的 <handler>
表达式求值为同一类型。如果不需要 select!
表达式的输出,让表达式求值为()是很好的做法。
错误
使用?
操作符会从表达式中传播错误。这取决于是在异步表达式中还是在处理程序中使用?
在一个异步表达式中使用?
操作符会将错误从异步表达式中传播出去。这使得异步表达式的输出成为一个结果。在 handler 中使用?
会立即将错误从 select!
表达式中传播出去。让我们再看一下接受循环的例子。
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
// [setup `rx` oneshot channel]
let listener = TcpListener::bind("localhost:3465").await?;
tokio::select! {
res = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {
res?;
}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
注意 listener.accept().await?
操作符将错误从该表达式中传播出来,并传播到 res
绑定中。在发生错误时, res 将被设置为 Err(_)
。然后,在处理程序中,再次使用?
操作符。res?
语句将把一个错误从主函数中传播出去。
模式匹配
回顾一下,select!
宏分支语法被定义为:
<pattern> = <async expression> => <handler>,
到目前为止,我们只使用了 <pattern>
的变量绑定。然而,任何Rust模式都可以被使用。例如,假设我们从多个MPSC通道接收信息,我们可以这样做。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut tx1, mut rx1) = mpsc::channel(128);
let (mut tx2, mut rx2) = mpsc::channel(128);
tokio::spawn(async move {
// Do something w/ `tx1` and `tx2`
});
tokio::select! {
Some(v) = rx1.recv() => {
println!("Got {:?} from rx1", v);
}
Some(v) = rx2.recv() => {
println!("Got {:?} from rx2", v);
}
else => {
println!("Both channels closed");
}
}
}
借用
当催生任务时,被催生的异步表达式必须拥有其所有的数据。select!
宏没有这个限制。每个分支的异步表达式都可以借用数据并同时操作。按照Rust的借用规则,多个异步表达式可以不变地借用一个数据,或者一个异步表达式可以可变地借用一个数据。
我们来看看一些例子。在这里,我们同时向两个不同的TCP目的地发送相同的数据。
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;
async fn race(
data: &[u8],
addr1: SocketAddr,
addr2: SocketAddr
) -> io::Result<()> {
tokio::select! {
Ok(_) = async {
let mut socket = TcpStream::connect(addr1).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
Ok(_) = async {
let mut socket = TcpStream::connect(addr2).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
else => {}
};
Ok(())
}
data 变量被从两个异步表达式中不可变地借用。当其中一个操作成功完成时,另一个就会被放弃。因为我们在 Ok(_)
上进行模式匹配,如果一个表达式失败,另一个表达式继续执行。
当涉及到每个分支的 <handler>
时,select!
保证只运行一个 <handler>
。正因为如此,每个<handler>
都可以相互借用相同的数据。
例如,这在两个处理程序中都修改了out:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut out = String::new();
tokio::spawn(async move {
// Send values on `tx1` and `tx2`.
});
tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}
println!("{}", out);
}
循环
select!
宏经常在循环中使用。本节将通过一些例子来说明在循环中使用 select!
宏的常见方法。我们首先在多个通道上进行选择。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel(128);
let (tx2, mut rx2) = mpsc::channel(128);
let (tx3, mut rx3) = mpsc::channel(128);
loop {
let msg = tokio::select! {
Some(msg) = rx1.recv() => msg,
Some(msg) = rx2.recv() => msg,
Some(msg) = rx3.recv() => msg,
else => { break }
};
println!("Got {}", msg);
}
println!("All channels have been closed.");
}
这个例子在三个通道的接收器上进行 select。当在任何通道上收到消息时,它被写入STDOUT。当一个通道被关闭时,recv()
以None返回。通过使用模式匹配,select!
宏继续在其余通道上等待。当所有的通道都关闭时,else分支被评估,循环被终止。
select!
宏随机挑选分支,首先检查是否准备就绪。当多个通道有等待值时,将随机挑选一个通道来接收。这是为了处理这样的情况:接收循环处理消息的速度比推入通道的速度慢,也就是说,通道开始被填满。如果 select!
不随机挑选一个分支先检查,在循环的每个迭代中,rx1将被首先检查。如果rx1总是包含一个新的消息,其余的通道将永远不会被检查。
如果当select!被评估时,多个通道有待处理的消息,只有一个通道有一个值被弹出。所有其他的通道保持不动,它们的消息保持在这些通道中,直到下一个循环迭代。没有消息丢失。
恢复异步操作
现在我们将展示如何在多次调用 select!
时运行一个异步操作。在这个例子中,我们有一个MPSC通道,类型为i32
,还有一个异步函数。我们想运行异步函数,直到它完成或在通道上收到一个偶数整数。
async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let operation = action();
tokio::pin!(operation);
loop {
tokio::select! {
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}
请注意,不是在 select!
宏中调用 action()
,而是在循环之外调用它。action()
的返回被分配给 operation,而不调用 .await
。然后我们在 operation 上调用 tokio::pin!
在 select!
循环中,我们没有传入 operation
,而是传入 &mut operation
。operation
变量正在跟踪飞行中的异步操作。循环的每个迭代都使用相同的 operation,而不是对 action()
发出一个新的调用。
另一个 select!
分支从通道中接收消息。如果该消息是偶数,我们就完成了循环。否则,再次启动 select!
。
这是我们第一次使用 tokio::pin!
我们现在还不打算讨论 pining 的细节。需要注意的是,为了 .await
一个引用,被引用的值必须被 pin 或者实现 Unpin
。
如果我们删除 tokio::pin!
这一行,并尝试编译,我们会得到以下错误:
error[E0599]: no method named `poll` found for struct
`std::pin::Pin<&mut &mut impl std::future::Future>`
in the current scope
--> src/main.rs:16:9
|
16 | / tokio::select! {
17 | | _ = &mut operation => break,
18 | | Some(v) = rx.recv() => {
19 | | if v % 2 == 0 {
... |
22 | | }
23 | | }
| |_________^ method not found in
| `std::pin::Pin<&mut &mut impl std::future::Future>`
|
= note: the method `poll` exists but the following trait bounds
were not satisfied:
`impl std::future::Future: std::marker::Unpin`
which is required by
`&mut impl std::future::Future: std::future::Future`
虽然我们在上一章中介绍了 Future
,但这个错误仍然不是很清楚。如果你在试图对一个引用调用 .await
时遇到这样一个关于 Future
没有被实现的错误,那么这个Future可能需要被 pin
。
阅读更多关于标准库中的Pin。
修改分支
让我们来看看一个稍微复杂的循环。我们有:
- 一个
i32
值的通道。 - 一个在
i32
值上执行的异步操作。
我们要实现的逻辑是:
- 在通道上等待一个偶数。
- 使用偶数作为输入启动异步操作。
- 等待操作,但同时在通道上监听更多的偶数。
- 如果在现有的操作完成之前收到一个新的偶数,则中止现有的操作,用新的偶数重新开始操作。
async fn action(input: Option<i32>) -> Option<String> {
// If the input is `None`, return `None`.
// This could also be written as `let i = input?;`
let i = match input {
Some(input) => input,
None => return None,
};
// async logic here
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let mut done = false;
let operation = action(None);
tokio::pin!(operation);
tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});
loop {
tokio::select! {
res = &mut operation, if !done => {
done = true;
if let Some(v) = res {
println!("GOT = {}", v);
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
done = false;
}
}
}
}
}
我们使用的策略与前面的例子类似。async fn
在循环之外被调用,并被分配给 operation
。operation 变量被 pin 住。循环在 operation 和通道接收器上都进行select。
注意 action 是如何将 Option<i32>
作为参数的。在我们接收第一个偶数之前,我们需要将 operation 实例化为某种东西。我们让 action
接受 Option
并返回Option
。如果传入的是 None
,就会返回 None
。在第一个循环迭代中,operation
立即以 None
完成。
这个例子使用了一些新的语法。第一个分支包括 , if !done
。这是一个分支的前提条件。在解释它是如何工作的之前,让我们看一下如果省略了这个前提条件会发生什么。省略 , if !done
并运行这个例子的结果是如下输出。
thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
这个错误发生在试图使用已经完成的 operation
时。通常情况下,当使用 .await
时,被等待的值会被消耗。在这个例子中,我们对一个引用进行 await。这意味着 operation 在完成后仍然存在。
为了避免这种 panic,我们必须注意在 operation 完成后禁用第一个分支。done 变量用于跟踪 operation 是否完成。一个 select!
分支可能包括一个 precondition。这个前提条件在 select!
分支等待之前被检查。如果该条件被评估为false,那么该分支将被禁用。done变量被初始化为false。当 operation 完成后,done被设置为true。下一个循环迭代将禁用该操作分支。当从通道收到一个偶数信息时,operation 被重置,done被设置为false。
每任务的并发性
tokio::spoon
和 select!
都可以运行并发的异步操作。然而,用于运行并发操作的策略是不同的。tokio::spoon
函数接收一个异步操作并生成一个新的任务来运行它。任务是 Tokio 运行时安排的对象。两个不同的任务是由 Tokio 独立调度的。它们可能同时运行在不同的操作系统线程上。正因为如此,一个催生的任务和一个催生的线程有同样的限制:不能借用。
select!
宏在同一个任务上同时运行所有分支。因为 select!
宏的所有分支都在同一个任务上执行,所以它们永远不会同时运行。select!
宏在一个任务上复用异步操作。