桥接同步代码
在我们到目前为止看到的例子中,我们用 #[tokio::main]
标记了主函数,并使整个项目成为异步的。然而,这对所有项目来说都是不可取的。例如,一个GUI应用程序可能希望在主线程上运行GUI代码,并在另一个线程上运行Tokio运行时。
本页解释了如何将异步/等待隔离到项目的一小部分。
#[tokio::main]
扩展到什么?
#[tokio::main]
宏是一个将你的主函数替换为非同步主函数的宏,它启动一个运行时,然后调用你的代码。例如,这样的main函数
#[tokio::main]
async fn main() {
println!("Hello world");
}
通过这个宏变成了这样:
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
为了在我们自己的项目中使用 async/await
,我们可以做一些类似的事情,我们利用. block_on
方法在适当的时候进入异步环境。
到mini-redis的同步接口
在本节中,我们将讨论如何通过存储 Runtime 对象并使用其 block_on 方法来构建 mini-redis 的同步接口。在接下来的章节中,我们将讨论一些替代的方法,以及何时应该使用每种方法。
我们要封装的接口是异步的 client 类型。它有几个方法,我们将实现以下方法的阻塞版本:
为此,我们引入了一个名为 src/blocking_client.rs
的新文件,并用一个围绕 async Client 类型的封装结构来初始化它:
use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;
pub use crate::client::Message;
/// Established connection with a Redis server.
pub struct BlockingClient {
/// The asynchronous `Client`.
inner: crate::client::Client,
/// A `current_thread` runtime for executing operations on the
/// asynchronous client in a blocking manner.
rt: Runtime,
}
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
// Call the asynchronous connect method using the runtime.
let inner = rt.block_on(crate::client::connect(addr))?;
Ok(BlockingClient { inner, rt })
}
在这里,我们把构造函数作为第一个例子,说明如何在非同步上下文中执行异步方法。我们使用Tokio Runtime类型上的block_on方法来做到这一点,它执行一个异步方法并返回其结果。
一个重要的细节是对 current_thread 运行时的使用。通常在使用Tokio时,你会使用默认的 multi_thread 运行时,它将产生一堆后台线程,这样它就可以有效地同时运行许多东西。对于我们的用例,我们每次只做一件事,所以我们不会因为运行多个线程而获得任何好处。这使得 current_thread 运行时成为完美的选择,因为它不会产生任何线程。
enable_all
调用启用了 Tokio 运行时的IO和定时器驱动。如果它们没有被启用,运行时就无法执行IO或定时器。
因为 current_thread 运行时不产生线程,所以它只在 block_on 被调用时运行。一旦 block_on 返回,所有在该运行时上生成的任务将冻结,直到你再次调用 block_on。如果催生的任务在不调用 block_on 时必须继续运行,请使用 multi_threaded runtime。
一旦我们有了这个结构体,大部分的方法就很容易实现:
use bytes::Bytes;
use std::time::Duration;
impl BlockingClient {
pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
self.rt.block_on(self.inner.get(key))
}
pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
self.rt.block_on(self.inner.set(key, value))
}
pub fn set_expires(
&mut self,
key: &str,
value: Bytes,
expiration: Duration,
) -> crate::Result<()> {
self.rt.block_on(self.inner.set_expires(key, value, expiration))
}
pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
self.rt.block_on(self.inner.publish(channel, message))
}
}
Client::subscribe方法更有趣,因为它将 Client 转化为 Subscriber 对象。我们可以用以下方式实现它:
/// A client that has entered pub/sub mode.
///
/// Once clients subscribe to a channel, they may only perform
/// pub/sub related commands. The `BlockingClient` type is
/// transitioned to a `BlockingSubscriber` type in order to
/// prevent non-pub/sub methods from being called.
pub struct BlockingSubscriber {
/// The asynchronous `Subscriber`.
inner: crate::client::Subscriber,
/// A `current_thread` runtime for executing operations on the
/// asynchronous client in a blocking manner.
rt: Runtime,
}
impl BlockingClient {
pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
Ok(BlockingSubscriber {
inner: subscriber,
rt: self.rt,
})
}
}
impl BlockingSubscriber {
pub fn get_subscribed(&self) -> &[String] {
self.inner.get_subscribed()
}
pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
self.rt.block_on(self.inner.next_message())
}
pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.subscribe(channels))
}
pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.unsubscribe(channels))
}
}
因此,subscribe 方法将首先使用运行时将异步 Client 转化为异步 Subscriber。然后,它将把产生的 Subscriber 与运行时一起存储,并使用 block_on 实现各种方法。
请注意,异步 Subscriber 结构体有一个名为 get_subscribed 的非异步方法。为了处理这个问题,我们只需直接调用它而不涉及运行时。
其他方法
上节解释了实现同步包装器的最简单的方法,但这并不是唯一的方法。这些方法是:
- 创建一个 Runtime 并在异步代码上调用 block_on。
- 创建一个 Runtime 并在其上生成事物。
- 在一个单独的线程中运行 Runtime 并向其发送消息。
我们已经看到了第一种方法。其他两种方法概述如下。
在运行时上生成东西
运行时对象有一个叫做 spawn 的方法。当你调用这个方法时,你会创建一个新的在运行时上运行的后台任务。比如说
use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};
fn main() {
let runtime = Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let mut handles = Vec::with_capacity(10);
for i in 0..10 {
handles.push(runtime.spawn(my_bg_task(i)));
}
// Do something time-consuming while the background tasks execute.
std::thread::sleep(Duration::from_millis(750));
println!("Finished time-consuming task.");
// Wait for all of them to complete.
for handle in handles {
// The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
// a future, so we can wait for it using `block_on`.
runtime.block_on(handle).unwrap();
}
}
async fn my_bg_task(i: u64) {
// By subtracting, the tasks with larger values of i sleep for a
// shorter duration.
let millis = 1000 - 50 * i;
println!("Task {} sleeping for {} ms.", i, millis);
sleep(Duration::from_millis(millis)).await;
println!("Task {} stopping.", i);
}
在上面的例子中,我们在运行时催生了 10 个后台任务,然后等待所有的任务。作为一个例子,这可能是在图形应用程序中实现后台网络请求的一个好方法,因为网络请求在GUI主线程上运行太耗时了。相反,你在后台运行的 Tokio 运行时上生成请求,并让任务在请求完成后将信息送回GUI代码,如果你想要一个进度条,甚至可以递增。
在这个例子中,重要的是,运行时被配置为多线程运行时。如果你把它改为 current_thread 运行时,你会发现耗时的任务会在任何后台任务开始之前完成。这是因为在 current_thread 运行时上生成的后台任务只在调用 block_on 时执行,否则运行时就没有地方可以运行它们。
这个例子通过对调用 spawn 返回的 JoinHandle 调用 block_on 来等待生成的任务完成,但这并不是唯一的方法。这里有一些替代方法。
- 使用消息传递通道,如
tokio::sync::mpsc
。 - 修改由 Mutex 等保护的共享值。这对GUI中的进度条来说是一个很好的方法,GUI在每一帧都会读取共享值。
spawn 方法在 Handle 类型上也是可用的。Handle 类型可以被克隆,以便在一个运行时中获得许多句柄,每个 Handle 都可以用来在运行时中生成新的任务。
发送消息
第三种技术是催生一个运行时,并使用消息传递来与之通信。这比其他两种方法涉及更多的模板,但它是最灵活的方法。你可以在下面找到一个基本的例子。
use tokio::runtime::Builder;
use tokio::sync::mpsc;
pub struct Task {
name: String,
// info that describes the task
}
async fn handle_task(task: Task) {
println!("Got task {}", task.name);
}
#[derive(Clone)]
pub struct TaskSpawner {
spawn: mpsc::Sender<Task>,
}
impl TaskSpawner {
pub fn new() -> TaskSpawner {
// Set up a channel for communicating.
let (send, mut recv) = mpsc::channel(16);
// Build the runtime for the new thread.
//
// The runtime is created before spawning the thread
// to more cleanly forward errors if the `unwrap()`
// panics.
let rt = Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
std::thread::spawn(move || {
rt.block_on(async move {
while let Some(task) = recv.recv().await {
tokio::spawn(handle_task(task));
}
// Once all senders have gone out of scope,
// the `.recv()` call returns None and it will
// exit from the while loop and shut down the
// thread.
});
});
TaskSpawner {
spawn: send,
}
}
pub fn spawn_task(&self, task: Task) {
match self.spawn.blocking_send(task) {
Ok(()) => {},
Err(_) => panic!("The shared runtime has shut down."),
}
}
}
这个例子可以用很多方式来配置。例如,你可以使用一个 Semaphore 来限制活动任务的数量,或者你可以使用一个相反方向的通道来向 spawner 发送响应。当你以这种方式催生一个运行时,它是一种 actor。