桥接同步代码

Tokio教程之桥接同步代码

https://tokio.rs/tokio/tutorial/bridging

在我们到目前为止看到的例子中,我们用 #[tokio::main] 标记了主函数,并使整个项目成为异步的。然而,这对所有项目来说都是不可取的。例如,一个GUI应用程序可能希望在主线程上运行GUI代码,并在另一个线程上运行Tokio运行时。

本页解释了如何将异步/等待隔离到项目的一小部分。

#[tokio::main]扩展到什么?

#[tokio::main] 宏是一个将你的主函数替换为非同步主函数的宏,它启动一个运行时,然后调用你的代码。例如,这样的main函数

#[tokio::main]
async fn main() {
    println!("Hello world");
}

通过这个宏变成了这样:

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

为了在我们自己的项目中使用 async/await,我们可以做一些类似的事情,我们利用. block_on 方法在适当的时候进入异步环境。

到mini-redis的同步接口

在本节中,我们将讨论如何通过存储 Runtime 对象并使用其 block_on 方法来构建 mini-redis 的同步接口。在接下来的章节中,我们将讨论一些替代的方法,以及何时应该使用每种方法。

我们要封装的接口是异步的 client 类型。它有几个方法,我们将实现以下方法的阻塞版本:

为此,我们引入了一个名为 src/blocking_client.rs 的新文件,并用一个围绕 async Client 类型的封装结构来初始化它:

use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;

pub use crate::client::Message;

/// Established connection with a Redis server.
pub struct BlockingClient {
    /// The asynchronous `Client`.
    inner: crate::client::Client,

    /// A `current_thread` runtime for executing operations on the
    /// asynchronous client in a blocking manner.
    rt: Runtime,
}

pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;

    // Call the asynchronous connect method using the runtime.
    let inner = rt.block_on(crate::client::connect(addr))?;

    Ok(BlockingClient { inner, rt })
}

在这里,我们把构造函数作为第一个例子,说明如何在非同步上下文中执行异步方法。我们使用Tokio Runtime类型上的block_on方法来做到这一点,它执行一个异步方法并返回其结果。

一个重要的细节是对 current_thread 运行时的使用。通常在使用Tokio时,你会使用默认的 multi_thread 运行时,它将产生一堆后台线程,这样它就可以有效地同时运行许多东西。对于我们的用例,我们每次只做一件事,所以我们不会因为运行多个线程而获得任何好处。这使得 current_thread 运行时成为完美的选择,因为它不会产生任何线程。

enable_all 调用启用了 Tokio 运行时的IO和定时器驱动。如果它们没有被启用,运行时就无法执行IO或定时器。

因为 current_thread 运行时不产生线程,所以它只在 block_on 被调用时运行。一旦 block_on 返回,所有在该运行时上生成的任务将冻结,直到你再次调用 block_on。如果催生的任务在不调用 block_on 时必须继续运行,请使用 multi_threaded runtime。

一旦我们有了这个结构体,大部分的方法就很容易实现:

use bytes::Bytes;
use std::time::Duration;

impl BlockingClient {
    pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
        self.rt.block_on(self.inner.get(key))
    }

    pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
        self.rt.block_on(self.inner.set(key, value))
    }

    pub fn set_expires(
        &mut self,
        key: &str,
        value: Bytes,
        expiration: Duration,
    ) -> crate::Result<()> {
        self.rt.block_on(self.inner.set_expires(key, value, expiration))
    }

    pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
        self.rt.block_on(self.inner.publish(channel, message))
    }
}

Client::subscribe方法更有趣,因为它将 Client 转化为 Subscriber 对象。我们可以用以下方式实现它:

/// A client that has entered pub/sub mode.
///
/// Once clients subscribe to a channel, they may only perform
/// pub/sub related commands. The `BlockingClient` type is
/// transitioned to a `BlockingSubscriber` type in order to
/// prevent non-pub/sub methods from being called.
pub struct BlockingSubscriber {
    /// The asynchronous `Subscriber`.
    inner: crate::client::Subscriber,

    /// A `current_thread` runtime for executing operations on the
    /// asynchronous client in a blocking manner.
    rt: Runtime,
}

impl BlockingClient {
    pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
        let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
        Ok(BlockingSubscriber {
            inner: subscriber,
            rt: self.rt,
        })
    }
}

impl BlockingSubscriber {
    pub fn get_subscribed(&self) -> &[String] {
        self.inner.get_subscribed()
    }

    pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
        self.rt.block_on(self.inner.next_message())
    }

    pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.subscribe(channels))
    }

    pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.unsubscribe(channels))
    }
}

因此,subscribe 方法将首先使用运行时将异步 Client 转化为异步 Subscriber。然后,它将把产生的 Subscriber 与运行时一起存储,并使用 block_on 实现各种方法。

请注意,异步 Subscriber 结构体有一个名为 get_subscribed 的非异步方法。为了处理这个问题,我们只需直接调用它而不涉及运行时。

其他方法

上节解释了实现同步包装器的最简单的方法,但这并不是唯一的方法。这些方法是:

  • 创建一个 Runtime 并在异步代码上调用 block_on。
  • 创建一个 Runtime 并在其上生成事物。
  • 在一个单独的线程中运行 Runtime 并向其发送消息。

我们已经看到了第一种方法。其他两种方法概述如下。

在运行时上生成东西

运行时对象有一个叫做 spawn 的方法。当你调用这个方法时,你会创建一个新的在运行时上运行的后台任务。比如说

use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(runtime.spawn(my_bg_task(i)));
    }

    // Do something time-consuming while the background tasks execute.
    std::thread::sleep(Duration::from_millis(750));
    println!("Finished time-consuming task.");

    // Wait for all of them to complete.
    for handle in handles {
        // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
        // a future, so we can wait for it using `block_on`.
        runtime.block_on(handle).unwrap();
    }
}

async fn my_bg_task(i: u64) {
    // By subtracting, the tasks with larger values of i sleep for a
    // shorter duration.
    let millis = 1000 - 50 * i;
    println!("Task {} sleeping for {} ms.", i, millis);

    sleep(Duration::from_millis(millis)).await;

    println!("Task {} stopping.", i);
}

在上面的例子中,我们在运行时催生了 10 个后台任务,然后等待所有的任务。作为一个例子,这可能是在图形应用程序中实现后台网络请求的一个好方法,因为网络请求在GUI主线程上运行太耗时了。相反,你在后台运行的 Tokio 运行时上生成请求,并让任务在请求完成后将信息送回GUI代码,如果你想要一个进度条,甚至可以递增。

在这个例子中,重要的是,运行时被配置为多线程运行时。如果你把它改为 current_thread 运行时,你会发现耗时的任务会在任何后台任务开始之前完成。这是因为在 current_thread 运行时上生成的后台任务只在调用 block_on 时执行,否则运行时就没有地方可以运行它们。

这个例子通过对调用 spawn 返回的 JoinHandle 调用 block_on 来等待生成的任务完成,但这并不是唯一的方法。这里有一些替代方法。

  • 使用消息传递通道,如 tokio::sync::mpsc
  • 修改由 Mutex 等保护的共享值。这对GUI中的进度条来说是一个很好的方法,GUI在每一帧都会读取共享值。

spawn 方法在 Handle 类型上也是可用的。Handle 类型可以被克隆,以便在一个运行时中获得许多句柄,每个 Handle 都可以用来在运行时中生成新的任务。

发送消息

第三种技术是催生一个运行时,并使用消息传递来与之通信。这比其他两种方法涉及更多的模板,但它是最灵活的方法。你可以在下面找到一个基本的例子。

use tokio::runtime::Builder;
use tokio::sync::mpsc;

pub struct Task {
    name: String,
    // info that describes the task
}

async fn handle_task(task: Task) {
    println!("Got task {}", task.name);
}

#[derive(Clone)]
pub struct TaskSpawner {
    spawn: mpsc::Sender<Task>,
}

impl TaskSpawner {
    pub fn new() -> TaskSpawner {
        // Set up a channel for communicating.
        let (send, mut recv) = mpsc::channel(16);

        // Build the runtime for the new thread.
        //
        // The runtime is created before spawning the thread
        // to more cleanly forward errors if the `unwrap()`
        // panics.
        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            rt.block_on(async move {
                while let Some(task) = recv.recv().await {
                    tokio::spawn(handle_task(task));
                }

                // Once all senders have gone out of scope,
                // the `.recv()` call returns None and it will
                // exit from the while loop and shut down the
                // thread.
            });
        });

        TaskSpawner {
            spawn: send,
        }
    }

    pub fn spawn_task(&self, task: Task) {
        match self.spawn.blocking_send(task) {
            Ok(()) => {},
            Err(_) => panic!("The shared runtime has shut down."),
        }
    }
}

这个例子可以用很多方式来配置。例如,你可以使用一个 Semaphore 来限制活动任务的数量,或者你可以使用一个相反方向的通道来向 spawner 发送响应。当你以这种方式催生一个运行时,它是一种 actor。