这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Tokio学习笔记

介绍Tokio学习笔记的基本资料和访问方式

1 - Tokio介绍

Tokio的介绍,以及Tokio的资料收集

1.1 - Tokio概述

Tokio概述

官方介绍

来自 tokio 官网首页 的介绍:

Build reliable network applications without compromising speed.

在不影响速度的情况下建立可靠的网络应用。

Tokio is an asynchronous runtime for the Rust programming language. It provides the building blocks needed for writing network applications. It gives the flexibility to target a wide range of systems, from large servers with dozens of cores to small embedded devices.

Tokio是用于Rust编程语言的一个异步运行时。它提供了编写网络应用所需的构建块。它提供了针对各种系统的灵活性,从有几十个内核的大型服务器到小型嵌入式设备。

声称的主要优势:

  • Reliable/可靠:Tokio 的 API 是内存安全和线程安全的,并且是抗误操作的。这有助于防止常见的错误,如无界队列、缓冲区溢出和任务饿死。
  • Fast/快速:构建在Rust之上,Tokio提供了一个多线程的、抢占式的调度器。应用程序可以每秒处理数十万个请求,而且开销很小。
  • Easy/简单:async/await 减少了编写异步应用程序的复杂性。与Tokio的实用工具和充满活力的生态系统相配,编写应用程序是一件轻而易举的事。
  • Flexible/弹性:服务器应用程序的需求与嵌入式设备的需求不同。尽管Tokio带有默认值可以开箱即用,但它也提供了所需的旋钮,以便对不同的情况进行微调。

技术栈

应用程序不是在真空中建立的。Tokio 技术栈包括交货到生产所需的一切。

  • Runtime: Tokio运行时包括I/O、定时器、文件系统、同步和调度设施,是异步应用的基础。
  • Hyper: Hyper是一个HTTP客户端和服务器库,同时支持HTTP 1和2协议。
  • Tonic: 一个无固定规则(boilerplate-free)的gRPC客户端和服务器库。通过网络发布和使用API的最简单方法。
  • Tower:用于建立可靠的客户端和服务器的模块化组件。包括重试、负载平衡、过滤、请求限制设施等。
  • Mio:在操作系统的事件化I/O API之上的最小的可移植API。
  • Tracing: 对应用程序和库的统一的洞察力。提供结构化的、基于事件的数据收集和记录。
  • Bytes:在核心部分,网络应用程序操纵字节流。Bytes提供了一套丰富的实用程序来操作字节数组。

1.2 - Tokio术语

Tokio术语

来自:https://tokio.rs/tokio/glossary

异步(Asynchronous)

在Rust的上下文中,异步代码指的是使用async/await语言特性的代码,它允许许多任务在几个线程(甚至是一个单线程)上并发运行。

并发(Concurrency)和并行(Parallelism)

并发和并行是两个相关的概念,在谈论同时执行多个任务时都会用到。如果某件事情是平行发生的,那么它也是并发发生的,但反过来就不是这样了。在两个任务之间交替进行,但实际上从未同时进行两个任务,这就是并发,但不是并行。

未来(Future)

Future是一个存储某些操作的当前状态的值。future也有一个poll方法,它使操作继续进行,直到它需要等待某些东西,比如网络连接。对poll方法的调用应该很快返回。

Future通常是通过在一个异步块中使用.await来组合多个Future来创建的。

执行器(Executor)/调度器(scheduler)

执行器或调度器是通过重复调用poll方法来执行future的东西。标准库中没有执行器,所以你需要一个外部库来实现,而使用最广泛的执行器是由Tokio运行时提供。

执行器能够在几个线程上并发地运行大量的 future。它通过在等待时交换当前运行的任务来做到这一点。如果代码花了很长时间都没有到达 .await,这就被称为 “阻塞线程” 或 “not yielding back to the executor”,这将阻止其他任务的运行。

运行时(Runtime)

运行时是一个库,它包含执行器以及与该执行器集成的各种实用工具,如定时实用工具和IO。运行时和执行器这两个词有时可以互换使用。标准库没有运行时,所以你需要一个外部库来实现,最广泛使用的运行时是 Tokio 运行时。

Runtime这个词也用在其他场合,例如,“Rust没有运行时 “这句话有时被用来表示Rust不执行垃圾收集或即时(just-in-time,JIT)编译。

任务(Task)

任务是在Tokio运行时上运行的操作,由 tokio::spawnRuntime::block_on 函数创建。通过组合创建期货的工具,如 .awaitjoin! 并不创建新的任务,每个组合的部分被说成是 “在同一个任务中”。

多个任务是需要并行的,但使用 join! 等工具可以在一个任务上并发地做多件事情。

spawn

spawn 是指使用 tokio::spawn 函数来创建一个新的任务。它也可以指用std::thread::spoon创建新的线程。

异步块(Async block)

异步块是创建一个运行一些代码的future的简单方法。比如说:

let world = async {
    println!(" world!");
};
let my_future = async {
    print!("Hello ");
    world.await;
};

上面的代码创建了一个名为 my_future 的future,如果执行它,就会打印出 Hello world!。 它是通过首先打印 hello,然后运行 world future来实现的。请注意,上面的代码不会自己打印任何东西–你必须在任何事情发生之前实际执行my_future,要么直接生成(spawn)它,要么在你生成(spawn)的东西中等待它。

异步函数(Async function)

与异步块类似,异步函数是一种创建函数的简单方法,其主体成为一个future。所有的异步函数都可以被改写成返回一个future的普通函数。

async fn do_stuff(i: i32) -> String {
    // do stuff
    format!("The integer is {}.", i)
}
use std::future::Future;

// the async function above is the same as this:
fn do_stuff(i: i32) -> impl Future<Output = String> {
    async move {
        // do stuff
        format!("The integer is {}.", i)
    }
}

这使用 impl Trait 语法来返回一个 future,因为 Future 是一个 trait。请注意,由于由异步块创建的 future 在执行之前不会做任何事情,所以调用异步函数在其返回的 future 被执行之前不会做任何事情(忽略它将触发一个警告)。

让出(Yielding)

在异步Rust的背景下,Yielding是允许执行者在单个线程上执行许多future的原因。每当一个future让出时,执行者能够将该future与其他future交换,通过反复交换当前任务,执行者可以并发地执行大量的任务。future只能在 .await 时让出,所以在 .await 之间花很长时间的future可以阻止其他任务的运行。

具体来说,future只要从 poll 方法中返回就会让出。

阻塞(Blocking)

“阻塞(blocking)“这个词有两种不同的用法: 阻塞的第一个含义是简单地等待某事完成,而阻塞的另一个含义是当一个 future 花很长的时间而不让出。为了明确起见,你可以用 “阻塞线程” 这个短语来表示第二种含义。

Tokio的文档总是使用 “阻塞” 的第二种含义。

要在Tokio中运行阻塞代码,请参见Tokio API参考中的 CPU绑定任务和阻塞代码 部分。

流(Stream)

StreamIterator 的异步版本,它提供了一个数值流。它通常与 while let 循环一起使用,就像这样:

use tokio_stream::StreamExt; // for next()

while let Some(item) = stream.next().await {
    // do something
}

流这个词有时被混乱地用来指代 AsyncReadAsyncWrite 特性。

Tokio的流工具目前是由 tokio-stream crate提供的。一旦Stream特性在std中稳定下来,Stream工具将被移到 tokio crate中。

通道(Channel)

通道是一种工具,允许代码的一个部分向其他部分发送消息。Tokio提供了许多通道,每个通道都有不同的用途:

  • mpsc:多生产者、单消费者通道。可以发送许多值。
  • oneshot:单生产者,单消费者通道。可以发送一个单一的值。
  • broadcast:多生产者,多消费者。可以发送许多值。每个接收者看到每个值。
  • watch:单生产者,多消费者。可以发送许多值,但不保留历史。接收者只看到最新的值。

如果你需要一个多生产者多消费者的通道,每个消息只有一个消费者看到,你可以使用 async-channel crate。

还有一些通道是在异步Rust之外使用的,比如 std::sync::mpsccrossbeam::channel。这些通道通过阻塞线程来等待消息,这在异步代码中是不允许的。

背压(Backpressure)

背压是一种设计对高负荷反应良好的应用程序的模式。例如,mpsc 通道有有界的和无界的两种形式。通过使用有界通道,如果接收方不能跟上消息的数量,接收方可以对发送方施加 “背压”,这就避免了随着通道上的消息越来越多,内存使用量无限制地增长。

Actor

一种设计应用程序的模式。Actor是指一个独立生成的任务,它代表应用程序的其他部分管理一些资源,使用通道与应用程序的其他部分进行通信。

1.3 - 资料收集

收集Tokio的各种资料

官方网站

社区

  • TODO

文档

2 - Tokio教程

Tokio的官方教程

2.1 - Tokio教程概况

Tokio介绍,优势和不适用的场景

内容出处: https://tokio.rs/tokio/tutorial

Tokio是Rust编程语言的一个异步运行时。它提供了编写网络应用所需的构建模块。它提供了针对各种系统的灵活性,从有几十个内核的大型服务器到小型嵌入式设备。

在高层次上,Tokio提供了几个主要组件:

  • 一个用于执行异步代码的多线程运行时。
  • 一个标准库的异步版本。
  • 一个庞大的库生态系统。

Tokio 在项目中的作用

当你以异步方式编写你的应用程序时,你可以通过减少在同一时间做许多事情的成本,使它能够更好地扩展。然而,异步的Rust代码不会自己运行,所以你必须选择一个运行时来执行它。Tokio库是使用最广泛的运行时,在使用量上超过了所有其他运行时的总和。

此外,Tokio提供了许多有用的工具。在编写异步代码时,你不能使用Rust标准库提供的普通阻塞API,而必须使用它们的异步版本。这些替代版本是由Tokio提供的,在有意义的地方反映了Rust标准库的API。

Tokio的优势

本节将概述Tokio的一些优势。

快速

Tokio 是快速的,它建立在 Rust 编程语言之上,而 Rust 编程语言本身是快速的。这是按照 Rust 的精神来做的,目标是你不应该通过手工编写同等的代码来提高性能。

Tokio是可扩展的,建立在 async/await 语言特性之上,而这本身就是可扩展的。当处理网络时,由于延迟的原因,你能处理一个连接的速度是有限的,所以唯一的扩展方式是一次处理许多连接。有了 async/await 语言功能,增加并发操作的数量变得异常便宜,使你可以扩展到大量的并发任务。

可靠的

Tokio 是用 Rust 构建的,Rust 是一种使每个人都能构建可靠和高效软件的语言。一些研究发现,大约有70%的高严重度安全漏洞是由内存不安全造成的。使用Rust可以在你的应用程序中消除这整类错误。

Tokio也非常注重提供一致的行为,没有任何意外。Tokio的主要目标是让用户部署可预测的软件,使其每天都有相同的表现,有可靠的响应时间,没有不可预知的延迟峰值。

简单

有了Rust的 async/await 功能,编写异步应用程序的复杂性就大大降低了。与Tokio的实用程序和充满活力的生态系统搭配,编写应用程序是一件轻而易举的事。

Tokio在合理的情况下遵循标准库的命名惯例。这使得只用标准库编写的代码很容易转换为用Tokio编写的代码。有了Rust强大的类型系统,轻松提供正确代码的能力是无可比拟的。

灵活

Tokio提供了多种运行时的变化。从多线程的、work-stealing 的运行时到轻量级的、单线程的运行时都有。每个运行时都有许多旋钮,允许用户根据自己的需要进行调整。

什么时候不使用Tokio

虽然Tokio对许多需要同时做很多事情的项目很有用,但也有一些Tokio不适合的使用情况。

  • 通过在几个线程上并行运行来加速由CPU控制的计算。Tokio是为IO绑定的应用而设计的,在这种情况下,每个单独的任务大部分时间都在等待IO。如果你的应用程序唯一做的事情是并行运行计算,你应该使用rayon。也就是说,如果你需要同时做这两件事,还是可以 “混合搭配” 的。

  • 读取大量的文件。虽然看起来Tokio对那些仅仅需要读取大量文件的项目很有用,但与普通线程池相比,Tokio在这里没有提供任何优势。这是因为操作系统一般不提供异步文件API。

  • 发送单个网络请求。Tokio给你带来优势的地方是当你需要同时做很多事情时。如果你需要使用一个用于异步Rust的库,如reqwest,但你不需要同时做很多事情,你应该选择该库的阻塞版本,因为它将使你的项目更简单。当然,使用Tokio仍然可以工作,但与阻塞式API相比,没有提供真正的优势。如果该库没有提供阻塞式的API,请看关于用同步代码桥接的章节。

2.2 - 教程准备工作

完成构建Redis客户端和服务器的过程

内容出处:https://tokio.rs/tokio/tutorial/setup

本教程将带领你一步一步地完成构建Redis客户端和服务器的过程。我们将从Rust的异步编程的基础知识开始,并在此基础上建立起来。我们将实现Redis命令的一个子集,但会对Tokio进行全面考察。

Mini-Redis

你将在本教程中构建的项目在 GitHub 上以 Mini-Redis 的形式提供。Mini-Redis是以学习Tokio为主要目的而设计的,因此注释得非常好,但这也意味着Mini-Redis缺少一些你希望在真正的Redis库中实现的功能。你可以在 crates.io 上找到可用于生产的 Redis 库。

我们将在本教程中直接使用Mini-Redis。这允许我们在教程中使用Mini-Redis的部分功能,然后再在后面的教程中实现它们。

获得帮助

在任何时候,如果你遇到困难,你都可以在Discord或GitHub的讨论中得到帮助。不要担心问 “初学者” 的问题。我们都是从某处开始的,并且很乐意提供帮助。

前提条件

读者应该已经熟悉了Rust。Rust-book 是一个很好的入门资源。

虽然不是必须的,但使用Rust标准库或其他语言编写网络代码的一些经验可能会有所帮助。

不需要对Redis有任何预先了解。

rust

在开始之前,你应该确保你已经安装了Rust工具链并准备好了。如果你没有,最简单的方法是使用rustup来安装它。

本教程要求至少有1.45.0版本的Rust,但建议使用最新的稳定版本的Rust。

要检查你的电脑上是否安装了Rust,请运行以下程序。

$ rustc --version

Mini-Redis服务器

接下来,安装Mini-Redis服务器。这将被用来测试我们的客户端,因为我们正在构建它。

cargo install mini-redis

2.3 - Hello Tokio

编写一个非常基本的Tokio应用程序

https://tokio.rs/tokio/tutorial/hello-tokio

我们将通过编写一个非常基本的Tokio应用程序开始。它将连接到Mini-Redis服务器,将 key hello的值设置为world。然后它将读回key。这将使用Mini-Redis客户端库来完成。

代码

生成一个新的crate

让我们从生成一个新的Rust应用程序开始:

$ cargo new my-redis
$ cd my-redis

添加依赖项

接下来,打开Cargo.toml,在 [dependencies] 下面添加以下内容:

tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"

编写代码

然后,打开main.rs,将该文件的内容替换为:

use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Open a connection to the mini-redis address.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Set the key "hello" with value "world"
    client.set("hello", "world".into()).await?;

    // Get key "hello"
    let result = client.get("hello").await?;

    println!("got value from the server; result={:?}", result);

    Ok(())
}

确保Mini-Redis服务器正在运行。在一个单独的终端窗口,运行:

$ mini-redis-server

如果你还没有安装mini-redis,你可以用

$ cargo install mini-redis

现在,运行my-redis应用程序:

$ cargo run
got value from the server; result=Some(b"world")

代码分解

让我们花点时间来看看我们刚刚做了什么。没有太多的代码,但有很多事情正在发生。

let mut client = client::connect("127.0.0.1:6379").await?;

client::connect 函数是由 mini-redis crate提供的。它异步地与指定的远程地址建立了一个TCP连接。一旦连接建立起来,就会返回一个 client 句柄。尽管操作是异步进行的,但我们写的代码看起来是同步的。唯一表明该操作是异步的是 .await 操作符。

什么是异步编程?

大多数计算机程序的执行顺序与它的编写顺序相同。第一行执行,然后是下一行,以此类推。在同步编程中,当程序遇到不能立即完成的操作时,它就会阻塞,直到操作完成。例如,建立一个TCP连接需要在网络上与一个对等体进行交换,这可能需要相当长的时间。在这段时间内,线程会被阻塞。

通过异步编程,不能立即完成的操作被暂停到后台。线程没有被阻塞,可以继续运行其他事情。一旦操作完成,任务就会被取消暂停,并继续从它离开的地方处理。我们之前的例子中只有一个任务,所以在它被暂停的时候什么都没有发生,但异步程序通常有许多这样的任务。

尽管异步编程可以带来更快的应用,但它往往导致更复杂的程序。程序员需要跟踪所有必要的状态,以便在异步操作完成后恢复工作。从历史上看,这是一项繁琐且容易出错的任务。

编译时绿色线程

Rust使用一个叫做 async/await 的功能实现了异步编程。执行异步操作的函数都标有 async 关键字。在我们的例子中,connect函数是这样定义的:

use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;

pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
    // ...
}

async fn 的定义看起来像一个普通的同步函数,但却以异步方式运行。Rust在编译时将 async fn 转化为一个异步运行的routine。在 async fn 中对 .await 的任何调用都会将控制权交还给线程。当操作在后台进行时,线程可以做其他工作。

尽管其他语言也实现了async/await,但Rust采取了一种独特的方法。主要是,Rust的异步操作是 lazy 的。这导致了与其他语言不同的运行时语义。

如果这还不是很有意义,不要担心。我们将在本指南中更多地探讨async/await。

使用 async/await

异步函数的调用与其他Rust函数一样。然而,调用这些函数并不会导致函数主体的执行。相反,调用 async fn 会返回一个代表操作的值。这在概念上类似于一个零参数闭包。要实际运行该操作,你应该在返回值上使用 .await 操作符。

例如,给定的程序:

async fn say_world() {
    println!("world");
}

#[tokio::main]
async fn main() {
    // Calling `say_world()` does not execute the body of `say_world()`.
    let op = say_world();

    // This println! comes first
    println!("hello");

    // Calling `.await` on `op` starts executing `say_world`.
    op.await;
}

输出为:

hello
world

async fn 的返回值是一个匿名类型,它实现了 Future trait。

异步main函数

用于启动应用程序的main函数与大多数Rust工具箱中的常见函数不同。

  • 它是 async fn
  • 它被注解为 #[tokio::main]

使用 async fn 是因为我们想进入一个异步上下文。然而,异步函数必须由一个运行时来执行。运行时包含异步任务调度器,提供事件化I/O、计时器等。运行时不会自动启动,所以主函数需要启动它。

#[tokio::main] 函数是一个宏。它将 async fn main() 转换为同步 fn main(),初始化一个运行时实例并执行异步main函数。

例如,下面的例子:

#[tokio::main]
async fn main() {
    println!("hello");
}

被转化为:

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

Tokio运行时的细节将在后面介绍。

Cargo features

在本教程中依赖Tokio时,启用了 full 的功能标志:

tokio = { version = "1", features = ["full"] }

Tokio 有很多功能(TCP、UDP、Unix 套接字、定时器、同步工具、多种调度器类型等)。不是所有的应用程序都需要所有的功能。当试图优化编译时间或最终应用程序的足迹时,应用程序可以决定只选择进入它所使用的功能。

目前,在依赖 tokio 时,请使用 “full” feature。

2.4 - spawning

Redis服务器开始支持并发请求,但还不能共享数据

内容出处:https://tokio.rs/tokio/tutorial/spawning

我们将换个角度,开始在Redis服务器上工作。

首先,把上一节中的客户端SET/GET代码移到一个示例文件中。这样,我们就可以针对我们的服务器运行它:

$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs

然后创建一个新的、空的 src/main.rs 并继续。

接受套接字

我们的Redis服务器需要做的第一件事是接受入站的TCP套接字。这是用 tokio::net::TcpListener 完成的。

Tokio的许多类型与Rust标准库中的同步类型命名相同。在合理的情况下,Tokio暴露了与std相同的API,但使用了async fn。

TcpListener被绑定到6379端口,然后在一个循环中接受socket。每个套接字都被处理,然后关闭。现在,我们将读取命令,将其打印到stdout,并回应一个错误。

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // The second item contains the IP and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis **frames** instead of
    // byte streams. The `Connection` type is defined by mini-redis.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // Respond with an error
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

现在,运行这个 accept 循环:

$ cargo run

在一个单独的终端窗口,运行 hello-redis 的例子(上一节的SET/GET命令):

$ cargo run --example hello-redis

输出应该是:

Error: "unimplemented"

在服务器终端,输出是:

GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

并发

我们的服务器有一个小问题(除了只回应错误)。它一次处理一个入站请求。当一个连接被接受时,服务器停留在接受循环块内,直到响应被完全写入套接字。

我们希望我们的Redis服务器能够处理许多并发的请求。要做到这一点,我们需要增加一些并发性。

并发和并行是不一样的。如果你在两个任务之间交替进行,那么你就是在同时进行两个任务,但不是并行的。要想获得并行的资格,你需要两个人,一个人专门负责每个任务。

使用Tokio的一个优点是,异步代码允许你在许多任务上并发工作,而不必使用普通线程并行工作。事实上,Tokio可以在一个单线程上并发运行许多任务

为了并发地处理连接,为每个入站连接生成一个新的任务。连接在这个任务中被处理。

接受循环变成:

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

任务

Tokio任务是一个异步的绿色线程。它们是通过传递一个异步块给 tokio::spawn 来创建的。tokio::spawn 函数返回 JoinHandle,调用者可以用它来与生成的任务进行交互。该异步块可以有一个返回值。调用者可以使用 JoinHandle 上的 .await 获取返回值。

比如说:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });

    // Do some other work

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

对JoinHandle的等待会返回一个 Result。当任务在执行过程中遇到错误时,JoinHandle将返回 Err。这发生在任务恐慌的时候,或者任务被运行时关闭而强行取消的时候。

任务是由调度器管理的执行单位。生成的任务提交给 Tokio 调度器,然后确保该任务在有工作要做时执行。生成的任务可以在它被生成的同一线程上执行,也可以在不同的运行时线程上执行。任务在被催生后也可以在线程之间移动。

Tokio中的任务是非常轻便的。在引擎盖下,它们只需要一次分配和64字节的内存。应用程序可以自由地生成数千,甚至数百万个任务。

'static bound

当你在Tokio运行时中催生一个任务时,它的类型必须是 'static 的。这意味着生成的任务不能包含对任务之外拥有的数据的任何引用。

一个常见的误解是,'static 总是意味着 “永远活着”,但事实并非如此。仅仅因为一个值是 'static ,并不意味着你有内存泄漏。你可以在《常见的Rust寿命误解》中阅读更多内容。

例如,以下内容将无法编译:

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

试图编译时,出现了以下错误:

error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |

发生这种情况是因为,默认情况下,变量不会被 move 到异步块中。v向量仍然归主函数所有。rust编译器很好地解释了这一点,甚至还提出了解决方法 将第7行改为 task::spawn(async move { 将指示编译器将v move 到被催生的任务中。这样,该任务拥有其所有的数据,使其成为 ‘static。

如果数据必须被多个任务同时访问,那么它必须使用同步原语(如Arc)进行共享。

请注意,错误信息提到了参数类型超过了 'static 生命周期。这个术语可能相当令人困惑,因为 'static 生命周期一直持续到程序结束,所以如果它超过了这个生命周期,你不就有内存泄漏了吗?解释是,必须超过 'static 生命周期的是类型,而不是值,而且值可能在其类型不再有效之前被销毁。

当我们说一个值是 'static 的时候,所有的意思是,把这个值永远留在身边是不对的。这一点很重要,因为编译器无法推理出一个新产生的任务会停留多长时间,所以它能确保任务不会活得太久的唯一方法就是确保它可能永远存在。

上面信息框中的链接使用了 “bounded by 'static” 的术语,而不是 “its type outlives 'static” 或 “the value is 'static” for T: 'static。这些都是同一个意思,与 &'static T 中的注解 'static 是不同的。

Send bound

tokio::spawn产生的任务必须实现 Send。这允许Tokio运行时在线程之间 move 任务,而这些任务在一个 .await 中被暂停。

当所有跨 .await 调用的数据都是Send时,任务就是Send。这有点微妙。当 .await 被调用时,任务就回到了调度器中。下一次任务被执行时,它将从最后的让出点恢复。为了使其正常工作,所有在 .await 之后使用的状态都必须由任务保存。如果这个状态是 Send,即可以跨线程移动,那么任务本身也可以跨线程移动。反过来说,如果这个状态不是 Send,那么任务也不是。

例如,这样就可以了:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;
    });
}

这并不是:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // `rc` is used after `.await`. It must be persisted to
        // the task's state.
        yield_now().await;

        println!("{}", rc);
    });
}

试图编译该片段的结果是:

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

我们将在下一章更深入地讨论这个错误的一个特例。

存储数值

我们现在将实现处理传入命令的 process 函数。我们将使用 HashMap 来存储值。SET命令将插入到HashMap中,GET 值将加载它们。此外,我们将使用一个循环来接受每个连接的一个以上的命令。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;

    // A hashmap is used to store data
    let mut db = HashMap::new();

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    // Use `read_frame` to receive a command from the connection.
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // The value is stored as `Vec<u8>`
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                if let Some(value) = db.get(cmd.key()) {
                    // `Frame::Bulk` expects data to be of type `Bytes`. This
                    // type will be covered later in the tutorial. For now,
                    // `&Vec<u8>` is converted to `Bytes` using `into()`.
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

现在,启动服务器:

$ cargo run

并在一个单独的终端窗口中,运行hello-redis的例子:

$ cargo run --example hello-redis

现在,输出将是:

got value from the server; result=Some(b"world")

我们现在可以获取和设置值了,但有一个问题:这些值在连接之间是不共享的。如果另一个套接字连接并试图获取Hello键,它将不会发现任何东西。

你可以在这里找到完整的代码。

在下一节中,我们将实现对所有套接字的数据进行持久化。

2.5 - 共享状态

使用 Mutex 来保护共享状态

https://tokio.rs/tokio/tutorial/shared-state

到目前为止,我们有一个键值服务器在工作。然而,有一个重大的缺陷:状态没有在不同的连接中共享。我们将在这篇文章中解决这个问题。

策略

在Tokio中,共享状态有几种不同的方式:

  1. 用Mutex来保护共享状态。
  2. 生成一个任务来管理状态,并使用消息传递来操作它。

一般来说,对于简单的数据使用第一种方法,而对于需要异步工作的东西使用第二种方法,比如I/O原语。在本章中,共享状态是一个HashMap,操作是 insert 和 get。这两种操作都不是异步的,所以我们将使用Mutex。

后一种方法将在下一章中介绍。

添加 bytes 依赖

Mini-Redis板块没有使用 Vec<u8>,而是使用 bytes crate的 Bytes。Bytes的目标是为网络编程提供一个强大的字节数组结构。相比 Vec<u8> 最大的特点是浅层克隆。换句话说,在 Bytes 实例上调用 clone() 并不复制基础数据。相反,Bytes 实例是对一些底层数据的一个引用计数的句柄。Bytes类型大致是一个Arc<Vec<u8>,但有一些附加功能。

要依赖字节,请在 Cargo.toml[dependencies] 部分添加以下内容:

bytes = "1"

初始化HashMap

HashMap将在许多任务和可能的许多线程之间共享。为了支持这一点,它被包裹在 Arc<Mutex<_>> 中。

首先,为方便起见,在use语句后添加以下类型别名:

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

然后,更新主函数以初始化HashMap,并将一个Arc句柄传递给 process 函数。使用Arc允许从许多任务中并发地引用HashMap,可能在许多线程上运行。在整个Tokio中,术语 handle 被用来引用一个提供对一些共享状态的访问的值。

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

关于 std::sync::Mutex 的使用

注意,使用 std::sync::Mutex 而不是 tokio::Mutex 来保护 HashMap。一个常见的错误是在异步代码中无条件地使用 tokio::sync::Mutex。异步Mutex是一个跨调用 .await 而被锁定的Mutex。

同步的mutex在等待获得锁的时候会阻塞当前线程。这反过来又会阻塞其他任务的处理。然而,切换到 tokio::sync::Mutex 通常没有帮助,因为异步mutex内部使用同步mutex。

作为一个经验法则,在异步代码中使用同步的mutex是可以的,只要竞争保持在较低的水平,并且在调用 .await 时不保持锁。此外,可以考虑使用parking_lot::Mutex 作为 std::sync::Mutex 的更快的替代品。

更新process()

process函数不再初始化一个HashMap。相反,它将 HashMap 的共享句柄作为一个参数。它还需要在使用 HashMap 之前锁定它。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

任务、线程和争用

当争夺最小的时候,使用一个阻塞的mutex来保护简短的关键部分是一个可以接受的策略。当锁被争夺时,执行任务的线程必须阻塞并等待mutex。这不仅会阻塞当前的任务,也会阻塞当前线程上安排的所有其他任务。

默认情况下,Tokio运行时使用一个多线程调度器。任务被安排在由运行时管理的任何数量的线程上。如果大量的任务被安排执行,并且它们都需要访问mutex,那么就会出现争夺。另一方面,如果使用 current_thread 运行时风味,那么mutex将永远不会被争夺。

current_thread 运行时是一个轻量级的、单线程的运行时。当只生成几个任务并打开少量的套接字时,它是一个很好的选择。例如,当在异步客户端库之上提供一个同步API桥接时,这个选项很好用。

如果同步 mutex 的争夺成为问题,最好的解决办法很少是切换到Tokio mutex。相反,要考虑的选项是。

  • 切换到一个专门的任务来管理状态并使用消息传递。
  • 分片 mutex。
  • 重组代码以避免使用mutex。

在我们的案例中,由于每个 key 都是独立的,mutex分片将很好地工作。为了做到这一点,我们将引入N个不同的实例,而不是一个Mutex<HashMap<_, _»实例。

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

然后,为任何给定的密钥寻找单元成为一个两步过程。首先,key 被用来识别它属于哪个分片。然后,在HashMap中查找该 key。

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

dashmap crate提供了一个分片哈希图的实现。

.await中持有MutexGuard

你可能会写这样的代码:

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

当你试图生成调用该函数的东西时,你会遇到以下错误信息:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

发生这种情况是因为 std::sync::MutexGuard 类型不是 Send。这意味着你不能把一个mutex锁发送到另一个线程,而错误的发生是因为Tokio运行时可以在每个 .await 的线程之间移动一个任务。为了避免这种情况,你应该重组你的代码,使互斥锁的析构器在.await之前运行。

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

请注意,这不起作用:

use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

这是因为编译器目前只根据作用域信息来计算一个future是否是Send。编译器有望在将来更新以支持显式丢弃,但现在,你必须显式地使用一个范围。

你不应该试图通过以不需要 Send 的方式催生任务来规避这个问题,因为如果Tokio在任务持有锁的时候将你的任务暂停在一个.await,一些其他的任务可能会被安排在同一个线程上运行,而这个其他的任务也可能试图锁定那个突变体,这将导致一个死锁,因为等待锁定突变体的任务会阻止持有突变体的任务释放突变体。

我们将在下面讨论一些方法来修复这个错误信息。

重组代码,使其不在一个.await中保持锁

我们已经在上面的片段中看到了一个例子,但还有一些更强大的方法可以做到这一点。例如,你可以将mutex包裹在一个结构中,并且只在该结构的非同步方法中锁定mutex。

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

这种模式保证你不会遇到Send错误,因为mutex guard不会出现在异步函数的任何地方。

生成任务来管理状态,并使用消息传递来操作它

这是本章开头提到的第二种方法,通常在共享资源是I/O资源时使用。更多细节见下一章。

使用Tokio的异步mutex

也可以使用 Tokio 提供的 tokio::sync::Mutex 类型。Tokio mutex的主要特点是,它可以跨 .await 持有,而没有任何问题。也就是说,异步的mutex比普通的mutex更昂贵,通常使用其他两种方法会更好。

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

2.6 - 通道

使用通道来传递消息,实现多个请求统一被发送给redis并返回应答给调用者

https://tokio.rs/tokio/tutorial/channels

现在我们已经学习了一些关于Tokio并发的知识,让我们在客户端应用这些知识。把我们之前写的服务器代码放到一个明确的二进制文件中:

mkdir src/bin
mv src/main.rs src/bin/server.rs

然后创建一个包含客户端代码的二进制文件:

touch src/bin/client.rs

本节中的代码将被写入到这个文件中。需要运行时,首先要在另外一个终端窗口中启动服务:

cargo run --bin server

然后在启动客户端,注意是分别启动在不同的终端中:

cargo run --bin client

好了,让我们开始编码。

假设我们想运行两个并发的Redis命令。我们可以为每个命令生成一个任务。那么这两条命令就会同时发生。

起初,我们可能会尝试类似的做法:

use mini_redis::client;

#[tokio::main]
async fn main() {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Spawn two tasks, one gets a key, the other sets a key
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

这不会被编译,因为两个任务都需要以某种方式访问 client。由于Client没有实现Copy,如果没有一些代码来促进这种共享,它将不会被编译。此外,Client::set 需要 &mut self,这意味着调用它需要独占访问。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex,因为 .await 需要在持有锁的情况下被调用。我们可以使用 tokio::sync::Mutex,但这只允许一个飞行中的请求。如果客户端实现了管道化,那么异步Mutex会导致连接的利用率不足。

消息传递

答案是使用消息传递。这种模式包括产生一个专门的任务来管理 client 资源。任何希望发出请求的任务都会向 client 任务发送一个消息。client 任务代表发送者发出请求,并将响应发回给发送者。

使用这种策略,就可以建立一个单一的连接。管理 client 的任务能够获得排他性的访问,以便调用get和set。此外,通道作为一个缓冲区工作。在 client 任务忙碌的时候,操作可以被发送到 client 任务。一旦 client 任务可以处理新的请求,它就从通道中拉出下一个请求。这可以带来更好的吞吐量,并且可以扩展到支持连接池。

Tokio的通道原语

Tokio提供一些通道(channel),每个通道都有不同的用途:

  • mpsc:多生产者,单消费者通道。可以发送许多数值。
  • oneshot:单生产者,单消费者通道。可以发送一个单一的值。
  • broadcast: 多生产者,多消费者。可以发送许多值。每个接收者看到每个值。
  • watch:单生产者,多消费者。可以发送许多值,但不保留历史。接收者只看到最近的值。

如果你需要一个多生产者多消费者的通道,只有一个消费者看到每个消息,你可以使用 async-channel crate。也有一些通道用于异步Rust之外,比如 std::sync::mpsccrossbeam::channel。这些通道通过阻塞线程来等待消息,这在异步代码中是不允许的。

在本节中,我们将使用 mpsconeshot。其他类型的消息传递通道将在后面的章节中进行探讨。本节的完整代码可在此找到。

定义消息类型

在大多数情况下,当使用消息传递时,接收消息的任务会对一个以上的命令做出响应。在我们的例子中,该任务将对GET和SET命令做出响应。为了对此进行建模,我们首先定义一个Command枚举,并为每个命令类型包含一个变量。

use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}	

创建通道

在主函数中,创建 mpsc 通道:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Create a new channel with a capacity of at most 32.
    let (tx, mut rx) = mpsc::channel(32);

    // ... Rest comes here
}

mpsc 通道被用来向管理 redis 连接的任务发送命令。多生产者的能力允许从许多任务发送消息。创建通道会返回两个值,一个发送者和一个接收者。这两个句柄是单独使用的。它们可以被转移到不同的任务。

通道的创建容量为32。如果消息的发送速度比接收速度快,通道将储存这些消息。一旦通道中存储了32条消息,调用 send(...).await 将进入睡眠状态,直到有消息被接收者删除。

从多个任务中发送是通过克隆 sender 来完成的。比如说。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await;
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

两条信息都被发送到单一的 Receiver 句柄。mpsc 通道的Receiver 不能克隆。

当每个 sender 都超出了范围或被放弃时,就不再可能向通道发送更多的消息了。在这一点上,接收器上的 recv 调用将返回None,这意味着所有的发送者都消失了,通道被关闭。

在我们管理 Redis 连接的任务中,它知道一旦通道关闭,它就可以关闭 Redis 连接,因为该连接将不会再被使用。

生成管理器任务

接下来,生成处理来自通道的消息的任务。首先,建立到 Redis 客户端的连接。然后,通过 Redis 连接发出收到的命令。

use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Start receiving messages
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});

现在,更新这两个任务,通过通道发送命令,而不是直接在 Redis 连接上发布。

// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();

// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "hello".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});

在 main 函数的底部,我们 .await join 句柄,以确保在进程退出前完全完成命令。

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

接收响应

最后一步是接收来自管理器任务的响应。GET命令需要获得数值,SET命令需要知道操作是否成功完成。

为了传递响应,使用了一个 oneshot 通道。oneshot 通道是一个单一生产者、单一消费者的通道,为发送单一数值而优化。在我们的例子中,这个单一的值就是响应。

与mpsc类似,oneshot::channel() 返回一个 sender 和 receiver 句柄。

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

与 mpsc 不同,没有指定容量,因为容量总是1。此外,两个句柄都不能被克隆。

为了接收来自管理任务的响应,在发送命令之前,创建了一个 oneshot 通道。该通道的 Sender 部分被包含在给管理任务的命令中。receive 部分用来接收响应。

首先,更新 Command 以包括 Sender。为了方便起见,使用一个类型别名来引用 Sender。

use tokio::sync::oneshot;
use bytes::Bytes;

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Vec<u8>,
        resp: Responder<()>,
    },
}

/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

现在,更新发布命令的任务,包括 oneshot::Sender

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };

    // Send the GET request
    tx.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: b"bar".to_vec(),
        resp: resp_tx,
    };

    // Send the SET request
    tx2.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

最后,更新管理任务,通过 oneshot 的通道发送响应:

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // Ignore errors
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val.into()).await;
            // Ignore errors
            let _ = resp.send(res);
        }
    }
}

oneshot::Sender 上调用 send 会立即完成,不需要 .await。这是因为在 oneshot 通道上的发送将总是立即失败或成功,而不需要任何形式的等待。

在 oneshot 通道上发送一个值,当接收方的一半已经放弃时,返回 Err。这表明接收方对响应不再感兴趣了。在我们的方案中,接收方取消兴趣是一个可接受的事件。resp.send(...)返回的Err不需要被处理。

背压和有界通道

无论何时引入并发或队列,都必须确保队列是有界的,系统将优雅地处理负载。无界的队列最终会占用所有可用的内存,导致系统以不可预测的方式失败。

Tokio 小心避免隐性队列。这其中很大一部分是由于异步操作是 lazy 的。请考虑以下情况:

loop {
    async_op();
}

如果异步操作急切地运行,循环将重复排队运行一个新的 async_op,而不确保之前的操作完成。这就导致了隐性的无边界队列。基于回调的系统和基于急切的 future 的系统特别容易受此影响。

然而,在Tokio和异步Rust中,上述片段根本不会导致 async_op 的运行。这是因为 .await 从未被调用。如果该片段被更新为使用 .await,那么循环会等待操作完成后再重新开始。

loop {
    // Will not repeat until `async_op` completes
    async_op().await;
}

必须明确地引入并发和队列。做到这一点的方法包括:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

在这样做的时候,要注意确保并发的总量是有界限的。例如,当编写一个TCP接受循环时,要确保打开的套接字的总数是有限制的。当使用 mpsc::channel 时,选择一个可管理的通道容量。具体的约束值将是特定于应用的。

注意和挑选好的界限是编写可靠的 tokio 应用程序的一个重要部分。

2.7 - I/O

Tokio教程之I/O

https://tokio.rs/tokio/tutorial/io

Tokio中的 I/O 操作方式与 std 中大致相同,但是是异步的。有一个特质用于读取(AsyncRead)和一个特质用于写入(AsyncWrite)。特定的类型根据情况实现这些特性(TcpStream、File、Stdout)。AsyncRead 和 AsyncWrite 也由一些数据结构实现,如 Vec<u8>&[u8]。这允许在期望有读写器的地方使用字节数组。

本页将介绍 Tokio 的基本 I/O 读取和写入,并通过几个例子进行说明。下一页将介绍一个更高级的 I/O 例子。

AsyncRead 和 AsyncWrite

这两个特性提供了异步读取和写入字节流的设施。这些特质上的方法通常不被直接调用,类似于你不手动调用 Future 特质的 poll 方法。相反,你将通过AsyncReadExtAsyncWriteExt 提供的实用方法使用它们。

让我们简单地看看这些方法中的几个。所有这些函数都是异步的,必须与 .await 一起使用。

async fn read()

AsyncReadExt::read 提供了一个向缓冲区读取数据的异步方法,返回读取的字节数。

注意:当 read() 返回 Ok(0) 时,这表示流已经关闭。任何对 read() 的进一步调用将立即以 Ok(0) 完成。对于TcpStream实例,这意味着套接字的读取部分被关闭。

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;

    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end 从流中读取所有字节直到EOF。

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = Vec::new();

    // read the whole file
    f.read_to_end(&mut buffer).await?;
    Ok(())
}

async fn write()

AsyncWriteExt::write 将缓冲区写入写入器,返回写入的字节数。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    // Writes some prefix of the byte string, but not necessarily all of it.
    let n = file.write(b"some bytes").await?;

    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

async fn write_all()

AsyncWriteExt::write_all 将整个缓冲区写入写入器。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut buffer = File::create("foo.txt").await?;

    buffer.write_all(b"some bytes").await?;
    Ok(())
}

这两个特性都包括许多其他有用的方法。请参阅API文档以获得一个全面的列表。

辅助函数

此外,就像std一样,tokio::io模块包含一些有用的实用函数,以及用于处理标准输入、标准输出和标准错误的API。例如,tokio::io::copy 异步地将一个 reader 的全部内容复制到一个 writer。

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}

注意,这使用了字节数组也实现了 AsyncRead 这一事实。

Echo server

让我们练习做一些异步I/O。我们将编写一个 echo 服务器。

echo服务器绑定了一个 TcpListener 并在一个循环中接受入站连接。对于每个入站连接,数据被从套接字中读取并立即写回套接字。客户端向服务器发送数据,并接收完全相同的数据回来。

我们将使用稍微不同的策略实现两次 echo 服务器。

使用io::copy()

这是一个TCP服务器,需要一个接受循环。一个新的任务被生成以处理每个被接受的套接字。

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Copy data here
        });
    }
}

正如前面所看到的,这个实用程序需要一个 reader 和一个 writer,并将数据从一个复制到另一个。然而,我们只有一个单一的TcpStream。这个单一的值同时实现了 AsyncRead 和 AsyncWrite。因为 io::copy 对 reader 和 writer 两者都要求&mut,所以socket不能用于两个参数。

// This fails to compile
io::copy(&mut socket, &mut socket).await

拆分reader + writer

为了解决这个问题,我们必须把套接字分成一个reader句柄和一个writer句柄。拆分读写器组合的最佳方法取决于具体的类型。

任何读写器类型都可以使用 io::split 函数进行分割。这个函数接受一个单一的值,并返回单独的 reader 和 writer 句柄。这两个句柄可以独立使用,包括来自不同的任务。

例如,echo客户端可以这样处理并发的读和写。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // Write data in the background
    let write_task = tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;

        // Sometimes, the rust type inferencer needs
        // a little help
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];

    loop {
        let n = rd.read(&mut buf).await?;

        if n == 0 {
            break;
        }

        println!("GOT {:?}", &buf[..n]);
    }

    Ok(())
}

因为 io::split 支持任何实现 AsyncRead + AsyncWrite 并返回独立句柄的值,在内部 io::split 使用一个Arc和一个Mutex。这种开销可以通过TcpStream来避免。TcpStream提供了两个专门的分割函数。

TcpStream::split 接收流的 reference ,并返回 reader 和 writer 句柄。因为使用了reference,所以这两个句柄必须留在 split() 被调用的同一个任务上。这种专门的 split 是零成本的。不需要Arc或Mutex。TcpStream 还提供了 into_split,它支持可以跨任务移动的句柄,但只需要一个Arc。

因为 io::copy() 是在拥有TcpStream的同一个任务中调用的,我们可以使用 TcpStream::split。处理 echo 逻辑的任务变成了。

tokio::spawn(async move {
    let (mut rd, mut wr) = socket.split();
    
    if io::copy(&mut rd, &mut wr).await.is_err() {
        eprintln!("failed to copy");
    }
});

手动复制

现在让我们来看看我们如何通过手动复制数据来编写 echo 服务器。为了做到这一点,我们使用 AsyncReadExt::readAsyncWriteExt::write_all

完整的 echo 服务器如下:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => return,
                    Ok(n) => {
                        // Copy the data back to socket
                        if socket.write_all(&buf[..n]).await.is_err() {
                            // Unexpected socket error. There isn't much we can
                            // do here so just stop processing.
                            return;
                        }
                    }
                    Err(_) => {
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
}

让我们把它分解一下。首先,由于使用了AsyncRead 和 AsyncWrite 工具,扩展特性必须被带入范围。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

分配缓冲区

该策略是将一些数据从套接字中读入一个缓冲区,然后将缓冲区的内容写回套接字。

let mut buf = vec![0; 1024];

堆栈缓冲区被明确地避免了。回顾前面,我们注意到,所有跨调用 .await 的任务数据都必须由任务存储。在这种情况下,buf 被用于跨越 .await 的调用。所有的任务数据被存储在一个单一的分配中。你可以把它看作是一个枚举,每个变量都是需要为特定的 .await 调用存储的数据。

如果缓冲区由堆栈数组表示,每个接受的套接字所产生的任务的内部结构可能看起来像。

struct Task {
    // internal task fields here
    task: enum {
        AwaitingRead {
            socket: TcpStream,
            buf: [BufferType],
        },
        AwaitingWriteAll {
            socket: TcpStream,
            buf: [BufferType],
        }

    }
}

如果使用堆栈数组作为缓冲区类型,它将被内联存储在任务结构中。这将使任务结构变得非常大。此外,缓冲区的大小通常是以页为单位的。这将反过来使任务成为一个尴尬的大小:$page-size + a-few-bytes。

编译器对异步块的布局的优化比基本枚举更进一步。在实践中,变量不会像枚举所要求的那样在变体之间移动。然而,任务结构的大小至少和最大的变量一样大。

正因为如此,为缓冲区使用专门的分配通常更有效率。

处理EOF

当TCP流的读取部分被关闭时,对 read() 的调用返回 Ok(0) 。在这一点上退出读循环是很重要的。忘记在EOF时退出读循环是一个常见的错误来源。

loop {
    match socket.read(&mut buf).await {
        // Return value of `Ok(0)` signifies that the remote has
        // closed
        Ok(0) => return,
        // ... other cases handled here
    }
}

忘记从读循环中断开,通常会导致100%的CPU无限循环情况。由于套接字被关闭,socket.read()立即返回。循环就会永远重复下去。

2.8 - 分帧

Tokio教程之分帧

https://tokio.rs/tokio/tutorial/framing

我们现在将应用我们刚刚学到的关于I/O的知识,实现 Mini-Redis 的分帧层。成帧是将一个字节流转换为一个分帧流的过程。一个帧是两个对等体之间传输的数据单位。Redis协议的帧定义如下。

use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

请注意,该分帧只由数据组成,没有任何语义。命令的解析和执行发生在更高的层次。

对于HTTP,分帧可能看起来像:

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

为了实现 Mini-Redis 的分帧,我们将实现一个 Connection 结构,它包裹着一个TcpStream并读写 mini_redis::Frame 值。

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... other fields here
}

impl Connection {
    /// Read a frame from the connection.
    /// 
    /// Returns `None` if EOF is reached
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // implementation here
    }

    /// Write a frame to the connection.
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // implementation here
    }
}

带缓冲的读取

read_frame 方法在返回之前会等待一整帧的接收。对 TcpStream::read() 的一次调用可以返回一个任意数量的数据。它可能包含一整个帧,一个部分帧,或者多个帧。如果收到一个部分帧,数据被缓冲,并从套接字中读取更多数据。如果收到多个帧,则返回第一个帧,其余的数据被缓冲,直到下次调用 read_frame。

为了实现这一点,Connection需要一个读取缓冲区字段。数据从套接字中读入读取缓冲区。当一个帧被解析后,相应的数据就会从缓冲区中移除。

我们将使用BytesMut作为缓冲区类型。这是Bytes的一个可变版本。

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4kb of capacity.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

以及Connection上的read_frame()函数:

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Ensure the buffer has capacity
        if self.buffer.len() == self.cursor {
            // Grow the buffer
            self.buffer.resize(self.cursor * 2, 0);
        }

        // Read into the buffer, tracking the number
        // of bytes read
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // Update our cursor
            self.cursor += n;
        }
    }
}

在处理字节数组和 read 时,我们还必须维护一个游标,跟踪有多少数据被缓冲。我们必须确保将缓冲区的空部分传递给 read()。否则,我们会覆盖缓冲区的数据。如果我们的缓冲区被填满了,我们必须增加缓冲区,以便继续读取。在 parse_frame() 中(不包括),我们需要解析 self.buffer[..self.cursor] 所包含的数据。

因为将字节数组与游标配对是非常常见的,bytes crate提供了一个代表字节数组和游标的抽象概念。Buf 特性是由可以读取数据的类型实现的。BufMut 特性是由可以写入数据的类型实现的。当把一个 T: BufMut 传递给 read_buf() 时,缓冲区的内部游标会被 read_buf 自动更新。正因为如此,在我们版本的read_frame中,我们不需要管理我们自己的游标。

此外,当使用 Vec<u8> 时,缓冲区必须被初始化。 vec![0; 4096] 分配了一个4096字节的数组,并将0写入每个条目。当调整缓冲区的大小时,新的容量也必须用零来初始化。初始化过程是不没有开销的。当使用 BytesMutBufMut 时,容量是不初始化的。BytesMut 的抽象防止我们读取未初始化的内存。这让我们避免了初始化的步骤。

解析

现在,让我们看一下parse_frame()函数。解析工作分两步进行。

  1. 确保一个完整的帧被缓冲,并找到该帧的结束索引。
  2. 解析该帧。

mini-redis crate 为我们提供了一个用于这两个步骤的函数:

  1. Frame::check
  2. 2Frame::parse

我们还将重用 Buf 的抽象来帮助。 Buf 被传入 Frame::check。当检查函数遍历传入的缓冲区时,内部游标将被推进。当check返回时,缓冲区的内部游标指向帧的末端。

对于Buf类型,我们将使用 std::io::Cursor<&[u8]>

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Create the `T: Buf` type.
    let mut buf = Cursor::new(&self.buffer[..]);

    // Check whether a full frame is available
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Get the byte length of the frame
            let len = buf.position() as usize;

            // Reset the internal cursor for the
            // call to `parse`.
            buf.set_position(0);

            // Parse the frame
            let frame = Frame::parse(&mut buf)?;

            // Discard the frame from the buffer
            self.buffer.advance(len);

            // Return the frame to the caller.
            Ok(Some(frame))
        }
        // Not enough data has been buffered
        Err(Incomplete) => Ok(None),
        // An error was encountered
        Err(e) => Err(e.into()),
    }
}

完整的 Frame::check 函数可以在这里找到。我们将不涉及它的全部内容。

需要注意的是,Buf的 “字节迭代器” 风格的API被使用。这些获取数据并推进内部游标。例如,为了解析一个帧,第一个字节被检查以确定该帧的类型。使用的函数是 Buf::get_u8。这个函数获取当前光标位置的字节并使光标前进一个。

在Buf特性上还有更多有用的方法。查看API文档以了解更多细节。

带缓冲的写入

分帧API的另一半是 write_frame(frame) 函数。这个函数将整个帧写到套接字中。为了尽量减少 write 系统调用,写将被缓冲。一个写缓冲区被维护,帧在被写入套接字之前被编码到这个缓冲区。然而,与 read_frame() 不同的是,在写入套接字之前,整个帧并不总是被缓冲到一个字节数组中。

考虑一个批量流帧。被写入的值是 Frame::Bulk(Bytes)。散装帧的线格式是一个帧头,它由$字符和以字节为单位的数据长度组成。帧的大部分是Bytes值的内容。如果数据很大,把它复制到一个中间缓冲区将是很昂贵的。

为了实现缓冲写入,我们将使用 BufWriter 结构。这个结构被初始化为一个 T: AsyncWrite 并实现 AsyncWrite 本身。当在 BufWriter 上调用写时,写不会直接进入内部写器,而是进入一个缓冲区。当缓冲区满的时候,内容会被刷到内部写入器上,内部缓冲区被清空。还有一些优化措施,允许在某些情况下绕过缓冲区。

作为教程的一部分,我们将不尝试完整地实现 write_frame()。请看这里的完整实现。

首先,Connection结构被更新。

use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

接下来,write_frame()被实现:

use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    self.stream.flush().await;

    Ok(())
}

这里使用的函数是由 AsyncWriteExt 提供的。 它们在 TcpStream 上也是可用的,但是在没有中间缓冲区的情况下发出单字节的写入是不可取的。

  • write_u8 向写程序写一个字节。
  • write_all 将整个片断写入写入器。
  • write_decimal 是由 mini-redis 实现的。

该函数以调用 self.stream.flush().await 结束。因为 BufWriter 将写入的数据存储在一个中间缓冲区中,对写入的调用并不能保证数据被写入套接字。在返回之前,我们希望帧被写到套接字中。对 flush() 的调用将缓冲区中的任何数据写到套接字。

另一个选择是不在 write_frame() 中调用 flush()。相反,在 Connection 上提供一个 flush() 函数。这将允许调用者在写缓冲区中写入多个小帧的队列,然后用一个写系统调用将它们全部写入套接字。这样做会使 Connection API 复杂化。简化是 Mini-Redis 的目标之一,所以我们决定将 flush().await 调用包含在 fn write_frame() 中。

2.9 - 深入异步

Tokio教程之深入异步

https://tokio.rs/tokio/tutorial/async

在这一点上,我们已经完成了对异步 Rust 和 Tokio 的相当全面的考察。现在我们将深入挖掘Rust的异步运行时模型。在教程的一开始,我们就暗示过,异步Rust采取了一种独特的方法。现在,我们解释一下这意味着什么。

Futures

作为快速回顾,让我们采取一个非常基本的异步函数。与本教程到目前为止所涉及的内容相比,这并不新鲜。

use tokio::net::TcpStream;

async fn my_async_fn() {
    println!("hello from async");
    let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
    println!("async TCP operation complete");
}

我们调用这个函数,它返回一些值。我们在这个值上调用.await。

#[tokio::main]
async fn main() {
    let what_is_this = my_async_fn();
    // Nothing has been printed yet.
    what_is_this.await;
    // Text has been printed and socket has been
    // established and closed.
}

my_async_fn() 返回的值是一个future。future是一个实现了标准库所提供的 std::future::Future 特性的值。它们是包含正在进行的异步计算的值。

std::future::Future trait的定义是:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;
}

相关类型 Output 是 future 完成后产生的类型。Pin 类型是Rust能够支持异步函数中的借用的方式。更多细节请参见标准库文档。

与其他语言实现 future 的方式不同,Rust future 并不代表在后台发生的计算,相反,Rust future就是计算本身。Future的所有者负责通过轮询Future来推进计算。这可以通过调用 Future::poll 来实现。

实现future

让我们来实现一个非常简单的future。这个future将:

  • 等待到一个特定的时间点。
  • 输出一些文本到STDOUT。
  • 产生一个字符串。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

作为Future的Async fn

在main函数中,我们实例化了future并对其调用 .await。从异步函数中,我们可以对任何实现Future的值调用 .await。反过来,调用一个异步函数会返回一个实现Future的匿名类型。在 async fn main() 的例子中,生成的future大致是这样的。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() +
                        Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

Rust futures是一种状态机。在这里,MainFuture 被表示为一个 future 的可能状态的枚举。future 在 State0 状态下开始。当 poll 被调用时,future 试图尽可能地推进其内部状态。如果 future 能够完成,Poll::Ready 将被返回,其中包含异步计算的输出。

如果future不能完成,通常是由于它所等待的资源没有准备好,那么就会返回 Poll::Pending。收到 Poll::Pending 是向调用者表明,future 将在稍后的时间完成,调用者应该在稍后再次调用poll。

我们还看到,future 是由其他 future 组成的。在外层 future 上调用 poll 的结果是调用内部 future 的 poll 函数。

executors

异步的Rust函数返回future。future必须被调用 poll 以推进其状态。future是由其他 future 组成的。那么,问题来了,是什么在最外层的 future 上调用poll?

回想一下前面的内容,要运行异步函数,它们必须被传递给 tokio::spawn 或者是被 #[tokio::main] 注释的主函数。这样做的结果是将生成的外层 future 提交给 Tokio执行器。执行器负责在外部 future 上调用 Future::poll,推动异步计算的完成。

mini Tokio

为了更好地理解这一切是如何结合在一起的,让我们实现我们自己的最小版本的Tokio! 完整的代码可以在这里找到。

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
    let mut mini_tokio = MiniTokio::new();

    mini_tokio.spawn(async {
        let when = Instant::now() + Duration::from_millis(10);
        let future = Delay { when };

        let out = future.await;
        assert_eq!(out, "done");
    });

    mini_tokio.run();
}

struct MiniTokio {
    tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
    fn new() -> MiniTokio {
        MiniTokio {
            tasks: VecDeque::new(),
        }
    }
    
    /// Spawn a future onto the mini-tokio instance.
    fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.tasks.push_back(Box::pin(future));
    }
    
    fn run(&mut self) {
        let waker = task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        
        while let Some(mut task) = self.tasks.pop_front() {
            if task.as_mut().poll(&mut cx).is_pending() {
                self.tasks.push_back(task);
            }
        }
    }
}

这将运行异步块。一个具有所要求的延迟的 Delay 实例被创建并被等待。然而,到目前为止,我们的实现有一个重大缺陷。我们的执行器从未进入睡眠状态。执行器不断地循环所有被催生的 future,并对它们进行 poll 。大多数时候,这些 future 还没有准备好执行更多的工作,并会再次返回 Poll::Pending。这个过程会消耗CPU,一般来说效率不高。

理想情况下,我们希望 mini-tokio 只在 future 能够取得进展时 poll future。这发生在任务被阻塞的资源准备好执行请求的操作时。如果任务想从一个TCP套接字中读取数据,那么我们只想在TCP套接字收到数据时 poll 任务。在我们的例子中,任务在达到给定的瞬间被阻断。理想情况下,mini-tokio只会在那个瞬间过去后 poll 任务。

为了实现这一点,当一个资源被 poll 而该资源又还没有准备好时,一旦它过渡到 ready 的状态,该资源将发送一个通知。

Wakers

Waker 是缺失的那部分。这是一个系统,通过这个系统,资源能够通知等待的任务,资源已经准备好继续某些操作。

让我们再看一下Future::poll的定义:

fn poll(self: Pin<&mut Self>, cx: &mut Context)
    -> Poll<Self::Output>;

Poll 的 Context 参数有一个 waker() 方法。该方法返回一个与当前任务绑定的Waker。该Waker有一个wake()方法。调用该方法向执行器发出信号,相关任务应该被安排执行。当资源过渡到准备好的状态时调用wake(),通知执行者,poll 任务将能够取得进展。

更新 Delay

我们可以更新 Delay 来使用 wakers。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Get a handle to the waker for the current task
            let waker = cx.waker().clone();
            let when = self.when;

            // Spawn a timer thread.
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                waker.wake();
            });

            Poll::Pending
        }
    }
}

现在,一旦请求的持续时间过了,调用的任务就会被通知,执行者可以确保任务被再次安排。下一步是更新mini-tokio以监听唤醒通知。

我们的 Delay 实现还有一些剩余的问题。我们将在以后修复它们。

当一个 future 返回 Poll::Pending 时,它必须确保在某个时间点对 waker 发出信号。忘记这样做会导致任务无限期地挂起。

在返回 Poll::Pending 后忘记唤醒一个任务是一个常见的错误来源。

回顾一下 “Delay"的第一次迭代。这里是 future 的实现。

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

在返回 Poll::Pending 之前,我们调用 cx.waker().wake_by_ref()。这是为了满足 future 契约。通过返回 Poll::Pending,我们负责给唤醒者发信号。因为我们还没有实现定时器线程,所以我们在内联中给唤醒者发信号。这样做的结果是,future 将立即被重新安排,再次执行,而且可能还没有准备好完成。

请注意,允许对 waker 发出超过必要次数的信号。在这个特殊的例子中,即使我们根本没有准备好继续操作,我们还是向唤醒者发出信号。除了浪费一些CPU周期外,这样做并没有什么问题。然而,这种特殊的实现方式会导致一个繁忙的循环。

更新Mini Tokio

下一步是更新 Mini Tokio 以接收 waker 的通知。我们希望执行器只在被唤醒时运行任务,为了做到这一点,Mini Tokio将提供它自己的唤醒器。当唤醒者被调用时,其相关的任务将被排队执行。Mini Tokio在 poll future 时将这个 waker 传递给 future。

更新后的 Mini Tokio 将使用一个通道来存储预定任务。通道允许任务从任何线程被排队执行。Wakers 必须是 Send 和 Sync,所以我们使用来自crossbeam crate的通道,因为标准库的通道不是Sync。

Send和Sync特性是Rust提供的与并发性有关的标记特性。可以被发送到不同线程的类型是Send。大多数类型都是Send,但像Rc这样的类型则不是。可以通过不可变的引用并发访问的类型是Sync。一个类型可以是Send,但不是Sync–一个很好的例子是Cell,它可以通过不可变的引用被修改,因此并发访问是不安全的。

更多细节请参见Rust书中的相关章节。

然后,更新MiniTokio的结构。

use crossbeam::channel;
use std::sync::Arc;

struct MiniTokio {
    scheduled: channel::Receiver<Arc<Task>>,
    sender: channel::Sender<Arc<Task>>,
}

struct Task {
    // This will be filled in soon.
}

Wakers 是 sync,并且可以被克隆。当 wake 被调用时,任务必须被安排执行。为了实现这一点,我们有一个通道。当 wake() 被调用时,任务被推到通道的发送部分。我们的 task 结构将实现唤醒逻辑。要做到这一点,它需要同时包含催生的future 和通道的发送部分。

use std::sync::{Arc, Mutex};

struct Task {
    // The `Mutex` is to make `Task` implement `Sync`. Only
    // one thread accesses `future` at any given time. The
    // `Mutex` is not required for correctness. Real Tokio
    // does not use a mutex here, but real Tokio has
    // more lines of code than can fit in a single tutorial
    // page.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    executor: channel::Sender<Arc<Task>>,
}

impl Task {
    fn schedule(self: &Arc<Self>) {
        self.executor.send(self.clone());
    }
}

为了安排任务,Arc被克隆并通过通道发送。现在,我们需要将我们的 schedule 函数与 std::task::Waker 挂钩。标准库提供了一个低级别的API,通过手动构建vtable来完成这个任务。这种策略为实现者提供了最大的灵活性,但需要一堆不安全的模板代码。我们不直接使用 RawWakerVTable,而是使用由futures crate提供的ArcWake工具。这使得我们可以实现一个简单的特质,将我们的任务结构暴露为一个waker。

在你的Cargo.toml中添加以下依赖,以拉入future。

futures = "0.3"

然后实现 futures::task::ArcWake

use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.schedule();
    }
}

当上面的定时器线程调用waker.wake()时,任务被推送到通道中。接下来,我们在MiniTokio::run()函数中实现接收和执行任务。

impl MiniTokio {
    fn run(&self) {
        while let Ok(task) = self.scheduled.recv() {
            task.poll();
        }
    }

    /// Initialize a new mini-tokio instance.
    fn new() -> MiniTokio {
        let (sender, scheduled) = channel::unbounded();

        MiniTokio { scheduled, sender }
    }

    /// Spawn a future onto the mini-tokio instance.
    ///
    /// The given future is wrapped with the `Task` harness and pushed into the
    /// `scheduled` queue. The future will be executed when `run` is called.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Task::spawn(future, &self.sender);
    }
}

impl Task {
    fn poll(self: Arc<Self>) {
        // Create a waker from the `Task` instance. This
        // uses the `ArcWake` impl from above.
        let waker = task::waker(self.clone());
        let mut cx = Context::from_waker(&waker);

        // No other thread ever tries to lock the future
        let mut future = self.future.try_lock().unwrap();

        // Poll the future
        let _ = future.as_mut().poll(&mut cx);
    }

    // Spawns a new taks with the given future.
    //
    // Initializes a new Task harness containing the given future and pushes it
    // onto `sender`. The receiver half of the channel will get the task and
    // execute it.
    fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            executor: sender.clone(),
        });

        let _ = sender.send(task);
    }

}

这里发生了多件事情。首先,MiniTokio::run()被实现。该函数在一个循环中运行,接收来自通道的预定任务。由于任务在被唤醒时被推入通道,这些任务在执行时能够取得进展。

此外,MiniTokio::new()MiniTokio::spwn() 函数被调整为使用通道而不是 VecDeque。当新的任务被催生时,它们会被赋予一个通道的发送者部分的克隆,任务可以用它来在运行时安排自己。

Task::poll() 函数使用来自 futures crate 的 ArcWake 工具创建waker。waker被用来创建一个 task::Context。该 task::Context 被传递给 poll。

摘要

我们现在已经看到了一个端到端的例子,说明异步Rust是如何工作的。Rust的 async/await 功能是由traits支持的。这允许第三方crate,如Tokio,提供执行细节。

  • Rust的异步操作是 lazy 的,需要调用者来 poll 它们。
  • Wakers被传递给futures,以将一个future与调用它的任务联系起来。
  • 当一个资源没有准备好完成一个操作时,Poll::Pending 被返回,任务的waker被记录。
  • 当资源准备好时,任务的 waker 会被通知。
  • 执行者收到通知并安排任务的执行。
  • 任务再次被 poll ,这次资源已经准备好了,任务取得了进展。

某些未尽事宜

记得我们在实现 Delay future 的时候,说过还有一些事情要解决。Rust的异步模型允许单个future在执行时跨任务迁移。考虑一下下面的情况。

use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let mut delay = Some(Delay { when });

    poll_fn(move |cx| {
        let mut delay = delay.take().unwrap();
        let res = Pin::new(&mut delay).poll(cx);
        assert!(res.is_pending());
        tokio::spawn(async move {
            delay.await;
        });

        Poll::Ready(())
    }).await;
}

poll_fn 函数使用闭包创建Future实例。上面的片段创建了一个Delay实例,对其进行了一次轮询,然后将Delay实例发送到一个新的任务中等待。在这个例子中,Delay::poll在不同的Waker实例中被调用了不止一次。当这种情况发生时,你必须确保在最近一次调用 poll 时所传递的 Waker上调用 wake。

当实现 future 时,关键是要假设每一次对poll的调用都可能提供一个不同的Waker实例。poll 函数必须用新的唤醒者来更新任何先前记录的唤醒者。

我们早期实现的Delay在每次 poll 时都会产生一个新的线程。这很好,但是如果 poll 太频繁的话,效率就会很低(例如,如果你 select! 这个future和其他的future,只要其中一个有事件,这两个都会被poll)。一种方法是记住你是否已经产生了一个线程,如果你还没有产生一个线程,就只产生一个新的线程。然而,如果你这样做,你必须确保线程的Waker在以后调用 poll 时被更新,否则你就不能唤醒最近的Waker。

为了修复我们之前的实现,我们可以这样做。

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
    // This Some when we have spawned a thread, and None otherwise.
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // First, if this is the first time the future is called, spawn the
        // timer thread. If the timer thread is already running, ensure the
        // stored `Waker` matches the current task's waker.
        if let Some(waker) = &self.waker {
            let mut waker = waker.lock().unwrap();

            // Check if the stored waker matches the current task's waker.
            // This is necessary as the `Delay` future instance may move to
            // a different task between calls to `poll`. If this happens, the
            // waker contained by the given `Context` will differ and we
            // must update our stored waker to reflect this change.
            if !waker.will_wake(cx.waker()) {
                *waker = cx.waker().clone();
            }
        } else {
            let when = self.when;
            let waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(waker.clone());

            // This is the first time `poll` is called, spawn the timer thread.
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                // The duration has elapsed. Notify the caller by invoking
                // the waker.
                let waker = waker.lock().unwrap();
                waker.wake_by_ref();
            });
        }

        // Once the waker is stored and the timer thread is started, it is
        // time to check if the delay has completed. This is done by
        // checking the current instant. If the duration has elapsed, then
        // the future has completed and `Poll::Ready` is returned.
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            // The duration has not elapsed, the future has not completed so
            // return `Poll::Pending`.
            //
            // The `Future` trait contract requires that when `Pending` is
            // returned, the future ensures that the given waker is signalled
            // once the future should be polled again. In our case, by
            // returning `Pending` here, we are promising that we will
            // invoke the given waker included in the `Context` argument
            // once the requested duration has elapsed. We ensure this by
            // spawning the timer thread above.
            //
            // If we forget to invoke the waker, the task will hang
            // indefinitely.
            Poll::Pending
        }
    }
}

这有点复杂,但想法是,在每次调用 poll 时,future 会检查所提供的 waker 是否与之前记录的 waker 相匹配。如果两个 waker 匹配,那么就没有其他事情要做。如果不匹配,则必须更新记录的 waker。

Notify 工具

我们演示了如何使用wakers手工实现一个 Delay future。Wakers是异步Rust工作方式的基础。通常情况下,没有必要深入到这个水平。例如,在Delay的情况下,我们可以通过使用 tokio::sync::Notify 工具,完全用 async/await 实现它。这个工具提供了一个基本的任务通知机制。它处理了waker的细节,包括确保记录的waker与当前任务相匹配。

使用Notify,我们可以像这样用 async/await 实现一个 Delay 函数。

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify2.notify_one();
    });


    notify.notified().await;
}

2.10 - select

Tokio教程之select

https://tokio.rs/tokio/tutorial/select

到目前为止,当我们想给系统添加并发性时,我们会生成一个新的任务。现在我们将介绍一些额外的方法,用Tokio并发执行异步代码。

tokio::select!

tokio::select! 宏允许在多个异步计算中等待,并在单个计算完成后返回。

比如说:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

使用了两个 oneshot 通道。任何一个通道都可以先完成。select! 语句在两个通道上等待,并将 val 与任务返回的值绑定。当 tx1 或 tx2 完成时,相关的块被执行。

没有完成的分支被放弃。在这个例子中,计算正在等待每个通道的 oneshot::Receiver。尚未完成的通道的 oneshot::Receiver 被放弃。

取消

在异步Rust中,取消操作是通过丢弃一个 future 来实现的。回顾 “Async in depth”,异步Rust操作是使用 futures 实现的,而 futures 是 lazy 的。只有当期货被 poll 时,操作才会继续进行。如果future被丢弃,操作就不能进行,因为所有相关的状态都被丢弃了。

也就是说,有时一个异步操作会催生后台任务或启动其他在后台运行的操作。例如,在上面的例子中,一个任务被催生出来,以发送一个消息回来。通常情况下,该任务会进行一些计算来生成数值。

Futures或其他类型可以实现 Drop 来清理后台资源。Tokio 的 oneshot::Receiver 通过向 Sender half 发送一个关闭的通知来实现 Drop。sender 部分可以收到这个通知,并通过丢弃来中止正在进行的操作。

use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Compute value here
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Select on the operation and the oneshot's
        // `closed()` notification.
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                // `some_operation()` is canceled, the
                // task completes and `tx1` is dropped.
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Future 实现

为了帮助更好地理解 select! 的工作原理,让我们看看一个假想的Future实现是什么样子的。这是一个简化版本。在实践中,select! 包括额外的功能,如随机选择要先 poll 的分支。

use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // use tx1 and tx2

    MySelect {
        rx1,
        rx2,
    }.await;
}

MySelect future 包含每个分支的future。当MySelect被 poll 时,第一个分支被 poll。如果它准备好了,该值被使用,MySelect完成。在 .await 收到一个future的输出后,该future被放弃。这导致两个分支的futures都被丢弃。由于有一个分支没有完成,所以该操作实际上被取消了。

请记住上一节的内容:

当一个 future 返回 Poll::Pending 时,它必须确保在未来的某个时间点上对 waker 发出信号。如果忘记这样做,任务就会被无限期地挂起。

MySelect 的实现中,没有明确使用 Context 参数。相应的是,waker的要求是通过传递 cx 给内部 future 来满足的。由于内部 future 也必须满足waker的要求,通过只在收到内部 future 的 Poll::Pending 时返回 Poll::PendingMySelect 也满足 waker 的要求。

语法

选择 select! 宏可以处理两个以上的分支。目前的限制是64个分支。每个分支的结构为:

<pattern> = <async expression> => <handler>,

当 select 宏被评估时,所有的 <async expression> 被聚集起来并同时执行。当一个表达式完成时,其结果与 <pattern> 匹配。如果结果与模式匹配,那么所有剩余的异步表达式被放弃,<handler> 被执行。<handler> 表达式可以访问由 <pattern> 建立的任何绑定关系。

基本情况是 <pattern> 是一个变量名,异步表达式的结果被绑定到该变量名,<handler> 可以访问该变量。这就是为什么在最初的例子中,val 被用于<pattern>,而 <handler> 能够访问 val

如果 <pattern> 与异步计算的结果不匹配,那么剩下的异步表达式继续并发地执行,直到下一个表达式完成。这时,同样的逻辑被应用于该结果。

因为 select! 可以接受任何异步表达式,所以可以定义更复杂的计算来进行选择。

在这里,我们在一个 oneshot 通道和一个TCP连接的输出上进行选择。

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Spawn a task that sends a message over the oneshot
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("Socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message first {:?}", msg);
        }
    }
}

在这里,我们选择了一个 onehot 并接受来自 TcpListener 的套接字。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let mut listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        _ = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {}
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

accept 循环一直运行到遇到错误或 rx 收到一个值。_模式表示我们对异步计算的返回值不感兴趣。

返回值

tokio::select! 宏返回被评估的 <handler> 表达式的结果。

async fn computation1() -> String {
    // .. computation
}

async fn computation2() -> String {
    // .. computation
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}

正因为如此,要求每个分支的 <handler> 表达式求值为同一类型。如果不需要 select! 表达式的输出,让表达式求值为()是很好的做法。

错误

使用?操作符会从表达式中传播错误。这取决于是在异步表达式中还是在处理程序中使用? 在一个异步表达式中使用?操作符会将错误从异步表达式中传播出去。这使得异步表达式的输出成为一个结果。在 handler 中使用?会立即将错误从 select!表达式中传播出去。让我们再看一下接受循环的例子。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // [setup `rx` oneshot channel]

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        res = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {
            res?;
        }
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

注意 listener.accept().await? 操作符将错误从该表达式中传播出来,并传播到 res 绑定中。在发生错误时, res 将被设置为 Err(_)。然后,在处理程序中,再次使用?操作符。res? 语句将把一个错误从主函数中传播出去。

模式匹配

回顾一下,select! 宏分支语法被定义为:

<pattern> = <async expression> => <handler>,

到目前为止,我们只使用了 <pattern> 的变量绑定。然而,任何Rust模式都可以被使用。例如,假设我们从多个MPSC通道接收信息,我们可以这样做。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // Do something w/ `tx1` and `tx2`
    });

    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

借用

当催生任务时,被催生的异步表达式必须拥有其所有的数据。select! 宏没有这个限制。每个分支的异步表达式都可以借用数据并同时操作。按照Rust的借用规则,多个异步表达式可以不变地借用一个数据,或者一个异步表达式可以可变地借用一个数据。

我们来看看一些例子。在这里,我们同时向两个不同的TCP目的地发送相同的数据。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        else => {}
    };

    Ok(())
}

data 变量被从两个异步表达式中不可变地借用。当其中一个操作成功完成时,另一个就会被放弃。因为我们在 Ok(_) 上进行模式匹配,如果一个表达式失败,另一个表达式继续执行。

当涉及到每个分支的 <handler> 时,select! 保证只运行一个 <handler>。正因为如此,每个<handler>都可以相互借用相同的数据。

例如,这在两个处理程序中都修改了out:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Send values on `tx1` and `tx2`.
    });

    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

循环

select! 宏经常在循环中使用。本节将通过一些例子来说明在循环中使用 select! 宏的常见方法。我们首先在多个通道上进行选择。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    loop {
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {}", msg);
    }

    println!("All channels have been closed.");
}

这个例子在三个通道的接收器上进行 select。当在任何通道上收到消息时,它被写入STDOUT。当一个通道被关闭时,recv() 以None返回。通过使用模式匹配,select! 宏继续在其余通道上等待。当所有的通道都关闭时,else分支被评估,循环被终止。

select! 宏随机挑选分支,首先检查是否准备就绪。当多个通道有等待值时,将随机挑选一个通道来接收。这是为了处理这样的情况:接收循环处理消息的速度比推入通道的速度慢,也就是说,通道开始被填满。如果 select! 不随机挑选一个分支先检查,在循环的每个迭代中,rx1将被首先检查。如果rx1总是包含一个新的消息,其余的通道将永远不会被检查。

如果当select!被评估时,多个通道有待处理的消息,只有一个通道有一个值被弹出。所有其他的通道保持不动,它们的消息保持在这些通道中,直到下一个循环迭代。没有消息丢失。

恢复异步操作

现在我们将展示如何在多次调用 select! 时运行一个异步操作。在这个例子中,我们有一个MPSC通道,类型为i32,还有一个异步函数。我们想运行异步函数,直到它完成或在通道上收到一个偶数整数。

async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    
    
    let operation = action();
    tokio::pin!(operation);
    
    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

请注意,不是在 select! 宏中调用 action() ,而是在循环之外调用它。action() 的返回被分配给 operation,而不调用 .await。然后我们在 operation 上调用 tokio::pin!

select! 循环中,我们没有传入 operation,而是传入 &mut operationoperation 变量正在跟踪飞行中的异步操作。循环的每个迭代都使用相同的 operation,而不是对 action() 发出一个新的调用。

另一个 select! 分支从通道中接收消息。如果该消息是偶数,我们就完成了循环。否则,再次启动 select!

这是我们第一次使用 tokio::pin! 我们现在还不打算讨论 pining 的细节。需要注意的是,为了 .await 一个引用,被引用的值必须被 pin 或者实现 Unpin

如果我们删除 tokio::pin! 这一行,并尝试编译,我们会得到以下错误:

error[E0599]: no method named `poll` found for struct
     `std::pin::Pin<&mut &mut impl std::future::Future>`
     in the current scope
  --> src/main.rs:16:9
   |
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }
   | |_________^ method not found in
   |             `std::pin::Pin<&mut &mut impl std::future::Future>`
   |
   = note: the method `poll` exists but the following trait bounds
            were not satisfied:
           `impl std::future::Future: std::marker::Unpin`
           which is required by
           `&mut impl std::future::Future: std::future::Future`

虽然我们在上一章中介绍了 Future,但这个错误仍然不是很清楚。如果你在试图对一个引用调用 .await 时遇到这样一个关于 Future 没有被实现的错误,那么这个Future可能需要被 pin

阅读更多关于标准库中的Pin。

修改分支

让我们来看看一个稍微复杂的循环。我们有:

  1. 一个 i32 值的通道。
  2. 一个在 i32 值上执行的异步操作。

我们要实现的逻辑是:

  1. 在通道上等待一个偶数。
  2. 使用偶数作为输入启动异步操作。
  3. 等待操作,但同时在通道上监听更多的偶数。
  4. 如果在现有的操作完成之前收到一个新的偶数,则中止现有的操作,用新的偶数重新开始操作。
async fn action(input: Option<i32>) -> Option<String> {
    // If the input is `None`, return `None`.
    // This could also be written as `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // async logic here
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
    
    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);
    
    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });
    
    loop {
        tokio::select! {
            res = &mut operation, if !done => {
                done = true;

                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` is a method on `Pin`.
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

我们使用的策略与前面的例子类似。async fn 在循环之外被调用,并被分配给 operation。operation 变量被 pin 住。循环在 operation 和通道接收器上都进行select。

注意 action 是如何将 Option<i32> 作为参数的。在我们接收第一个偶数之前,我们需要将 operation 实例化为某种东西。我们让 action 接受 Option 并返回Option。如果传入的是 None,就会返回 None。在第一个循环迭代中,operation 立即以 None 完成。

这个例子使用了一些新的语法。第一个分支包括 , if !done。这是一个分支的前提条件。在解释它是如何工作的之前,让我们看一下如果省略了这个前提条件会发生什么。省略 , if !done 并运行这个例子的结果是如下输出。

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

这个错误发生在试图使用已经完成的 operation 时。通常情况下,当使用 .await 时,被等待的值会被消耗。在这个例子中,我们对一个引用进行 await。这意味着 operation 在完成后仍然存在。

为了避免这种 panic,我们必须注意在 operation 完成后禁用第一个分支。done 变量用于跟踪 operation 是否完成。一个 select! 分支可能包括一个 precondition。这个前提条件在 select! 分支等待之前被检查。如果该条件被评估为false,那么该分支将被禁用。done变量被初始化为false。当 operation 完成后,done被设置为true。下一个循环迭代将禁用该操作分支。当从通道收到一个偶数信息时,operation 被重置,done被设置为false。

每任务的并发性

tokio::spoonselect! 都可以运行并发的异步操作。然而,用于运行并发操作的策略是不同的。tokio::spoon 函数接收一个异步操作并生成一个新的任务来运行它。任务是 Tokio 运行时安排的对象。两个不同的任务是由 Tokio 独立调度的。它们可能同时运行在不同的操作系统线程上。正因为如此,一个催生的任务和一个催生的线程有同样的限制:不能借用。

select! 宏在同一个任务上同时运行所有分支。因为 select! 宏的所有分支都在同一个任务上执行,所以它们永远不会同时运行。select! 宏在一个任务上复用异步操作。

2.11 - stream

Tokio教程之stream

https://tokio.rs/tokio/tutorial/streams

流是一个数值的异步系列。它是 Rust 的 std::iter::Iterator 的异步等价物,由 Stream 特性表示。流可以在 async 函数中被迭代。它们也可以使用适配器进行转换。Tokio在 StreamExt trait上提供了许多常见的适配器。

Tokio在一个单独的 tokio-stream crate 中提供流支持:

tokio-stream = "0.1"

目前,Tokio 的 Stream工具存在于 tokio-stream crate中。一旦 Stream 特性在 Rust 标准库中稳定下来,Tokio 的 Stream 工具将被移到 tokio crate 中。

迭代/Iteration

目前,Rust编程语言不支持异步 for 循环。相反,流的迭代是通过与 StreamExt::next() 搭配的 while let 循环完成的。

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);

    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

像迭代器一样,next() 方法返回 Option<T>,其中T是流的值类型。接收到 None 表示流的迭代已经结束。

Mini-Redis广播

让我们来看看使用 Mini-Redis 客户端的一个稍微复杂的例子。

完整的代码可以在这里找到。

use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Publish some data
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}

任务被派生出来,在 “number” 频道上向 Mini-Redis 服务器发布消息。然后,在主任务中,我们订阅 “number” 频道并显示收到的消息。

在订阅之后,into_stream() 被调用到返回的订阅者上。这将消耗 Subscriber ,返回一个 stream,在消息到达时产生消息。在我们开始迭代消息之前,请注意流是用 tokio::pin pin 在栈上的。在一个流上调用 next() 需要流被 pin 住。into_stream() 函数返回的是一个没有 pin 的流,我们必须明确地 pin 它,以便对其进行遍历。

当一个 Rust 值在内存中不能再被移动时,它就被 “pin"了。被 pin 的值的一个关键属性是,指针可以被带到被 pin 的数据上,并且调用者可以确信该指针保持有效。这个特性被 async/await 用来支持跨 .await 点借用数据。

如果我们忘记 pin 住流,就会出现这样的错误:

error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  --> streams/src/main.rs:29:36
   |
29 |     while let Some(msg) = messages.next().await {
   |                                    ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
   |
   = note: required because it appears within the type `impl Future`
   = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
   = note: required because it appears within the type `impl Stream`
   = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

如果你遇到这样的错误信息,请尝试 pin 住该值!

在尝试运行这个之前,启动Mini-Redis服务器:

$ mini-redis-server

然后尝试运行该代码。我们将看到输出到STDOUT的信息:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

一些早期的信息可能会被丢弃,因为订阅和发布之间存在着竞争。该程序永远不会退出。只要服务器处于活动状态,对 Mini-Redis 频道的订阅就会保持活动状态。

让我们看看我们如何用流来扩展这个程序。

适配器

接受一个 stream 并返回另一个 stream 的函数通常被称为 “stream adapter”,因为它们是 “适配器模式"的一种形式。常见的流适配器包括 maptakefilter

让我们更新Mini-Redis,使其退出。在收到三条消息后,停止迭代消息。这是用 take 完成的。这个适配器将流限制为最多产生n条消息。

let messages = subscriber
    .into_stream()
    .take(3);

再次运行该程序,我们得到:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

这一次,程序结束了。

现在,让我们把信息流限制在单个字节。我们将通过检查消息的长度来检查这一点。我们使用 filter 适配器来放弃任何不符合前提条件的消息。

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

现在,输出是:

got = b"1"
got = b"3"
got = b"6"

另一个选择是使用 filter_map 将 filter 和 map 步骤合并为一个单一的调用。

还有更多可用的适配器。请看这里的列表。

实现stream

stream 特征与 future 特征非常相似:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

Stream::poll_next() 函数很像 Future::poll,只是它可以被反复调用,以便从流中接收许多值。就像我们在 Async in depth 中看到的那样,当一个流还没有准备好返回一个值时,就会返回 Poll::Pending 来代替。该任务的waker被注册。一旦流应该被再次poll,该唤醒者将被通知。

size_hint() 方法与迭代器的使用方法相同。

通常,当手动实现一个 stream 时,是通过组合 future 和其他流来完成的。作为一个例子,让我们以我们在 Async 中深入实现的 Delay future 为基础。我们将把它转换为一个stream,以10毫秒的间隔产生三次 ()

use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // No more delays
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

async-stream

使用 Stream trait 手动实现流是很繁琐的。不幸的是,Rust编程语言还不支持用于定义流的 async/await 语法。这一点正在酝酿之中,但还没有准备好。

Async-stream crate可以作为一个临时的解决方案。这个 crate 提供了一个 async_stream! 宏,将输入转化为一个流。使用这个create,上面的 interval 可以这样实现:

use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}

2.12 - 桥接同步代码

Tokio教程之桥接同步代码

https://tokio.rs/tokio/tutorial/bridging

在我们到目前为止看到的例子中,我们用 #[tokio::main] 标记了主函数,并使整个项目成为异步的。然而,这对所有项目来说都是不可取的。例如,一个GUI应用程序可能希望在主线程上运行GUI代码,并在另一个线程上运行Tokio运行时。

本页解释了如何将异步/等待隔离到项目的一小部分。

#[tokio::main]扩展到什么?

#[tokio::main] 宏是一个将你的主函数替换为非同步主函数的宏,它启动一个运行时,然后调用你的代码。例如,这样的main函数

#[tokio::main]
async fn main() {
    println!("Hello world");
}

通过这个宏变成了这样:

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

为了在我们自己的项目中使用 async/await,我们可以做一些类似的事情,我们利用. block_on 方法在适当的时候进入异步环境。

到mini-redis的同步接口

在本节中,我们将讨论如何通过存储 Runtime 对象并使用其 block_on 方法来构建 mini-redis 的同步接口。在接下来的章节中,我们将讨论一些替代的方法,以及何时应该使用每种方法。

我们要封装的接口是异步的 client 类型。它有几个方法,我们将实现以下方法的阻塞版本:

为此,我们引入了一个名为 src/blocking_client.rs 的新文件,并用一个围绕 async Client 类型的封装结构来初始化它:

use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;

pub use crate::client::Message;

/// Established connection with a Redis server.
pub struct BlockingClient {
    /// The asynchronous `Client`.
    inner: crate::client::Client,

    /// A `current_thread` runtime for executing operations on the
    /// asynchronous client in a blocking manner.
    rt: Runtime,
}

pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;

    // Call the asynchronous connect method using the runtime.
    let inner = rt.block_on(crate::client::connect(addr))?;

    Ok(BlockingClient { inner, rt })
}

在这里,我们把构造函数作为第一个例子,说明如何在非同步上下文中执行异步方法。我们使用Tokio Runtime类型上的block_on方法来做到这一点,它执行一个异步方法并返回其结果。

一个重要的细节是对 current_thread 运行时的使用。通常在使用Tokio时,你会使用默认的 multi_thread 运行时,它将产生一堆后台线程,这样它就可以有效地同时运行许多东西。对于我们的用例,我们每次只做一件事,所以我们不会因为运行多个线程而获得任何好处。这使得 current_thread 运行时成为完美的选择,因为它不会产生任何线程。

enable_all 调用启用了 Tokio 运行时的IO和定时器驱动。如果它们没有被启用,运行时就无法执行IO或定时器。

因为 current_thread 运行时不产生线程,所以它只在 block_on 被调用时运行。一旦 block_on 返回,所有在该运行时上生成的任务将冻结,直到你再次调用 block_on。如果催生的任务在不调用 block_on 时必须继续运行,请使用 multi_threaded runtime。

一旦我们有了这个结构体,大部分的方法就很容易实现:

use bytes::Bytes;
use std::time::Duration;

impl BlockingClient {
    pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
        self.rt.block_on(self.inner.get(key))
    }

    pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
        self.rt.block_on(self.inner.set(key, value))
    }

    pub fn set_expires(
        &mut self,
        key: &str,
        value: Bytes,
        expiration: Duration,
    ) -> crate::Result<()> {
        self.rt.block_on(self.inner.set_expires(key, value, expiration))
    }

    pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
        self.rt.block_on(self.inner.publish(channel, message))
    }
}

Client::subscribe方法更有趣,因为它将 Client 转化为 Subscriber 对象。我们可以用以下方式实现它:

/// A client that has entered pub/sub mode.
///
/// Once clients subscribe to a channel, they may only perform
/// pub/sub related commands. The `BlockingClient` type is
/// transitioned to a `BlockingSubscriber` type in order to
/// prevent non-pub/sub methods from being called.
pub struct BlockingSubscriber {
    /// The asynchronous `Subscriber`.
    inner: crate::client::Subscriber,

    /// A `current_thread` runtime for executing operations on the
    /// asynchronous client in a blocking manner.
    rt: Runtime,
}

impl BlockingClient {
    pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
        let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
        Ok(BlockingSubscriber {
            inner: subscriber,
            rt: self.rt,
        })
    }
}

impl BlockingSubscriber {
    pub fn get_subscribed(&self) -> &[String] {
        self.inner.get_subscribed()
    }

    pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
        self.rt.block_on(self.inner.next_message())
    }

    pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.subscribe(channels))
    }

    pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.unsubscribe(channels))
    }
}

因此,subscribe 方法将首先使用运行时将异步 Client 转化为异步 Subscriber。然后,它将把产生的 Subscriber 与运行时一起存储,并使用 block_on 实现各种方法。

请注意,异步 Subscriber 结构体有一个名为 get_subscribed 的非异步方法。为了处理这个问题,我们只需直接调用它而不涉及运行时。

其他方法

上节解释了实现同步包装器的最简单的方法,但这并不是唯一的方法。这些方法是:

  • 创建一个 Runtime 并在异步代码上调用 block_on。
  • 创建一个 Runtime 并在其上生成事物。
  • 在一个单独的线程中运行 Runtime 并向其发送消息。

我们已经看到了第一种方法。其他两种方法概述如下。

在运行时上生成东西

运行时对象有一个叫做 spawn 的方法。当你调用这个方法时,你会创建一个新的在运行时上运行的后台任务。比如说

use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(runtime.spawn(my_bg_task(i)));
    }

    // Do something time-consuming while the background tasks execute.
    std::thread::sleep(Duration::from_millis(750));
    println!("Finished time-consuming task.");

    // Wait for all of them to complete.
    for handle in handles {
        // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
        // a future, so we can wait for it using `block_on`.
        runtime.block_on(handle).unwrap();
    }
}

async fn my_bg_task(i: u64) {
    // By subtracting, the tasks with larger values of i sleep for a
    // shorter duration.
    let millis = 1000 - 50 * i;
    println!("Task {} sleeping for {} ms.", i, millis);

    sleep(Duration::from_millis(millis)).await;

    println!("Task {} stopping.", i);
}

在上面的例子中,我们在运行时催生了 10 个后台任务,然后等待所有的任务。作为一个例子,这可能是在图形应用程序中实现后台网络请求的一个好方法,因为网络请求在GUI主线程上运行太耗时了。相反,你在后台运行的 Tokio 运行时上生成请求,并让任务在请求完成后将信息送回GUI代码,如果你想要一个进度条,甚至可以递增。

在这个例子中,重要的是,运行时被配置为多线程运行时。如果你把它改为 current_thread 运行时,你会发现耗时的任务会在任何后台任务开始之前完成。这是因为在 current_thread 运行时上生成的后台任务只在调用 block_on 时执行,否则运行时就没有地方可以运行它们。

这个例子通过对调用 spawn 返回的 JoinHandle 调用 block_on 来等待生成的任务完成,但这并不是唯一的方法。这里有一些替代方法。

  • 使用消息传递通道,如 tokio::sync::mpsc
  • 修改由 Mutex 等保护的共享值。这对GUI中的进度条来说是一个很好的方法,GUI在每一帧都会读取共享值。

spawn 方法在 Handle 类型上也是可用的。Handle 类型可以被克隆,以便在一个运行时中获得许多句柄,每个 Handle 都可以用来在运行时中生成新的任务。

发送消息

第三种技术是催生一个运行时,并使用消息传递来与之通信。这比其他两种方法涉及更多的模板,但它是最灵活的方法。你可以在下面找到一个基本的例子。

use tokio::runtime::Builder;
use tokio::sync::mpsc;

pub struct Task {
    name: String,
    // info that describes the task
}

async fn handle_task(task: Task) {
    println!("Got task {}", task.name);
}

#[derive(Clone)]
pub struct TaskSpawner {
    spawn: mpsc::Sender<Task>,
}

impl TaskSpawner {
    pub fn new() -> TaskSpawner {
        // Set up a channel for communicating.
        let (send, mut recv) = mpsc::channel(16);

        // Build the runtime for the new thread.
        //
        // The runtime is created before spawning the thread
        // to more cleanly forward errors if the `unwrap()`
        // panics.
        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            rt.block_on(async move {
                while let Some(task) = recv.recv().await {
                    tokio::spawn(handle_task(task));
                }

                // Once all senders have gone out of scope,
                // the `.recv()` call returns None and it will
                // exit from the while loop and shut down the
                // thread.
            });
        });

        TaskSpawner {
            spawn: send,
        }
    }

    pub fn spawn_task(&self, task: Task) {
        match self.spawn.blocking_send(task) {
            Ok(()) => {},
            Err(_) => panic!("The shared runtime has shut down."),
        }
    }
}

这个例子可以用很多方式来配置。例如,你可以使用一个 Semaphore 来限制活动任务的数量,或者你可以使用一个相反方向的通道来向 spawner 发送响应。当你以这种方式催生一个运行时,它是一种 actor。

3 - Tokio背景知识

Tokio相关的背景知识

3.1 - IO

Tokio相关的IO背景知识

3.1.1 - 各种类型的IO:阻塞、非阻塞、多路复用和异步

解释阻塞、非阻塞、多路复用和异步IO

我发现对于软件程序员来说很难分清楚各种类型的IO。对于阻塞,非阻塞,多路复用和异步IO有很多的混淆点。所以我想尝试解释清楚各种IO类型意味着什么

硬件层面

在现代操作系统中,IO(输入/输出)是一种和外围设备交换数据的方式。包括读写磁盘或SSD,通过网络发送和接受数据,在显示器上显示,接入键盘和鼠标输入,等等。

现代操作系统和外围设备的交流取决于外围设备的特定类型以及他们的固件版本和硬件能力。通常来说,你可以认为外围设备是很高级的,他们可以同时处理多个并发的读写数据请求。也就是说,串行交流的日子一去不返了。在这些场景中,外围设备和CPU间的交流在硬件层面都是异步的。

这个异步机制被称为硬件中断(hardware interrupt)。想想一个简单的场景,CPU请求外围设备去读取一些数据,接着CPU会进入一个无限循环,每一次都会检查外围设备的数据是否可用,直到获得了数据为止。这种方法被称为轮询(polling),因为CPU需要保持检查外围设备。在现代硬件中,取而代之发生的是CPU请求外围硬件执行操作,然后就忘了这件事,继续处理其他的CPU指令。只要外围设备做完了,他会通过电路中断来通知CPU。这发生在硬件中,CPU因此不需要停下来或者检查这个外围设备,可以继续执行其他的工作,直到周边设备说已经做完了。

在软件层面

现在我们了解了硬件中发生的事,我们可以移动到软件这一侧了。在这一层IO通过多种方式被暴露:阻塞,非阻塞,多路复用和异步。让我们一个个来仔细解释。

阻塞/Blocking

还记得用户程序如何在一个进程内运行,代码是在线程的上下文中执行的吗?你总是会遇到需要编写一个需要从文件中读取数据的程序的情况。使用阻塞IO,你所做的是从你的线程中请求操作系统,将线程置于休眠(sleep),当数据可用于被消费时操作系统会唤醒线程。

也就是说,阻塞IO之所以被称为阻塞是因为使用他的线程会被阻塞直到IO完成。

非阻塞/Non-Blocking

阻塞IO的问题是当你的线程在休眠时,他除了等IO完成不能干其他事。有时候,你的程序可能没有其他事可做了。但如果还有其他事需要做的话,能在等待IO的时候并发做可是极好的。

其中一种实现方式被称为非阻塞IO。\他的思想是当你读取一个文件时,OS只是简单返回给你文件的内容或者一个等待状态告诉你IO还未完成,而不是将线程休眠。他不会阻塞你的线程,但之后检查IO是否完成的工作还是交给了你。这意味着当处于等待状态时,你可以去做一些工作,当你再次需要IO时,可以再读取一次,那时候IO可能已经完成了,文件的内容会返回,如果还是处于等待状态的话,你可以选择继续做其他事。

多路复用/Multiplexed

非阻塞IO的问题是如果你在等待IO的过程中要做的其他事情就是另外的IO的话,事情会变得很奇怪。

在一个好的场景下,你请求OS去读取文件A的内容,然后去做一些重计算的工作,做完之后再去检查文件A是否完成读取,如果完成了,你再做一些关于这个文件内容的操作,不然就继续做其他的工作,循环往复。但在一个坏的场景中,你没有重计算的工作要去做,而是需要去读取另一个文件B。那除了等待他们还有什么事要做呢?没有了,你的程序就进入了一个死循环,判断文件A是否被读取完毕,接着再去判断文件B,一遍又一遍。要么你使用简单的状态轮询,这会导致过多消耗CPU,或者你手动加入一些随意的休眠时间,不过这也意味着你将延迟知道IO完成,这会降低程序的吞吐。

为了避免这个问题,你可以使用多路复用IO来代替。他所做的是你再次阻塞在IO上,但这次不仅仅是一个一个的IO操作,你可以将所有需要的IO操作塞入队列,阻塞在所有的操作上。当其中有一个IO完成之后OS会唤醒你。一些多路复用的实现提供了更多的控制,你可以设置在特定一些IO操作完成之后再被唤醒,例如A和C文件或B和D文件完成的时候。

所以你可以调用非阻塞读取文件A,然后非阻塞读取文件B,最后告诉操作系统将我的线程置于休眠,当A和B的IO都完成的时候或其中一个完成的时候再唤醒他。

异步/Async

多路复用IO的问题是,在IO准备好供你处理之前,你仍然在休眠。同样,对于许多程序来说,这很好,也许你在等待IO操作完成的时候没有其他事情可做。但有时,你确实有其他事情可以做。也许你正在计算PI的位数,同时也在对一堆文件的值进行求和。你想做的是将所有文件的读取排成队列,当你等待它们被读取时,你将计算PI的数字。当一个文件读完后,你要总结它的值,然后再去计算更多的PI数字,直到另一个文件读完。

为了实现这个目标,你需要一种方法让你的PI数字计算在完成时被IO打断,并且你需要IO在完成时执行中断。

这是通过事件回调完成的。执行读取的调用需要一个回调,并立即返回。在IO完成的时候,操作系统会暂停你的线程,并执行你的回调。一旦回调执行完毕,它将恢复你的线程。

多线程 vs 单线程?

你会注意到,我所描述的各种IO都只说到一个单线程,也就是你的主应用线程。事实上,IO并不需要一个线程来执行,因为正如我在开始时解释的那样,外围设备都是在自己的电路中异步执行IO的。因此,在一个单一的线程模型中,可以做阻塞、非阻塞、多路复用和异步的IO。这就是为什么并发IO可以在没有多线程支持的情况下工作。

现在,如果你需要,对IO操作的结果所做的处理,或者请求IO操作的处理,显然可以是多线程的。这允许你在并发的IO之上进行并发的计算。所以没有什么能阻止多线程和这些IO机制的混合。

事实上,有一种非常流行的第五种IO,它确实依赖于多线程。它经常被混淆地称为非阻塞式IO或异步式IO,因为它以类似于其中一种的界面出现。事实上,它是在伪造真正的非阻塞或异步IO。它的工作方式很简单,它使用阻塞式IO,但每个阻塞式调用是在它自己的线程中进行的。现在,根据不同的实现,它要么需要一个回调,要么使用一个轮询模型,比如返回一个Future。

结束语

我希望这已经澄清了你对各种类型的IO的理解。重要的是要记住,不是所有的操作系统和所有的外围设备都支持这些。同样,也不是所有的编程语言都为操作系统支持的所有种类的IO提供了API。

就这样吧。所有各种IO的解释。

希望你喜欢!

更多阅读

免责声明

我不是一个系统层面的程序员,我也不是一个操作系统提供的所有种类IO方面的专家。这篇文章是我尽可能总结我所知的内容,更偏向于中间层面的知识。所以如果你发现有任何问题的话请指正我。

内容出处