深入异步

Tokio教程之深入异步

https://tokio.rs/tokio/tutorial/async

在这一点上,我们已经完成了对异步 Rust 和 Tokio 的相当全面的考察。现在我们将深入挖掘Rust的异步运行时模型。在教程的一开始,我们就暗示过,异步Rust采取了一种独特的方法。现在,我们解释一下这意味着什么。

Futures

作为快速回顾,让我们采取一个非常基本的异步函数。与本教程到目前为止所涉及的内容相比,这并不新鲜。

use tokio::net::TcpStream;

async fn my_async_fn() {
    println!("hello from async");
    let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
    println!("async TCP operation complete");
}

我们调用这个函数,它返回一些值。我们在这个值上调用.await。

#[tokio::main]
async fn main() {
    let what_is_this = my_async_fn();
    // Nothing has been printed yet.
    what_is_this.await;
    // Text has been printed and socket has been
    // established and closed.
}

my_async_fn() 返回的值是一个future。future是一个实现了标准库所提供的 std::future::Future 特性的值。它们是包含正在进行的异步计算的值。

std::future::Future trait的定义是:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;
}

相关类型 Output 是 future 完成后产生的类型。Pin 类型是Rust能够支持异步函数中的借用的方式。更多细节请参见标准库文档。

与其他语言实现 future 的方式不同,Rust future 并不代表在后台发生的计算,相反,Rust future就是计算本身。Future的所有者负责通过轮询Future来推进计算。这可以通过调用 Future::poll 来实现。

实现future

让我们来实现一个非常简单的future。这个future将:

  • 等待到一个特定的时间点。
  • 输出一些文本到STDOUT。
  • 产生一个字符串。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

作为Future的Async fn

在main函数中,我们实例化了future并对其调用 .await。从异步函数中,我们可以对任何实现Future的值调用 .await。反过来,调用一个异步函数会返回一个实现Future的匿名类型。在 async fn main() 的例子中,生成的future大致是这样的。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() +
                        Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

Rust futures是一种状态机。在这里,MainFuture 被表示为一个 future 的可能状态的枚举。future 在 State0 状态下开始。当 poll 被调用时,future 试图尽可能地推进其内部状态。如果 future 能够完成,Poll::Ready 将被返回,其中包含异步计算的输出。

如果future不能完成,通常是由于它所等待的资源没有准备好,那么就会返回 Poll::Pending。收到 Poll::Pending 是向调用者表明,future 将在稍后的时间完成,调用者应该在稍后再次调用poll。

我们还看到,future 是由其他 future 组成的。在外层 future 上调用 poll 的结果是调用内部 future 的 poll 函数。

executors

异步的Rust函数返回future。future必须被调用 poll 以推进其状态。future是由其他 future 组成的。那么,问题来了,是什么在最外层的 future 上调用poll?

回想一下前面的内容,要运行异步函数,它们必须被传递给 tokio::spawn 或者是被 #[tokio::main] 注释的主函数。这样做的结果是将生成的外层 future 提交给 Tokio执行器。执行器负责在外部 future 上调用 Future::poll,推动异步计算的完成。

mini Tokio

为了更好地理解这一切是如何结合在一起的,让我们实现我们自己的最小版本的Tokio! 完整的代码可以在这里找到。

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
    let mut mini_tokio = MiniTokio::new();

    mini_tokio.spawn(async {
        let when = Instant::now() + Duration::from_millis(10);
        let future = Delay { when };

        let out = future.await;
        assert_eq!(out, "done");
    });

    mini_tokio.run();
}

struct MiniTokio {
    tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
    fn new() -> MiniTokio {
        MiniTokio {
            tasks: VecDeque::new(),
        }
    }
    
    /// Spawn a future onto the mini-tokio instance.
    fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.tasks.push_back(Box::pin(future));
    }
    
    fn run(&mut self) {
        let waker = task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        
        while let Some(mut task) = self.tasks.pop_front() {
            if task.as_mut().poll(&mut cx).is_pending() {
                self.tasks.push_back(task);
            }
        }
    }
}

这将运行异步块。一个具有所要求的延迟的 Delay 实例被创建并被等待。然而,到目前为止,我们的实现有一个重大缺陷。我们的执行器从未进入睡眠状态。执行器不断地循环所有被催生的 future,并对它们进行 poll 。大多数时候,这些 future 还没有准备好执行更多的工作,并会再次返回 Poll::Pending。这个过程会消耗CPU,一般来说效率不高。

理想情况下,我们希望 mini-tokio 只在 future 能够取得进展时 poll future。这发生在任务被阻塞的资源准备好执行请求的操作时。如果任务想从一个TCP套接字中读取数据,那么我们只想在TCP套接字收到数据时 poll 任务。在我们的例子中,任务在达到给定的瞬间被阻断。理想情况下,mini-tokio只会在那个瞬间过去后 poll 任务。

为了实现这一点,当一个资源被 poll 而该资源又还没有准备好时,一旦它过渡到 ready 的状态,该资源将发送一个通知。

Wakers

Waker 是缺失的那部分。这是一个系统,通过这个系统,资源能够通知等待的任务,资源已经准备好继续某些操作。

让我们再看一下Future::poll的定义:

fn poll(self: Pin<&mut Self>, cx: &mut Context)
    -> Poll<Self::Output>;

Poll 的 Context 参数有一个 waker() 方法。该方法返回一个与当前任务绑定的Waker。该Waker有一个wake()方法。调用该方法向执行器发出信号,相关任务应该被安排执行。当资源过渡到准备好的状态时调用wake(),通知执行者,poll 任务将能够取得进展。

更新 Delay

我们可以更新 Delay 来使用 wakers。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Get a handle to the waker for the current task
            let waker = cx.waker().clone();
            let when = self.when;

            // Spawn a timer thread.
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                waker.wake();
            });

            Poll::Pending
        }
    }
}

现在,一旦请求的持续时间过了,调用的任务就会被通知,执行者可以确保任务被再次安排。下一步是更新mini-tokio以监听唤醒通知。

我们的 Delay 实现还有一些剩余的问题。我们将在以后修复它们。

当一个 future 返回 Poll::Pending 时,它必须确保在某个时间点对 waker 发出信号。忘记这样做会导致任务无限期地挂起。

在返回 Poll::Pending 后忘记唤醒一个任务是一个常见的错误来源。

回顾一下 “Delay"的第一次迭代。这里是 future 的实现。

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

在返回 Poll::Pending 之前,我们调用 cx.waker().wake_by_ref()。这是为了满足 future 契约。通过返回 Poll::Pending,我们负责给唤醒者发信号。因为我们还没有实现定时器线程,所以我们在内联中给唤醒者发信号。这样做的结果是,future 将立即被重新安排,再次执行,而且可能还没有准备好完成。

请注意,允许对 waker 发出超过必要次数的信号。在这个特殊的例子中,即使我们根本没有准备好继续操作,我们还是向唤醒者发出信号。除了浪费一些CPU周期外,这样做并没有什么问题。然而,这种特殊的实现方式会导致一个繁忙的循环。

更新Mini Tokio

下一步是更新 Mini Tokio 以接收 waker 的通知。我们希望执行器只在被唤醒时运行任务,为了做到这一点,Mini Tokio将提供它自己的唤醒器。当唤醒者被调用时,其相关的任务将被排队执行。Mini Tokio在 poll future 时将这个 waker 传递给 future。

更新后的 Mini Tokio 将使用一个通道来存储预定任务。通道允许任务从任何线程被排队执行。Wakers 必须是 Send 和 Sync,所以我们使用来自crossbeam crate的通道,因为标准库的通道不是Sync。

Send和Sync特性是Rust提供的与并发性有关的标记特性。可以被发送到不同线程的类型是Send。大多数类型都是Send,但像Rc这样的类型则不是。可以通过不可变的引用并发访问的类型是Sync。一个类型可以是Send,但不是Sync–一个很好的例子是Cell,它可以通过不可变的引用被修改,因此并发访问是不安全的。

更多细节请参见Rust书中的相关章节。

然后,更新MiniTokio的结构。

use crossbeam::channel;
use std::sync::Arc;

struct MiniTokio {
    scheduled: channel::Receiver<Arc<Task>>,
    sender: channel::Sender<Arc<Task>>,
}

struct Task {
    // This will be filled in soon.
}

Wakers 是 sync,并且可以被克隆。当 wake 被调用时,任务必须被安排执行。为了实现这一点,我们有一个通道。当 wake() 被调用时,任务被推到通道的发送部分。我们的 task 结构将实现唤醒逻辑。要做到这一点,它需要同时包含催生的future 和通道的发送部分。

use std::sync::{Arc, Mutex};

struct Task {
    // The `Mutex` is to make `Task` implement `Sync`. Only
    // one thread accesses `future` at any given time. The
    // `Mutex` is not required for correctness. Real Tokio
    // does not use a mutex here, but real Tokio has
    // more lines of code than can fit in a single tutorial
    // page.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    executor: channel::Sender<Arc<Task>>,
}

impl Task {
    fn schedule(self: &Arc<Self>) {
        self.executor.send(self.clone());
    }
}

为了安排任务,Arc被克隆并通过通道发送。现在,我们需要将我们的 schedule 函数与 std::task::Waker 挂钩。标准库提供了一个低级别的API,通过手动构建vtable来完成这个任务。这种策略为实现者提供了最大的灵活性,但需要一堆不安全的模板代码。我们不直接使用 RawWakerVTable,而是使用由futures crate提供的ArcWake工具。这使得我们可以实现一个简单的特质,将我们的任务结构暴露为一个waker。

在你的Cargo.toml中添加以下依赖,以拉入future。

futures = "0.3"

然后实现 futures::task::ArcWake

use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.schedule();
    }
}

当上面的定时器线程调用waker.wake()时,任务被推送到通道中。接下来,我们在MiniTokio::run()函数中实现接收和执行任务。

impl MiniTokio {
    fn run(&self) {
        while let Ok(task) = self.scheduled.recv() {
            task.poll();
        }
    }

    /// Initialize a new mini-tokio instance.
    fn new() -> MiniTokio {
        let (sender, scheduled) = channel::unbounded();

        MiniTokio { scheduled, sender }
    }

    /// Spawn a future onto the mini-tokio instance.
    ///
    /// The given future is wrapped with the `Task` harness and pushed into the
    /// `scheduled` queue. The future will be executed when `run` is called.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Task::spawn(future, &self.sender);
    }
}

impl Task {
    fn poll(self: Arc<Self>) {
        // Create a waker from the `Task` instance. This
        // uses the `ArcWake` impl from above.
        let waker = task::waker(self.clone());
        let mut cx = Context::from_waker(&waker);

        // No other thread ever tries to lock the future
        let mut future = self.future.try_lock().unwrap();

        // Poll the future
        let _ = future.as_mut().poll(&mut cx);
    }

    // Spawns a new taks with the given future.
    //
    // Initializes a new Task harness containing the given future and pushes it
    // onto `sender`. The receiver half of the channel will get the task and
    // execute it.
    fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            executor: sender.clone(),
        });

        let _ = sender.send(task);
    }

}

这里发生了多件事情。首先,MiniTokio::run()被实现。该函数在一个循环中运行,接收来自通道的预定任务。由于任务在被唤醒时被推入通道,这些任务在执行时能够取得进展。

此外,MiniTokio::new()MiniTokio::spwn() 函数被调整为使用通道而不是 VecDeque。当新的任务被催生时,它们会被赋予一个通道的发送者部分的克隆,任务可以用它来在运行时安排自己。

Task::poll() 函数使用来自 futures crate 的 ArcWake 工具创建waker。waker被用来创建一个 task::Context。该 task::Context 被传递给 poll。

摘要

我们现在已经看到了一个端到端的例子,说明异步Rust是如何工作的。Rust的 async/await 功能是由traits支持的。这允许第三方crate,如Tokio,提供执行细节。

  • Rust的异步操作是 lazy 的,需要调用者来 poll 它们。
  • Wakers被传递给futures,以将一个future与调用它的任务联系起来。
  • 当一个资源没有准备好完成一个操作时,Poll::Pending 被返回,任务的waker被记录。
  • 当资源准备好时,任务的 waker 会被通知。
  • 执行者收到通知并安排任务的执行。
  • 任务再次被 poll ,这次资源已经准备好了,任务取得了进展。

某些未尽事宜

记得我们在实现 Delay future 的时候,说过还有一些事情要解决。Rust的异步模型允许单个future在执行时跨任务迁移。考虑一下下面的情况。

use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let mut delay = Some(Delay { when });

    poll_fn(move |cx| {
        let mut delay = delay.take().unwrap();
        let res = Pin::new(&mut delay).poll(cx);
        assert!(res.is_pending());
        tokio::spawn(async move {
            delay.await;
        });

        Poll::Ready(())
    }).await;
}

poll_fn 函数使用闭包创建Future实例。上面的片段创建了一个Delay实例,对其进行了一次轮询,然后将Delay实例发送到一个新的任务中等待。在这个例子中,Delay::poll在不同的Waker实例中被调用了不止一次。当这种情况发生时,你必须确保在最近一次调用 poll 时所传递的 Waker上调用 wake。

当实现 future 时,关键是要假设每一次对poll的调用都可能提供一个不同的Waker实例。poll 函数必须用新的唤醒者来更新任何先前记录的唤醒者。

我们早期实现的Delay在每次 poll 时都会产生一个新的线程。这很好,但是如果 poll 太频繁的话,效率就会很低(例如,如果你 select! 这个future和其他的future,只要其中一个有事件,这两个都会被poll)。一种方法是记住你是否已经产生了一个线程,如果你还没有产生一个线程,就只产生一个新的线程。然而,如果你这样做,你必须确保线程的Waker在以后调用 poll 时被更新,否则你就不能唤醒最近的Waker。

为了修复我们之前的实现,我们可以这样做。

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
    // This Some when we have spawned a thread, and None otherwise.
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // First, if this is the first time the future is called, spawn the
        // timer thread. If the timer thread is already running, ensure the
        // stored `Waker` matches the current task's waker.
        if let Some(waker) = &self.waker {
            let mut waker = waker.lock().unwrap();

            // Check if the stored waker matches the current task's waker.
            // This is necessary as the `Delay` future instance may move to
            // a different task between calls to `poll`. If this happens, the
            // waker contained by the given `Context` will differ and we
            // must update our stored waker to reflect this change.
            if !waker.will_wake(cx.waker()) {
                *waker = cx.waker().clone();
            }
        } else {
            let when = self.when;
            let waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(waker.clone());

            // This is the first time `poll` is called, spawn the timer thread.
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                // The duration has elapsed. Notify the caller by invoking
                // the waker.
                let waker = waker.lock().unwrap();
                waker.wake_by_ref();
            });
        }

        // Once the waker is stored and the timer thread is started, it is
        // time to check if the delay has completed. This is done by
        // checking the current instant. If the duration has elapsed, then
        // the future has completed and `Poll::Ready` is returned.
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            // The duration has not elapsed, the future has not completed so
            // return `Poll::Pending`.
            //
            // The `Future` trait contract requires that when `Pending` is
            // returned, the future ensures that the given waker is signalled
            // once the future should be polled again. In our case, by
            // returning `Pending` here, we are promising that we will
            // invoke the given waker included in the `Context` argument
            // once the requested duration has elapsed. We ensure this by
            // spawning the timer thread above.
            //
            // If we forget to invoke the waker, the task will hang
            // indefinitely.
            Poll::Pending
        }
    }
}

这有点复杂,但想法是,在每次调用 poll 时,future 会检查所提供的 waker 是否与之前记录的 waker 相匹配。如果两个 waker 匹配,那么就没有其他事情要做。如果不匹配,则必须更新记录的 waker。

Notify 工具

我们演示了如何使用wakers手工实现一个 Delay future。Wakers是异步Rust工作方式的基础。通常情况下,没有必要深入到这个水平。例如,在Delay的情况下,我们可以通过使用 tokio::sync::Notify 工具,完全用 async/await 实现它。这个工具提供了一个基本的任务通知机制。它处理了waker的细节,包括确保记录的waker与当前任务相匹配。

使用Notify,我们可以像这样用 async/await 实现一个 Delay 函数。

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify2.notify_one();
    });


    notify.notified().await;
}