深入异步
在这一点上,我们已经完成了对异步 Rust 和 Tokio 的相当全面的考察。现在我们将深入挖掘Rust的异步运行时模型。在教程的一开始,我们就暗示过,异步Rust采取了一种独特的方法。现在,我们解释一下这意味着什么。
Futures
作为快速回顾,让我们采取一个非常基本的异步函数。与本教程到目前为止所涉及的内容相比,这并不新鲜。
use tokio::net::TcpStream;
async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}
我们调用这个函数,它返回一些值。我们在这个值上调用.await。
#[tokio::main]
async fn main() {
let what_is_this = my_async_fn();
// Nothing has been printed yet.
what_is_this.await;
// Text has been printed and socket has been
// established and closed.
}
my_async_fn()
返回的值是一个future。future是一个实现了标准库所提供的 std::future::Future
特性的值。它们是包含正在进行的异步计算的值。
std::future::Future
trait的定义是:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}
相关类型 Output
是 future 完成后产生的类型。Pin 类型是Rust能够支持异步函数中的借用的方式。更多细节请参见标准库文档。
与其他语言实现 future 的方式不同,Rust future 并不代表在后台发生的计算,相反,Rust future就是计算本身。Future的所有者负责通过轮询Future来推进计算。这可以通过调用 Future::poll
来实现。
实现future
让我们来实现一个非常简单的future。这个future将:
- 等待到一个特定的时间点。
- 输出一些文本到STDOUT。
- 产生一个字符串。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}
作为Future的Async fn
在main函数中,我们实例化了future并对其调用 .await
。从异步函数中,我们可以对任何实现Future的值调用 .await
。反过来,调用一个异步函数会返回一个实现Future的匿名类型。在 async fn main()
的例子中,生成的future大致是这样的。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}
Rust futures是一种状态机。在这里,MainFuture
被表示为一个 future 的可能状态的枚举。future 在 State0 状态下开始。当 poll
被调用时,future 试图尽可能地推进其内部状态。如果 future 能够完成,Poll::Ready
将被返回,其中包含异步计算的输出。
如果future不能完成,通常是由于它所等待的资源没有准备好,那么就会返回 Poll::Pending
。收到 Poll::Pending
是向调用者表明,future 将在稍后的时间完成,调用者应该在稍后再次调用poll。
我们还看到,future 是由其他 future 组成的。在外层 future 上调用 poll 的结果是调用内部 future 的 poll
函数。
executors
异步的Rust函数返回future。future必须被调用 poll 以推进其状态。future是由其他 future 组成的。那么,问题来了,是什么在最外层的 future 上调用poll?
回想一下前面的内容,要运行异步函数,它们必须被传递给 tokio::spawn
或者是被 #[tokio::main]
注释的主函数。这样做的结果是将生成的外层 future 提交给 Tokio执行器。执行器负责在外部 future 上调用 Future::poll
,推动异步计算的完成。
mini Tokio
为了更好地理解这一切是如何结合在一起的,让我们实现我们自己的最小版本的Tokio! 完整的代码可以在这里找到。
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
tasks: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}
/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}
fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}
这将运行异步块。一个具有所要求的延迟的 Delay
实例被创建并被等待。然而,到目前为止,我们的实现有一个重大缺陷。我们的执行器从未进入睡眠状态。执行器不断地循环所有被催生的 future,并对它们进行 poll 。大多数时候,这些 future 还没有准备好执行更多的工作,并会再次返回 Poll::Pending
。这个过程会消耗CPU,一般来说效率不高。
理想情况下,我们希望 mini-tokio 只在 future 能够取得进展时 poll future。这发生在任务被阻塞的资源准备好执行请求的操作时。如果任务想从一个TCP套接字中读取数据,那么我们只想在TCP套接字收到数据时 poll 任务。在我们的例子中,任务在达到给定的瞬间被阻断。理想情况下,mini-tokio只会在那个瞬间过去后 poll 任务。
为了实现这一点,当一个资源被 poll 而该资源又还没有准备好时,一旦它过渡到 ready 的状态,该资源将发送一个通知。
Wakers
Waker 是缺失的那部分。这是一个系统,通过这个系统,资源能够通知等待的任务,资源已经准备好继续某些操作。
让我们再看一下Future::poll的定义:
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
Poll 的 Context 参数有一个 waker() 方法。该方法返回一个与当前任务绑定的Waker。该Waker有一个wake()方法。调用该方法向执行器发出信号,相关任务应该被安排执行。当资源过渡到准备好的状态时调用wake(),通知执行者,poll 任务将能够取得进展。
更新 Delay
我们可以更新 Delay 来使用 wakers。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
现在,一旦请求的持续时间过了,调用的任务就会被通知,执行者可以确保任务被再次安排。下一步是更新mini-tokio以监听唤醒通知。
我们的 Delay 实现还有一些剩余的问题。我们将在以后修复它们。
当一个 future 返回
Poll::Pending
时,它必须确保在某个时间点对 waker 发出信号。忘记这样做会导致任务无限期地挂起。在返回
Poll::Pending
后忘记唤醒一个任务是一个常见的错误来源。
回顾一下 “Delay"的第一次迭代。这里是 future 的实现。
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
在返回 Poll::Pending
之前,我们调用 cx.waker().wake_by_ref()
。这是为了满足 future 契约。通过返回 Poll::Pending
,我们负责给唤醒者发信号。因为我们还没有实现定时器线程,所以我们在内联中给唤醒者发信号。这样做的结果是,future 将立即被重新安排,再次执行,而且可能还没有准备好完成。
请注意,允许对 waker 发出超过必要次数的信号。在这个特殊的例子中,即使我们根本没有准备好继续操作,我们还是向唤醒者发出信号。除了浪费一些CPU周期外,这样做并没有什么问题。然而,这种特殊的实现方式会导致一个繁忙的循环。
更新Mini Tokio
下一步是更新 Mini Tokio 以接收 waker 的通知。我们希望执行器只在被唤醒时运行任务,为了做到这一点,Mini Tokio将提供它自己的唤醒器。当唤醒者被调用时,其相关的任务将被排队执行。Mini Tokio在 poll future 时将这个 waker 传递给 future。
更新后的 Mini Tokio 将使用一个通道来存储预定任务。通道允许任务从任何线程被排队执行。Wakers 必须是 Send 和 Sync,所以我们使用来自crossbeam crate的通道,因为标准库的通道不是Sync。
Send和Sync特性是Rust提供的与并发性有关的标记特性。可以被发送到不同线程的类型是Send。大多数类型都是Send,但像Rc这样的类型则不是。可以通过不可变的引用并发访问的类型是Sync。一个类型可以是Send,但不是Sync–一个很好的例子是Cell,它可以通过不可变的引用被修改,因此并发访问是不安全的。
更多细节请参见Rust书中的相关章节。
然后,更新MiniTokio的结构。
use crossbeam::channel;
use std::sync::Arc;
struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}
struct Task {
// This will be filled in soon.
}
Wakers 是 sync,并且可以被克隆。当 wake 被调用时,任务必须被安排执行。为了实现这一点,我们有一个通道。当 wake()
被调用时,任务被推到通道的发送部分。我们的 task 结构将实现唤醒逻辑。要做到这一点,它需要同时包含催生的future 和通道的发送部分。
use std::sync::{Arc, Mutex};
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
为了安排任务,Arc被克隆并通过通道发送。现在,我们需要将我们的 schedule 函数与 std::task::Waker
挂钩。标准库提供了一个低级别的API,通过手动构建vtable来完成这个任务。这种策略为实现者提供了最大的灵活性,但需要一堆不安全的模板代码。我们不直接使用 RawWakerVTable
,而是使用由futures crate提供的ArcWake工具。这使得我们可以实现一个简单的特质,将我们的任务结构暴露为一个waker。
在你的Cargo.toml中添加以下依赖,以拉入future。
futures = "0.3"
然后实现 futures::task::ArcWake
。
use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
当上面的定时器线程调用waker.wake()时,任务被推送到通道中。接下来,我们在MiniTokio::run()函数中实现接收和执行任务。
impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();
// Poll the future
let _ = future.as_mut().poll(&mut cx);
}
// Spawns a new taks with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}
这里发生了多件事情。首先,MiniTokio::run()
被实现。该函数在一个循环中运行,接收来自通道的预定任务。由于任务在被唤醒时被推入通道,这些任务在执行时能够取得进展。
此外,MiniTokio::new()
和 MiniTokio::spwn()
函数被调整为使用通道而不是 VecDeque
。当新的任务被催生时,它们会被赋予一个通道的发送者部分的克隆,任务可以用它来在运行时安排自己。
Task::poll()
函数使用来自 futures crate 的 ArcWake
工具创建waker。waker被用来创建一个 task::Context
。该 task::Context
被传递给 poll。
摘要
我们现在已经看到了一个端到端的例子,说明异步Rust是如何工作的。Rust的 async/await
功能是由traits支持的。这允许第三方crate,如Tokio,提供执行细节。
- Rust的异步操作是 lazy 的,需要调用者来 poll 它们。
- Wakers被传递给futures,以将一个future与调用它的任务联系起来。
- 当一个资源没有准备好完成一个操作时,
Poll::Pending
被返回,任务的waker被记录。 - 当资源准备好时,任务的 waker 会被通知。
- 执行者收到通知并安排任务的执行。
- 任务再次被 poll ,这次资源已经准备好了,任务取得了进展。
某些未尽事宜
记得我们在实现 Delay future 的时候,说过还有一些事情要解决。Rust的异步模型允许单个future在执行时跨任务迁移。考虑一下下面的情况。
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});
Poll::Ready(())
}).await;
}
poll_fn
函数使用闭包创建Future实例。上面的片段创建了一个Delay实例,对其进行了一次轮询,然后将Delay实例发送到一个新的任务中等待。在这个例子中,Delay::poll在不同的Waker实例中被调用了不止一次。当这种情况发生时,你必须确保在最近一次调用 poll 时所传递的 Waker上调用 wake。
当实现 future 时,关键是要假设每一次对poll的调用都可能提供一个不同的Waker实例。poll 函数必须用新的唤醒者来更新任何先前记录的唤醒者。
我们早期实现的Delay在每次 poll 时都会产生一个新的线程。这很好,但是如果 poll 太频繁的话,效率就会很低(例如,如果你 select! 这个future和其他的future,只要其中一个有事件,这两个都会被poll)。一种方法是记住你是否已经产生了一个线程,如果你还没有产生一个线程,就只产生一个新的线程。然而,如果你这样做,你必须确保线程的Waker在以后调用 poll 时被更新,否则你就不能唤醒最近的Waker。
为了修复我们之前的实现,我们可以这样做。
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
// This Some when we have spawned a thread, and None otherwise.
waker: Option<Arc<Mutex<Waker>>>,
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// First, if this is the first time the future is called, spawn the
// timer thread. If the timer thread is already running, ensure the
// stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();
// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());
// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}
// Once the waker is stored and the timer thread is started, it is
// time to check if the delay has completed. This is done by
// checking the current instant. If the duration has elapsed, then
// the future has completed and `Poll::Ready` is returned.
if Instant::now() >= self.when {
Poll::Ready(())
} else {
// The duration has not elapsed, the future has not completed so
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}
}
}
这有点复杂,但想法是,在每次调用 poll 时,future 会检查所提供的 waker 是否与之前记录的 waker 相匹配。如果两个 waker 匹配,那么就没有其他事情要做。如果不匹配,则必须更新记录的 waker。
Notify 工具
我们演示了如何使用wakers手工实现一个 Delay future。Wakers是异步Rust工作方式的基础。通常情况下,没有必要深入到这个水平。例如,在Delay的情况下,我们可以通过使用 tokio::sync::Notify
工具,完全用 async/await
实现它。这个工具提供了一个基本的任务通知机制。它处理了waker的细节,包括确保记录的waker与当前任务相匹配。
使用Notify,我们可以像这样用 async/await
实现一个 Delay 函数。
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify2.notify_one();
});
notify.notified().await;
}