select

Tokio教程之select

https://tokio.rs/tokio/tutorial/select

到目前为止,当我们想给系统添加并发性时,我们会生成一个新的任务。现在我们将介绍一些额外的方法,用Tokio并发执行异步代码。

tokio::select!

tokio::select! 宏允许在多个异步计算中等待,并在单个计算完成后返回。

比如说:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

使用了两个 oneshot 通道。任何一个通道都可以先完成。select! 语句在两个通道上等待,并将 val 与任务返回的值绑定。当 tx1 或 tx2 完成时,相关的块被执行。

没有完成的分支被放弃。在这个例子中,计算正在等待每个通道的 oneshot::Receiver。尚未完成的通道的 oneshot::Receiver 被放弃。

取消

在异步Rust中,取消操作是通过丢弃一个 future 来实现的。回顾 “Async in depth”,异步Rust操作是使用 futures 实现的,而 futures 是 lazy 的。只有当期货被 poll 时,操作才会继续进行。如果future被丢弃,操作就不能进行,因为所有相关的状态都被丢弃了。

也就是说,有时一个异步操作会催生后台任务或启动其他在后台运行的操作。例如,在上面的例子中,一个任务被催生出来,以发送一个消息回来。通常情况下,该任务会进行一些计算来生成数值。

Futures或其他类型可以实现 Drop 来清理后台资源。Tokio 的 oneshot::Receiver 通过向 Sender half 发送一个关闭的通知来实现 Drop。sender 部分可以收到这个通知,并通过丢弃来中止正在进行的操作。

use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Compute value here
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Select on the operation and the oneshot's
        // `closed()` notification.
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                // `some_operation()` is canceled, the
                // task completes and `tx1` is dropped.
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Future 实现

为了帮助更好地理解 select! 的工作原理,让我们看看一个假想的Future实现是什么样子的。这是一个简化版本。在实践中,select! 包括额外的功能,如随机选择要先 poll 的分支。

use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // use tx1 and tx2

    MySelect {
        rx1,
        rx2,
    }.await;
}

MySelect future 包含每个分支的future。当MySelect被 poll 时,第一个分支被 poll。如果它准备好了,该值被使用,MySelect完成。在 .await 收到一个future的输出后,该future被放弃。这导致两个分支的futures都被丢弃。由于有一个分支没有完成,所以该操作实际上被取消了。

请记住上一节的内容:

当一个 future 返回 Poll::Pending 时,它必须确保在未来的某个时间点上对 waker 发出信号。如果忘记这样做,任务就会被无限期地挂起。

MySelect 的实现中,没有明确使用 Context 参数。相应的是,waker的要求是通过传递 cx 给内部 future 来满足的。由于内部 future 也必须满足waker的要求,通过只在收到内部 future 的 Poll::Pending 时返回 Poll::PendingMySelect 也满足 waker 的要求。

语法

选择 select! 宏可以处理两个以上的分支。目前的限制是64个分支。每个分支的结构为:

<pattern> = <async expression> => <handler>,

当 select 宏被评估时,所有的 <async expression> 被聚集起来并同时执行。当一个表达式完成时,其结果与 <pattern> 匹配。如果结果与模式匹配,那么所有剩余的异步表达式被放弃,<handler> 被执行。<handler> 表达式可以访问由 <pattern> 建立的任何绑定关系。

基本情况是 <pattern> 是一个变量名,异步表达式的结果被绑定到该变量名,<handler> 可以访问该变量。这就是为什么在最初的例子中,val 被用于<pattern>,而 <handler> 能够访问 val

如果 <pattern> 与异步计算的结果不匹配,那么剩下的异步表达式继续并发地执行,直到下一个表达式完成。这时,同样的逻辑被应用于该结果。

因为 select! 可以接受任何异步表达式,所以可以定义更复杂的计算来进行选择。

在这里,我们在一个 oneshot 通道和一个TCP连接的输出上进行选择。

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Spawn a task that sends a message over the oneshot
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("Socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message first {:?}", msg);
        }
    }
}

在这里,我们选择了一个 onehot 并接受来自 TcpListener 的套接字。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let mut listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        _ = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {}
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

accept 循环一直运行到遇到错误或 rx 收到一个值。_模式表示我们对异步计算的返回值不感兴趣。

返回值

tokio::select! 宏返回被评估的 <handler> 表达式的结果。

async fn computation1() -> String {
    // .. computation
}

async fn computation2() -> String {
    // .. computation
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}

正因为如此,要求每个分支的 <handler> 表达式求值为同一类型。如果不需要 select! 表达式的输出,让表达式求值为()是很好的做法。

错误

使用?操作符会从表达式中传播错误。这取决于是在异步表达式中还是在处理程序中使用? 在一个异步表达式中使用?操作符会将错误从异步表达式中传播出去。这使得异步表达式的输出成为一个结果。在 handler 中使用?会立即将错误从 select!表达式中传播出去。让我们再看一下接受循环的例子。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // [setup `rx` oneshot channel]

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        res = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {
            res?;
        }
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

注意 listener.accept().await? 操作符将错误从该表达式中传播出来,并传播到 res 绑定中。在发生错误时, res 将被设置为 Err(_)。然后,在处理程序中,再次使用?操作符。res? 语句将把一个错误从主函数中传播出去。

模式匹配

回顾一下,select! 宏分支语法被定义为:

<pattern> = <async expression> => <handler>,

到目前为止,我们只使用了 <pattern> 的变量绑定。然而,任何Rust模式都可以被使用。例如,假设我们从多个MPSC通道接收信息,我们可以这样做。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // Do something w/ `tx1` and `tx2`
    });

    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

借用

当催生任务时,被催生的异步表达式必须拥有其所有的数据。select! 宏没有这个限制。每个分支的异步表达式都可以借用数据并同时操作。按照Rust的借用规则,多个异步表达式可以不变地借用一个数据,或者一个异步表达式可以可变地借用一个数据。

我们来看看一些例子。在这里,我们同时向两个不同的TCP目的地发送相同的数据。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        else => {}
    };

    Ok(())
}

data 变量被从两个异步表达式中不可变地借用。当其中一个操作成功完成时,另一个就会被放弃。因为我们在 Ok(_) 上进行模式匹配,如果一个表达式失败,另一个表达式继续执行。

当涉及到每个分支的 <handler> 时,select! 保证只运行一个 <handler>。正因为如此,每个<handler>都可以相互借用相同的数据。

例如,这在两个处理程序中都修改了out:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Send values on `tx1` and `tx2`.
    });

    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

循环

select! 宏经常在循环中使用。本节将通过一些例子来说明在循环中使用 select! 宏的常见方法。我们首先在多个通道上进行选择。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    loop {
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {}", msg);
    }

    println!("All channels have been closed.");
}

这个例子在三个通道的接收器上进行 select。当在任何通道上收到消息时,它被写入STDOUT。当一个通道被关闭时,recv() 以None返回。通过使用模式匹配,select! 宏继续在其余通道上等待。当所有的通道都关闭时,else分支被评估,循环被终止。

select! 宏随机挑选分支,首先检查是否准备就绪。当多个通道有等待值时,将随机挑选一个通道来接收。这是为了处理这样的情况:接收循环处理消息的速度比推入通道的速度慢,也就是说,通道开始被填满。如果 select! 不随机挑选一个分支先检查,在循环的每个迭代中,rx1将被首先检查。如果rx1总是包含一个新的消息,其余的通道将永远不会被检查。

如果当select!被评估时,多个通道有待处理的消息,只有一个通道有一个值被弹出。所有其他的通道保持不动,它们的消息保持在这些通道中,直到下一个循环迭代。没有消息丢失。

恢复异步操作

现在我们将展示如何在多次调用 select! 时运行一个异步操作。在这个例子中,我们有一个MPSC通道,类型为i32,还有一个异步函数。我们想运行异步函数,直到它完成或在通道上收到一个偶数整数。

async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    
    
    let operation = action();
    tokio::pin!(operation);
    
    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

请注意,不是在 select! 宏中调用 action() ,而是在循环之外调用它。action() 的返回被分配给 operation,而不调用 .await。然后我们在 operation 上调用 tokio::pin!

select! 循环中,我们没有传入 operation,而是传入 &mut operationoperation 变量正在跟踪飞行中的异步操作。循环的每个迭代都使用相同的 operation,而不是对 action() 发出一个新的调用。

另一个 select! 分支从通道中接收消息。如果该消息是偶数,我们就完成了循环。否则,再次启动 select!

这是我们第一次使用 tokio::pin! 我们现在还不打算讨论 pining 的细节。需要注意的是,为了 .await 一个引用,被引用的值必须被 pin 或者实现 Unpin

如果我们删除 tokio::pin! 这一行,并尝试编译,我们会得到以下错误:

error[E0599]: no method named `poll` found for struct
     `std::pin::Pin<&mut &mut impl std::future::Future>`
     in the current scope
  --> src/main.rs:16:9
   |
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }
   | |_________^ method not found in
   |             `std::pin::Pin<&mut &mut impl std::future::Future>`
   |
   = note: the method `poll` exists but the following trait bounds
            were not satisfied:
           `impl std::future::Future: std::marker::Unpin`
           which is required by
           `&mut impl std::future::Future: std::future::Future`

虽然我们在上一章中介绍了 Future,但这个错误仍然不是很清楚。如果你在试图对一个引用调用 .await 时遇到这样一个关于 Future 没有被实现的错误,那么这个Future可能需要被 pin

阅读更多关于标准库中的Pin。

修改分支

让我们来看看一个稍微复杂的循环。我们有:

  1. 一个 i32 值的通道。
  2. 一个在 i32 值上执行的异步操作。

我们要实现的逻辑是:

  1. 在通道上等待一个偶数。
  2. 使用偶数作为输入启动异步操作。
  3. 等待操作,但同时在通道上监听更多的偶数。
  4. 如果在现有的操作完成之前收到一个新的偶数,则中止现有的操作,用新的偶数重新开始操作。
async fn action(input: Option<i32>) -> Option<String> {
    // If the input is `None`, return `None`.
    // This could also be written as `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // async logic here
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
    
    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);
    
    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });
    
    loop {
        tokio::select! {
            res = &mut operation, if !done => {
                done = true;

                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` is a method on `Pin`.
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

我们使用的策略与前面的例子类似。async fn 在循环之外被调用,并被分配给 operation。operation 变量被 pin 住。循环在 operation 和通道接收器上都进行select。

注意 action 是如何将 Option<i32> 作为参数的。在我们接收第一个偶数之前,我们需要将 operation 实例化为某种东西。我们让 action 接受 Option 并返回Option。如果传入的是 None,就会返回 None。在第一个循环迭代中,operation 立即以 None 完成。

这个例子使用了一些新的语法。第一个分支包括 , if !done。这是一个分支的前提条件。在解释它是如何工作的之前,让我们看一下如果省略了这个前提条件会发生什么。省略 , if !done 并运行这个例子的结果是如下输出。

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

这个错误发生在试图使用已经完成的 operation 时。通常情况下,当使用 .await 时,被等待的值会被消耗。在这个例子中,我们对一个引用进行 await。这意味着 operation 在完成后仍然存在。

为了避免这种 panic,我们必须注意在 operation 完成后禁用第一个分支。done 变量用于跟踪 operation 是否完成。一个 select! 分支可能包括一个 precondition。这个前提条件在 select! 分支等待之前被检查。如果该条件被评估为false,那么该分支将被禁用。done变量被初始化为false。当 operation 完成后,done被设置为true。下一个循环迭代将禁用该操作分支。当从通道收到一个偶数信息时,operation 被重置,done被设置为false。

每任务的并发性

tokio::spoonselect! 都可以运行并发的异步操作。然而,用于运行并发操作的策略是不同的。tokio::spoon 函数接收一个异步操作并生成一个新的任务来运行它。任务是 Tokio 运行时安排的对象。两个不同的任务是由 Tokio 独立调度的。它们可能同时运行在不同的操作系统线程上。正因为如此,一个催生的任务和一个催生的线程有同样的限制:不能借用。

select! 宏在同一个任务上同时运行所有分支。因为 select! 宏的所有分支都在同一个任务上执行,所以它们永远不会同时运行。select! 宏在一个任务上复用异步操作。