控制台开发日记(1)

Console是一个Rust异步调试工具。它的目标是让你在试图更好地理解你的异步任务的行为方式时,成为你所要使用的工具。

自从几个月前开始制作 Tokio Console 原型以来,我们一直在努力工作,把它变成一个很棒的异步任务调试器。我们想围绕它的进展情况提供一些集中的更新。

控制台是什么?

Console 是一个Rust异步调试工具。它的目标是让你在试图更好地理解你的异步任务的行为方式时,成为你所要使用的工具。它利用了 tracing event 和 span,因此旨在实现运行时的无关性。

更新

易于启动和生成器: 我们使向你的应用程序添加仪器变得更加容易。对于大多数人来说,只需在主函数的顶部添加 console_subscriber::init() 就足够了!它将使用合理的默认值,检查一些环境变量进行定制,并建立一个独立的订阅器。如果你需要更多的控制与现有的跟踪或运行时集成,也有一个方便的 console_subscriber::Builder API。

129774465-7bd2ad2f-f1a3-4830-a8fa-f72667028fa1

一切都更漂亮了! 主列表视图看起来好多了。我们把任务的 “name” 变成了一等一的东西,在列表里有了自己的栏目。任务ID变得更漂亮,与用户期望的更一致。颜色和UTF-8的使用更好了,它默认检查终端支持的内容。例如,显示持续时间的字段对不同的量级(纳秒与毫秒与秒)使用微妙的不同颜色。

129774524-288c967b-6066-4f98-973d-099b3e6a2c55

**选择任务来查看"任务细节"视图。**这包括关于该任务的更多细节和指标。有一个任务所有轮询时间的柱状图,让你看到你的任务需要多长时间才能完成工作。还有关于一个任务被唤醒多少次的信息,你可以将其与轮询次数以及自上次唤醒后的时间进行比较。

时间性。在与用户交流后,我们优先考虑了控制台的一些 “时间控制” 功能。到目前为止,我们已经实现了暂停控制台的功能(仍然可以探索现有的任务),然后恢复到 “活动”。现在有一个选项可以将所有相关事件记录到磁盘上的一个文件中,目的是能够在控制台中重放该文件。

视频演示。我们制作了一个演示,展示了控制台,以及如何使用它来调试一些常见的任务错误行为。

感谢:

  • Eliza Weisman
  • Sean McArthur
  • Zahari Dichev
  • Oğuz Bilgener
  • @gneito
  • @memoryruins
  • Jacob Rothstein
  • Artem Vorotnikov
  • David Barsky
  • Wu Aoxiang

我们也要感谢你们在你们的应用中一直在尝试它,并给我们提供了宝贵的反馈意见。

内容出处: https://tokio.rs/blog/2021-09-console-dev-diary-1

Axum宣布

axum 是一个易用而功能强大的网络框架,旨在充分利用 Tokio 的生态系统

今天,我们很高兴地宣布 axum:一个易于使用,但功能强大的网络框架,旨在充分利用 Tokio 的生态系统。

高级特性

  • 通过一个不使用宏(macro free?)的API将请求路由到处理程序
  • 使用提取器(extractor)对请求进行声明式的解析
  • 简单和可预测的错误处理模式。
  • 用最少的模板生成响应。
  • 充分利用 towertower-http 的中间件、服务和工具的生态系统

特别是最后一点,是 axum 与现有框架不同的地方。axum 没有自己的中间件系统,而是使用tower::Service。这意味着 axum 可以免费获得超时、跟踪、压缩、授权等功能。它还可以让你与使用 hypertonic 编写的应用程序共享中间件。

使用示例

axum 的 “hello world” 是这样的:

use axum::prelude::*;
use std::net::SocketAddr;

#[tokio::main]
async fn main() {
    let app = route("/", get(root));

    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    hyper::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn root() -> &'static str {
    "Hello, World!"
}

GET/ 的请求响应是 200 OK,其中正文是 Hello, World!。任何其他请求将导致 404 Not Found 响应。

提取器

请求可以使用 “提取器/extractor” 进行声明式的解析。提取器是一个实现了 FromRequest 的类型。提取器可以作为处理程序的参数,如果请求的URI匹配,就会运行。

例如,Json 是一个提取器,它消耗请求主体并将其解析为JSON:

use axum::{prelude::*, extract::Json};
use serde::Deserialize;

#[derive(Deserialize)]
struct CreateUser {
    username: String,
}

async fn create_user(Json(payload): Json<CreateUser>) {
    // `payload` is a `CreateUser`
}

let app = route("/users", post(create_user));

axum 提供了许多有用的提取器,例如:

你也可以通过实现 FromRequest 来定义你自己的提取器。

构建响应

处理程序可以返回任何实现了 IntoResponse 的东西,它将被自动转换为响应:

use http::StatusCode;
use axum::response::{Html, Json};
use serde_json::{json, Value};

// We've already seen returning &'static str
async fn text() -> &'static str {
    "Hello, World!"
}

// String works too
async fn string() -> String {
    "Hello, World!".to_string()
}

// Returning a tuple of `StatusCode` and another `IntoResponse` will
// change the status code
async fn not_found() -> (StatusCode, &'static str) {
    (StatusCode::NOT_FOUND, "not found")
}

// `Html` gives a content-type of `text/html`
async fn html() -> Html<&'static str> {
    Html("<h1>Hello, World!</h1>")
}

// `Json` gives a content-type of `application/json` and works with any type
// that implements `serde::Serialize`
async fn json() -> Json<Value> {
    Json(json!({ "data": 42 }))
}

这意味着在实践中,你很少需要建立你自己的响应。你也可以实现 IntoResponse 来创建你自己的特定领域响应。

路由

可以使用一个简单的 DSL 来组合多个路由。

use axum::prelude::*;

let app = route("/", get(root))
    .route("/users", get(list_users).post(create_user))
    .route("/users/:id", get(show_user).delete(delete_user));

中间件

axum 支持来自 towertower-http 的中间件。

use axum::prelude::*;
use tower_http::{compression::CompressionLayer, trace::TraceLayer};
use tower::ServiceBuilder;
use std::time::Duration;

let middleware_stack = ServiceBuilder::new()
    // timeout all requests after 10 seconds
    .timeout(Duration::from_secs(10))
    // add high level tracing of requests and responses
    .layer(TraceLayer::new_for_http())
    // compression responses
    .layer(CompressionLayer::new())
    // convert the `ServiceBuilder` into a `tower::Layer`
    .into_inner();

let app = route("/", get(|| async { "Hello, World!" }))
    // wrap our application in the middleware stack
    .layer(middleware_stack);

这个功能很关键,因为它允许我们只写一次中间件,并在不同的应用中分享它们。例如,axum 不需要提供自己的 tracing/logging 中间件,可以直接使用来自 tower-httpTraceLayer 。同样的中间件也可以用于用 tonic 制作的客户端或服务器。

路由到任何 tower::Service

axum 也可以将请求路由到任何 tower 服务。可以是你用 service_fn 编写的服务,也可以是来自其他 crate 的东西,比如来自 tower-httpServeFile

use axum::{service, prelude::*};
use http::Response;
use std::convert::Infallible;
use tower::{service_fn, BoxError};
use tower_http::services::ServeFile;

let app = route(
    // Any request to `/` goes to a some `Service`
    "/",
    service::any(service_fn(|_: Request<Body>| async {
        let res = Response::new(Body::from("Hi from `GET /`"));
        Ok::<_, Infallible>(res)
    }))
).route(
    // GET `/static/Cargo.toml` goes to a service from tower-http
    "/static/Cargo.toml",
    service::get(ServeFile::new("Cargo.toml"))
);

了解更多

这只是 axum 提供的一个小例子。错误处理、网络套接字和解析 multipart/form-data 请求是这里没有显示的一些功能。请参阅文档以了解更多细节。

我们也鼓励您查看软件库中的例子,看看一些用 axum编写的稍大的应用程序。

内容出处: https://tokio.rs/blog/2021-07-announcing-axum

宣布 tokio-uring:Tokio 的 io-uring 支持

发布"tokio-uring"crate的第一个版本,为Linux上的io-uring系统API提供支持。

今天,我们发布了 “tokio-uring” crate的第一个版本,为Linux上的 io-uring 系统API提供支持。这个版本提供了异步文件操作,我们将在后续版本中增加对更多操作的支持。

要使用 tokio-uring,首先要在 crate 上添加一个依赖项:

tokio-uring = "0.1.0"

然后,启动 tokio-uring 运行时 ,并从文件中读取:

use tokio_uring::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tokio_uring::start(async {
        // Open a file
        let file = File::open("hello.txt").await?;

        let buf = vec![0; 4096];
        // Read some data, the buffer is passed by ownership and
        // submitted to the kernel. When the operation completes,
        // we get the buffer back.
        let (res, buf) = file.read_at(buf, 0).await;
        let n = res?;

        // Display the contents
        println!("{:?}", &buf[..n]);

        Ok(())
    })
}

tokio-uring 运行时在底下使用Tokio运行时,所以它与Tokio类型和库(如hyper和tonic)兼容。下面是和上面一样的例子,但我们不是写到STDOUT,而是写到一个Tokio TCP套接字。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio_uring::fs::File;

fn main() {
    tokio_uring::start(async {
        // Start a TCP listener
        let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();

        // Accept new sockets
        loop {
            let (mut socket, _) = listener.accept().await.unwrap();

            // Spawn a task to send the file back to the socket
            tokio_uring::spawn(async move {
                // Open the file without blocking
                let file = File::open("hello.txt").await.unwrap();
                let mut buf = vec![0; 16 * 1_024];

                // Track the current position in the file;
                let mut pos = 0;

                loop {
                    // Read a chunk
                    let (res, b) = file.read_at(buf, pos).await;
                    let n = res.unwrap();

                    if n == 0 {
                        break;
                    }

                    socket.write_all(&b[..n]).await.unwrap();
                    pos += n as u64;

                    buf = b;
                }
            });
        }
    });
}

所有的 tokio-uring 操作都是真正的异步,与 tokio::fs 提供的API不同,后者在线程池上运行。从线程池中使用同步的文件系统操作会增加大量的开销。有了io-uring,我们可以在同一个线程中异步地执行网络和文件系统操作。但是,io-uring 的内容很多。

Tokio目前的Linux实现使用非阻塞系统调用和 epoll 来进行事件通知。使用 epoll,一个经过调整的TCP代理将花费70%到80%的CPU周期在用户空间之外,包括执行系统调用和在内核和用户空间之间复制数据的周期。Io-uring 通过消除大多数系统调用来减少开销,对于某些操作,提前映射用于字节缓冲区的内存区域。早期将 io-uringepoll 进行比较的基准是有希望的;用C语言实现的TCP echo客户端和服务器显示了高达60%的改进。

最初的 tokio-uring 版本提供了一套适度的API,但我们计划在未来的版本中增加对 io-uring 的所有功能的支持。请看设计文件以了解我们的发展方向。

所以,请尝试一下这个 crate,并随时提出问题或报告问题。

另外,我们要感谢所有在这一过程中提供帮助的人,特别是Glauber Costa(Glommio的作者),他耐心地回答了我的许多问题,withoutboats最初的探索(Ringbahn)和花时间与我讨论设计问题,以及 quininer 在纯Rust `io-uring 绑定上的出色工作。

内容出处: https://tokio.rs/blog/2021-07-tokio-uring

宣布Tonic 0.5版本

Tonic是Rust的gRPC原生实现。0.5是一个重要的版本

我们很高兴地宣布 Tonic 的0.5版本,它是 Rust 的原生 gRPC 实现。0.5是一个重要的版本,已经酝酿了一段时间。

一些关键的新功能是:

gRPC-Web

gRPC-Web 是一个协议,允许客户端通过 HTTP/1.1 连接到 gRPC 服务,而不是通常的HTTP/2。gRPC-Web 的一个常见用例是在浏览器中运行的JavaScript客户端。以前,这需要使用一个代理来将 HTTP/2 请求翻译成 HTTP/1.1。

然而,新的 crate tonic-web 允许普通的 Tonic 服务器接受 gRPC-Web 请求,而不需要外部代理。

启用gRPC-Web支持很简单:

use tonic::transport::Server;
// code generated by tonic-build
use hello_world::greeter_server::{GreeterServer, Greeter};

struct MyGreeter;

#[tonic::async_trait]
impl Greeter for MyGreeter {
    // ...
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "0.0.0.0:3000".parse().unwrap();

    let greeter = GreeterServer::new(MyGreeter);

    // enable grpc-web support for our `greeter` service
    let service = tonic_web::enable(greeter);

    Server::builder()
        // by default, tonic servers only accept http2 requests
        // so we have to enable receiving http1 as well
        .accept_http1(true)
        .add_service(service)
        .serve(addr)
        .await?;

    Ok(())
}

更多细节请见tonic-web crate。

压缩

Tonic现在可以透明地压缩和解压请求、响应和数据流。

在客户端上启用压缩功能是这样做的:

let client = GreeterClient::new(channel)
    // compress requests
    .send_gzip()
    /// accept compressed responses
    .accept_gzip();

而在服务器上:

let service = GreeterServer::new(greeter)
    // accept compressed requests
    .accept_gzip()
    // compress responses, if supported by the client
    .send_gzip();

注意这需要在 Tonictonic-build 上启用 compression 功能。更多细节请参见文档。

改进的Tower集成

Tonic 一直支持通过 Tower 的 Service trait 来扩展客户端和服务器,但在0.5版本中,新的 Server::layer 方法使其更加简单。例如,我们可以通过使用来自 tower-http 的中间件来为一个服务添加追踪和授权:

use tonic::transport::Server;
use tower_http::{
    auth::RequireAuthorizationLayer,
    trace::TraceLayer,
};

// The stack of middleware our service will be wrapped in
let layer = tower::ServiceBuilder::new()
    // High level tracing of requests and responses
    .layer(TraceLayer::new_for_grpc())
    // Authorize all requests using a token
    .layer(RequireAuthorizationLayer::bearer("my-secret-token"))
    // Convert our `ServiceBuilder` into a `tower::Layer`
    .into_inner();

Server::builder()
    // Apply our middleware stack to the server
    .layer(layer)
    .add_service(GreeterServer::new(MyGreeter)
    .serve(addr)
    .await?;

更灵活的拦截器

拦截器是轻量级的中间件,除其他外,可以用来修改传入请求的元数据,也可以选择用状态拒绝它们。

然而,Tonic对拦截器的支持一直是相当有限的。例如,你不能结合多个拦截器,而必须使用Tower的 service 抽象,它更强大,但也需要更多的代码来设置。

在Tonic 0.5中,我们有一个新的拦截器API,和以前一样容易使用,但现在内部使用Tower。这意味着拦截器可以像其他Tower中间件一样被应用。例如,在一个客户端添加多个拦截器,现在可以用:

use tonic::{
  Request, Status,
  service::interceptor_fn,
  transport::Endpoint
};

fn intercept_one(req: Request<()>) -> Result<Request<()>, Status> {
    // ...
}

fn intercept_two(req: Request<()>) -> Result<Request<()>, Status> {
    // ...
}

let channel = Endpoint::from_static("http://[::1]:50051").connect_lazy()?;

let intercepted_channel = tower::ServiceBuilder::new()
    .layer(interceptor_fn(intercept_one))
    .layer(interceptor_fn(intercept_two))
    .service(channel);

let client = GreeterClient::new(intercepted_channel);

0.5包括许多其他较小的功能和改进。更新日志里有所有的细节。

如同以往,如果你有问题,你可以在Tokio Discord服务器的#tonic找到我们。

内容出处: https://tokio.rs/blog/2021-07-tonic-0-5

宣布tower-http:HTTP特定中间件和实用程序的集合

用Tower的 service trait 构建的HTTP特定中间件和实用程序的集合

今天我很高兴地宣布tower-http,它是一个用Tower的 service trait 构建的HTTP特定中间件和实用程序的集合。

Tower本身包含的中间件都是与协议无关的。例如,它的超时中间件与任何服务实现兼容,无论它使用哪种协议。这很好,因为它意味着中间件更可重用,但也意味着你不能使用协议的特定功能。在HTTP的情况下,这意味着Tower中的中间件不知道状态码、头信息或其他HTTP的特定功能。

另一方面,tower-http 包含了针对HTTP的中间件。它使用 httphttp-body crate,这意味着它与任何使用这些板块的板块兼容,例如 hyper、tonic 和 warp。

tower-http 的目标是提供一套丰富的中间件来解决构建 HTTP 客户端和服务器时的常见问题。一些亮点是:

  • 追踪:轻松地在你的应用程序中添加高级别的 tracing/logging。支持通过状态代码以及 gRPC 特定的头信息来确定成功或失败。有很好的默认值,但也支持深度定制。

  • 压缩和解压:自动压缩或解压响应体。这与使用 ServeDir 的静态文件的服务非常吻合。

  • FollowRedirect:自动跟踪重定向响应。

还有一些小工具,如设置请求和响应头,从日志中隐藏敏感头,授权等。

用tower-http中的东西构建一个小服务器看起来像这样:

use tower_http::{
    compression::CompressionLayer,
    auth::RequireAuthorizationLayer,
    trace::TraceLayer,
};
use tower::{ServiceBuilder, make::Shared};
use http::{Request, Response};
use hyper::{Body, Error, server::Server};
use std::net::SocketAddr;

// Our request handler. This is where we would implement the application logic
// for responding to HTTP requests...
async fn handler(request: Request<Body>) -> Result<Response<Body>, Error> {
    Ok(Response::new(Body::from("Hello, World!")))
}

#[tokio::main]
async fn main() {
    // Use `tower`'s `ServiceBuilder` API to build a stack of middleware
    // wrapping our request handler.
    let service = ServiceBuilder::new()
        // High level tracing of requests and responses.
        .layer(TraceLayer::new_for_http())
        // Compress responses.
        .layer(CompressionLayer::new())
        // Authorize requests using a token.
        .layer(RequireAuthorizationLayer::bearer("tower-is-cool"))
        // Wrap a `Service` in our middleware stack.
        .service_fn(handler);

    // And run our service using `hyper`.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    Server::bind(&addr)
        .serve(Shared::new(service))
        .await
        .expect("server error");
}

而建立一个看起来是这样的客户:

use tower_http::{
    decompression::DecompressionLayer,
    set_header::SetRequestHeaderLayer,
};
use tower::ServiceBuilder;
use hyper::Body;
use http::{Request, Response, HeaderValue, header::USER_AGENT};

let client = ServiceBuilder::new()
    // Log failed requests
    .layer(
        TraceLayer::new_for_http()
            .on_request(())
            .on_response(())
            .on_body_chunk(())
            .on_eos(())
            // leave the `on_failure` callback as the default
    )
    // Set a `User-Agent` header on all requests
    .layer(SetRequestHeaderLayer::<_, Body>::overriding(
        USER_AGENT,
        HeaderValue::from_static("my-app")
    ))
    // Decompress response bodies
    .layer(DecompressionLayer::new())
    // Wrap a `hyper::Client` in our middleware stack
    .service(hyper::Client::new());

我们在文档中投入了大量的精力,确保每件事都有易于理解的例子。我们还建立了两个例子,一个用warp,另一个用tonic,向你展示如何把所有东西放在一起。

我们非常欢迎你的贡献,如果你有问题,你可以在Tokio Discord服务器找到我们。

内容出处: https://tokio.rs/blog/2021-05-announcing-tower-http

宣布Valuable:一个提供对象安全的值检查的类库

Valuable允许调用者在不知道其类型的情况下检查值的内容,无论是字段、枚举变体,还是基础类型

在过去的几周里,我们一直在开发Valuable,一个提供对象安全的值检查的新crate。它几乎已经准备好发布了,所以我想我应该写一篇文章来介绍它。这个 crate 提供了一个对象安全的trait–Valuable,它允许调用者在不知道其类型的情况下检查值的内容,无论是字段、枚举变体,还是基础类型。最初,我们写Valuable是为了支持Tracing;然而,它在几种情况下是有帮助的。对象安全的值检查有点拗口,所以让我们先看看Tracing,以及为什么那里需要它。

Tracing 是一个用于检测Rust程序的框架,收集结构化的、基于事件的诊断信息。有些人认为它是一个结构化的日志框架,虽然它可以满足这个用例,但它可以做更多。例如,Console 的目标是成为调试异步Rust应用程序的强大工具,并使用 Tracing 作为其支柱。Tokio 和其他库通过 Tracing 发出仪表信息。Console 将这些事件聚合到应用程序的执行模型中,使开发者能够深入了解错误和其他问题。

YJIrHK2

仪器化的应用程序发出带有丰富结构化数据的事件,而收集器则接收这些事件。当然,在编译时,被探测的应用程序和事件收集器并不了解对方。一个 trait 对象将仪器化的一半与收集器的一半连接起来,使收集器能够动态地注册自己。因此,将丰富的、结构化的数据从仪器化的一半传递给收集器需要通过 trait 对象的边界传递。今天,Tracing 在最低水平上支持这个,但不支持传递嵌套数据。

让我们来看看一个实际的用例。给定一个HTTP服务,在一个HTTP请求开始的时候,我们想发出一个包括相关HTTP头的追踪事件。这些数据可能看起来像这样:

{
  user_agent: "Mozilla/4.0 (compatible; MSIE5.01; Windows NT)",
  host: "www.example.com",
  content_type: {
    mime: "text/xml",
    charset: "utf-8",
  },
  accept_encoding: ["gzip", "deflate"],
}

在应用程序中,Rust结构体存储头文件:

struct Headers {
    user_agent: String,
    host: String,
    content_type: ContentType,
    accept_encoding: Vec<String>,
}

struct ContentType {
    mime: String,
    charset: String,
}

我们想把这些数据传递给事件收集器,但如何传递呢?事件收集器不知道 Headers 结构,所以我们不能只是定义一个接收 &Headers 的方法。我们可以使用像 serde_json::Value 这样的类型来传递任意的结构化数据,但这需要从我们应用程序的结构体中分配和复制数据来交给收集器。

Valuable crate的目的就是要解决这个问题。在HTTP头的案例中,首先,我们要为我们的 Headers 类型实现 Valuable。然后,我们可以将一个 &dyn Valuable 引用传递给事件收集器。采集器可以使用 Valuable 的访问者API来检查该值,并提取与其使用情况相关的数据。

// Visit the root of the Headers struct. This visitor will find the
// `accept_encoding` field on `Headers` and extract the contents. All other
// fields are ignored.
struct VisitHeaders {
    /// The extracted `accept-encoding` header values.
    accept_encoding: Vec<String>,
}

// Visit the `accept-encoding` `Vec`. This visitor iterates the items in
// the list and pushes it into its `accept_encoding` vector.
struct VisitAcceptEncoding<'a> {
    accept_encoding: &'a mut Vec<String>,
}

impl Visit for VisitHeaders {
    fn visit_value(&mut self, value: Value<'_>) {
        // We expect a `Structable` representing the `Headers` struct.
        match value {
            // Visiting the struct will call `visit_named_fields`.
            Value::Structable(v) => v.visit(self),
            // Ignore other patterns
            _ => {}
        }
    }

    fn visit_named_fields(&mut self, named_values: &NamedValues<'_>) {
        // We only care about `accept_encoding`
        match named_values.get_by_name("accept_encoding") {
            Some(Value::Listable(accept_encoding)) => {
                // Create the `VisitAcceptEncoding` instance to visit
                // the items in `Listable`.
                let mut visit = VisitAcceptEncoding {
                    accept_encoding: &mut self.accept_encoding,
                };
                accept_encoding.visit(&mut visit);
            }
            _ => {}
        }
    }
}

// Extract the "accept-encoding" headers
let mut visit = VisitHeaders { accept_encoding: vec![] };
valuable::visit(&my_headers, &mut visit);

assert_eq!(&["gzip", "deflate"], &visit.accept_encoding[..]);

注意访问者API如何让我们选择检查哪些数据。我们只关心 accept_encoding 值,所以这是我们唯一访问的字段。我们不访问 content_type 字段。

Valuable crate将每个值表示为 Value 枚举的一个实例。原始的 rust 类型被枚举出来,其他类型被分为可结构化(Structable)、可枚举(Enumerable)、可列表(Listable)或可映射(Mappable),由同名的 trait 表示。实现一个结构体或枚举体的 traits 通常是使用程序性宏来完成的;然而,它可能看起来像这样:

static FIELDS: &[NamedField<'static>] = &[
    NamedField::new("user_agent"),
    NamedField::new("host"),
    NamedField::new("content_type"),
    NamedField::new("accept_encoding"),
];

impl Valuable for Headers {
    fn as_value(&self) -> Value<'_> {
        Value::Structable(self)
    }

    fn visit(&self, visit: &mut dyn Visit) {
        visit.visit_named_fields(&NamedValues::new(
            FIELDS,
            &[
                Value::String(&self.user_agent),
                Value::String(&self.host),
                Value::Structable(&self.content_type),
                Value::Listable(&self.accept_encoding),
            ]
        ));
    }
}

impl Structable for Headers {
    fn definition(&self) -> StructDef<'_> {
        StructDef::new_static("Headers", Fields::Named(FIELDS))
    }
}

请注意 visit 实现除了原始类型外没有复制任何数据。如果访问者不需要检查子字段,就不需要进一步的工作。

我们期望 Valuable 的作用不仅仅是跟踪。例如,在需要对象安全的时候,它对任何序列化都是有帮助的。Valuable 不是 Serde 的替代品,也不会提供反序列化的API。然而,Valuable 可以补充 Serde,因为 Serde 的序列化API由于trait的相关类型而不是 trait-object 安全的(erased-serde 的存在可以解决这个问题,但需要为每个嵌套的数据结构分配)。一个 valuable-serde crate 已经在进行中(感谢 taiki-e),为实现 Valuable 和 Serialize 的类型提供了一个桥梁。为了获得对象安全的序列化,派生 Valuable 而不是 Serialize,并序列化 Valuable trait 对象。

作为另一个潜在的用例,Valuable 可以在渲染模板时有效地提供数据。模板引擎在渲染模板时必须按需访问数据字段。例如,Handlebars crate目前使用serde_json::Value 作为渲染时的参数类型,要求调用者将数据复制到 serde_json::Value 实例中。相反,如果 Handlebars 使用 Valuable,就会跳过复制的步骤。

现在我们需要你试一试Valuable,让我们知道它是否能满足你的使用情况。因为 Tracing 1.0 将依赖于Valuable,我们希望在2022年初稳定发布 Valuable 的1.0版本。这并没有给我们很多时间,所以我们需要尽早找到API漏洞。尝试使用 Valuable 编写库,特别是模板引擎或本帖所暗示的其他用例。我们还可以在 “桥梁” crate(例如 valuable-http)方面得到帮助,这些板块为生态系统中常见的数据类型提供宝贵的实现。我们还有很多工作要做,以扩展配置选项和其他功能的派生宏,所以请到Tokio discord服务器上的#valuable频道打招呼。

内容出处: https://tokio.rs/blog/2021-05-valuable

发明Service特质

Console是一个Rust异步调试工具。它的目标是让你在试图更好地理解你的异步任务的行为方式时,成为你所要使用的工具。

Tower是一个模块化和可重复使用的组件库,用于构建强大的网络客户端和服务器。其核心是 Service 特性。Service 是一个异步函数,它接受一个请求并产生一个响应。然而,其设计的某些方面可能并不那么明显。与其解释今天存在于Tower中的 Service 特性,不如让我们想象一下如果你从头开始,你会如何发明 Service 背后的动机。

想象一下,你正在用Rust构建一个小小的HTTP框架。这个框架将允许用户通过提供接收请求并回复一些响应的代码来实现一个HTTP服务器。

你可能有一个这样的API:

// Create a server that listens on port 3000
let server = Server::new("127.0.0.1:3000").await?;

// Somehow run the user's application
server.run(the_users_application).await?;

问题是,the_users_application应该是什么?

最简单的可能是:

fn handle_request(request: HttpRequest) -> HttpResponse {
    // ...
}

其中 HttpRequestHttpResponse 是由我们的框架提供的一些结构体。

有了这个,我们可以像这样实现 Server::run :

impl Server {
    async fn run<F>(self, handler: F) -> Result<(), Error>
    where
        F: Fn(HttpRequest) -> HttpResponse,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Call the handler provided by the user
            let response = handler(request);

            write_http_response(connection, response).await?;
        }
    }
}

在这里,我们有一个异步函数 run,它接受一个接受 HttpRequest 并返回 HttpResponse 的闭包。

这意味着用户可以像这样使用我们的服务器:

fn handle_request(request: HttpRequest) -> HttpResponse {
    if request.path() == "/" {
        HttpResponse::ok("Hello, World!")
    } else {
        HttpResponse::not_found()
    }
}

// Run the server and handle requests using our `handle_request` function
server.run(handle_request).await?;

这并不坏。它使用户可以很容易地运行HTTP服务器,而不必担心任何低层次的细节问题。

然而,我们目前的设计有一个问题:我们不能异步地处理请求。想象一下,我们的用户在处理请求的同时需要查询数据库或向其他一些服务器发送请求。目前,这需要在我们等待处理程序产生响应时进行阻塞。如果我们希望我们的服务器能够处理大量的并发连接,我们需要在等待该请求异步完成的同时能够为其他请求提供服务。让我们通过让处理程序函数返回一个 future 来解决这个问题:

impl Server {
    async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
    where
        // `handler` now returns a generic type `Fut`...
        F: Fn(HttpRequest) -> Fut,
        // ...which is a `Future` whose `Output` is an `HttpResponse`
        Fut: Future<Output = HttpResponse>,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Await the future returned by `handler`
            let response = handler(request).await;

            write_http_response(connection, response).await?;
        }
    }
}

使用这个API和以前非常相似:

// Now an async function
async fn handle_request(request: HttpRequest) -> HttpResponse {
    if request.path() == "/" {
        HttpResponse::ok("Hello, World!")
    } else if request.path() == "/important-data" {
        // We can now do async stuff in here
        let some_data = fetch_data_from_database().await;
        make_response(some_data)
    } else {
        HttpResponse::not_found()
    }
}

// Running the server is the same
server.run(handle_request).await?;

这就好得多了,因为我们的请求处理现在可以调用其他异步函数。然而,仍然缺少一些东西。如果我们的处理程序遇到了错误,不能产生响应怎么办?让我们让它返回一个 Result:

impl Server {
    async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
    where
        F: Fn(HttpRequest) -> Fut,
        // The response future is now allowed to fail
        Fut: Future<Output = Result<HttpResponse, Error>>,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Pattern match on the result of the response future
            match handler(request).await {
                Ok(response) => write_http_response(connection, response).await?,
                Err(error) => handle_error_somehow(error, connection),
            }
        }
    }
}

添加更多的行为

现在,假设我们想确保所有的请求及时完成或失败,而不是让客户端无限期地等待一个可能永远不会到达的响应。我们可以通过给每个请求添加一个超时来做到这一点。超时设置了一个处理程序允许的最大持续时间的限制。如果它在这个时间内没有产生一个响应,就会返回一个错误。这允许客户端重试该请求或向用户报告一个错误,而不是永远等待。

你的第一个想法可能是修改Server,使其可以配置一个超时。然后它在每次调用处理程序时都会应用这个超时。然而,事实证明,你实际上可以在不修改Server的情况下添加一个超时。使用 tokio::time::timeout,我们可以做一个新的处理函数,调用之前的 handle_request,但超时时间为 30 秒。

async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> {
    let result = tokio::time::timeout(
        Duration::from_secs(30),
        handle_request(request)
    ).await;

    match result {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(error)) => Err(error),
        Err(_timeout_elapsed) => Err(Error::timeout()),
    }
}

这提供了一个相当好的关注点分离。我们能够在不改变任何现有代码的情况下增加一个超时功能。

让我们用这种方式再增加一个功能。想象一下,我们正在构建一个JSON API,因此希望在所有的响应上都有一个 Content-Type: application/json 头。我们可以用类似的方式包装 handler_with_timeout,并像这样修改响应:

async fn handler_with_timeout_and_content_type(
    request: HttpRequest,
) -> Result<HttpResponse, Error> {
    let mut response = handler_with_timeout(request).await?;
    response.set_header("Content-Type", "application/json");
    Ok(response)
}

我们现在有了一个处理程序,它可以处理一个HTTP请求,时间不超过30秒,并且总是有正确的Content-Type头,所有这些都不用修改我们原来的handle_request函数或Server结构。

设计可以以这种方式扩展的库是非常强大的,因为它允许用户通过分层的新行为来扩展库的功能,而不必等待库的维护者为其添加支持。

它也使测试更容易,因为你可以将你的代码分解成小的隔离单元,并为它们编写细粒度的测试,而不必担心所有其他的部分。

然而,有一个问题。我们目前的设计让我们通过将一个处理函数包裹在一个新的处理函数中来组成新的行为,该处理函数实现了该行为,然后调用内部函数。这很有效,但如果我们想增加很多额外的功能,它就不能很好地扩展。想象一下,我们有许多 handle_with_* 函数,每个函数都增加了一点新的行为。要硬编码哪一个中间处理程序调用哪一个,将变得很有挑战性。我们目前的链条是

  1. handler_with_timeout_and_content_type 调用
  2. handler_with_timeout 调用
  3. handle_request 实际处理请求

如果我们能以某种方式组合(compose)这三个函数而不需要硬编码确切的顺序,那就更好了。比如说:

let final_handler = with_content_type(with_timeout(handle_request));

同时仍然能够像以前一样运行我们的处理程序:

server.run(final_handler).await?;

你可以尝试将 with_content_typewith_timeout 作为函数来实现,该函数接受一个 F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>> 类型的参数,并返回一个像 impl Fn(HttpRequest) -> impl Future<Output = Result<HttpResponse, Error>> 的闭包,但由于Rust今天允许 impl Trait 的限制,这实际上不可能。特别是 impl Fn() -> impl Future 是不允许的。使用 Box 是可能的,但这有一个我们想避免的性能代价。

除了调用处理程序之外,你也不能为其添加其他行为,但为什么有必要这样做,我们会再讨论。

Handler特性

让我们尝试另一种方法。与其说 Server::run 接受一个闭包(Fn(HttpRequest) -> ...),不如说我们做一个新的trait来封装同样的 async fn(HttpRequest) -> Result<HttpResponse, Error>:

trait Handler {
    async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}

有了这样一个特性,我们就可以写出实现它的具体类型,这样我们就不用一直和 Fn 打交道了。

然而,Rust目前不支持异步trait方法,所以我们有两个选择:

  1. call 返回一个封箱(boxed)的 future,如 Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>。这就是 async-trait 箱的作用。

  2. Handler 添加一个相关类型的 Future,这样用户就可以选择自己的类型。

让我们选择方案二,因为它是最灵活的。有具体的 future 类型的用户可以使用该类型,而不需要付出Box的代价,不关心的用户仍然可以使用Pin<Box<…>。

trait Handler {
    type Future: Future<Output = Result<HttpResponse, Error>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future;
}

我们仍然必须要求 Handler::Future 实现 Future,其输出类型为 Result<HttpResponse, Error>,因为那是 Server::run 所要求的。

call 获取 &mut self 很有用,因为它允许处理程序在必要时更新其内部状态。

让我们把原来的 handle_request 函数转换成这个特性的实现。

struct RequestHandler;

impl Handler for RequestHandler {
    // We use `Pin<Box<...>>` here for simplicity, but could also define our
    // own `Future` type to avoid the overhead
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            // same implementation as we had before
            if request.path() == "/" {
                Ok(HttpResponse::ok("Hello, World!"))
            } else if request.path() == "/important-data" {
                let some_data = fetch_data_from_database().await?;
                Ok(make_response(some_data))
            } else {
                Ok(HttpResponse::not_found())
            }
        })
    }
}

支持超时如何?请记住,我们的目标是让我们能够将不同的功能组合在一起,而不需要修改每个单独的部分。

如果我们像这样定义一个通用的超时结构,会怎么样呢:

struct Timeout<T> {
    // T will be some type that implements `Handler`
    inner_handler: T,
    duration: Duration,
}

然后我们可以为 Timeout<T> 实现 Handler,并委托给 THandler 实现:

impl<T> Handler for Timeout<T>
where
    T: Handler,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            let result = tokio::time::timeout(
                self.duration,
                self.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}

这里重要的一行是 self.inner_handler.call(request)。这就是我们委托给内部处理程序并让它做它的事情的地方。我们不知道它是什么,我们只知道它完成后会产生一个 Result<HttpResponse, Error>

但这段代码并没有完全被编译。我们得到一个这样的错误:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/lib.rs:145:29
    |
144 |       fn call(&mut self, request: HttpRequest) -> Self::Future {
    |               --------- this data with an anonymous lifetime `'_`...
145 |           Box::pin(async move {
    |  _____________________________^
146 | |             let result = tokio::time::timeout(
147 | |                 self.duration,
148 | |                 self.inner_handler.call(request),
...   |
155 | |             }
156 | |         })
    | |_________^ ...is captured here, requiring it to live as long as `'static`

问题是,我们正在捕获一个 &mut self,并将其移入一个异步块。这意味着我们的 future 的生命周期与 &mut self 的生命周期相关。这对我们来说并不适用,因为我们可能想在多个线程上运行我们的响应 future 以获得更好的性能,或者产生多个响应 future 并将它们全部并行运行。如果对处理程序的引用存在于futures中,这是不可能的。

相反,我们需要将 &mut self 转换为拥有的 self。这正是 Clone 所做的。

// this must be `Clone` for `Timeout<T>` to be `Clone`
#[derive(Clone)]
struct RequestHandler;

impl Handler for RequestHandler {
    // ...
}

#[derive(Clone)]
struct Timeout<T> {
    inner_handler: T,
    duration: Duration,
}

impl<T> Handler for Timeout<T>
where
    T: Handler + Clone,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        // Get an owned clone of `&mut self`
        let mut this = self.clone();

        Box::pin(async move {
            let result = tokio::time::timeout(
                this.duration,
                this.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}

注意,在这种情况下,克隆是非常廉价的,因为 RequestHandler 没有任何数据,而 Timeout<T> 只增加了一个 Duration(这个会被 Copy)。

又近了一步。我们现在得到一个不同的错误:

error[E0310]: the parameter type `T` may not live long enough
   --> src/lib.rs:149:9
    |
140 |   impl<T> Handler for Timeout<T>
    |        - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | /         Box::pin(async move {
150 | |             let result = tokio::time::timeout(
151 | |                 this.duration,
152 | |                 this.inner_handler.call(request),
...   |
159 | |             }
160 | |         })
    | |__________^ ...so that the type `impl Future` will meet its required lifetime bounds

TBD

内容出处: https://tokio.rs/blog/2021-05-14-inventing-the-service-trait

欢迎Alice Ryhl成为Tokio首位付费贡献者

Tokio将资助 Alice Ryhl 将更多时间投入到Tokio

感谢 Fly.io、Discord、Embark 等赞助商,我们很高兴地宣布,Tokio将资助 Alice Ryhl 将更多时间投入到Tokio。这是一个支付贡献者为Tokio工作的计划的开始,通过Github赞助的捐款资助。

建立一个像Tokio这样的库是一项不朽的任务。自从2016年发布该项目以来,已经花了很多时间来实验、完善和优化API和运行时间。如果没有数以百计的志愿者和公司,如Mozilla、Dropbox、Buoyant和AWS,资助工程时间来建立Tokio,这项工作是不可能的。然而,我们只是处在Rust和异步I/O的旅程的开始。我们希望增加对 io-uring 的支持,改善对windows的支持,并增加功能以改善调试、分析和测试Tokio应用程序。

为了实现我们的目标并确保 Tokio 的长寿,我们必须建立和支持一个健康和多样化的贡献者基础,这些贡献者涵盖了广泛的用例、技能和分支机构。开源的可持续性是一个热门话题。今天,AWS和Buoyant雇用工程师为Tokio工作,并希望看到这个数字的增长。我们重视这种企业贡献,希望看到其他公司加入。不过,我们也知道,雇佣工程师全职专注于开源工作,对很多人来说是成本高昂的。我们需要增加为Tokio定期奉献时间的贡献者的数量,以及致力于支持Tokio的公司的数量,以跟上一个不断增长的生态系统所带来的维护费用,同时也提供新的功能。

在这种情况下,Tokio项目将开始向选定的贡献者支付报酬,使他们能够花更多的时间为Tokio作出贡献。这个项目将从小处着手,因为这对我们来说是个新事物,我们希望能解决一些问题。Alice Ryhl将兼职为Tokio工作。如果你参加过我们的问题跟踪器或对话频道,你可能已经熟悉Alice了。一年多来,她一直是回答问题、回应问题、审查和合并拉动请求的第一线。她还不断地修复错误,增加用户要求的新功能。作为这个项目的参与者,她将做更多同样的工作,并有更多的时间来帮助培养新的贡献者。我们认为这是一个发展贡献者社区的机会。

那么,我们将如何资助这个项目呢?这就是你们的作用! 我们要求公司通过Github赞助来捐款。如果你工作的地方依赖Tokio,请向你的团队提出这个项目并考虑参与。这些捐款将有助于维持你所使用的项目。

我们已经筹集到足够的资金开始支付Alice,这要感谢我们最初的捐助者。我们扩大这个项目的速度将取决于可用的资金,以及能够找到合适的候选人。候选人将是那些已经成功为Tokio做出贡献的人,并且在资金的支持下能够做出更大的贡献。

我们对这个项目感到非常兴奋。我们认为它是一个使贡献更容易获得的机制,这将导致所有用户的Tokio更加可持续。如果你有任何问题或想参与,请在Tokio Discord频道与我们联系,并请考虑赞助Tokio。

内容出处: https://tokio.rs/blog/2021-04-welcome-alice

通过自动合作任务的产生来减少尾部延迟

Console是一个Rust异步调试工具。它的目标是让你在试图更好地理解你的异步任务的行为方式时,成为你所要使用的工具。

Tokio 是一个异步 Rust 应用程序的运行时。它允许使用 async & await 语法编写代码。比如说:

let mut listener = TcpListener::bind(&addr).await?;

loop {
    let (mut socket, _) = listener.accept().await?;

    tokio::spawn(async move {
        // handle socket
    });
}

Rust编译器将这些代码转换为状态机。Tokio 运行时执行这些状态机,在少数线程上复用许多任务。Tokio的调度器要求生成任务的状态机将控制权交还给调度器,以便复用任务。每个 .await 调用都是一个向调度器回馈(yield back)的机会。在上面的例子中,listener.accept().await 将返回一个等待中的套接字。如果没有待处理的套接字,控制权就会交还给调度器。

这个系统在大多数情况下运行良好。然而,当系统处于负载状态时,异步资源有可能始终处于准备状态。例如,考虑一个 echo 服务器:

tokio::spawn(async move {
    let mut buf = [0; 1024];

    loop {
        let n = socket.read(&mut buf).await?;

        if n == 0 {
            break;
        }

        // Write the data back
        socket.write(buf[..n]).await?;
    }
});

如果收到的数据比处理的速度快,那么当一个数据块的处理完成时,有可能已经收到更多的数据了。在这种情况下,.await 将永远不会把控制权交还给调度器,其他任务将不会被调度,导致饥饿和大的延迟差异。

目前,这个问题的答案是,Tokio的用户要负责在应用程序和库中添加让出点(yield points)。在实践中,很少有人真正这样做,最终容易出现这种问题。

解决这个问题的一个常见办法是抢占(preemption)。对于正常的操作系统线程,内核会每隔一段时间就中断执行,以确保所有线程的公平调度。对执行有完全控制权的运行时(Go、Erlang等)也会使用抢占来确保任务的公平调度。这是通过在编译时注入让出点来实现的,让出点是指检查任务是否已经执行了足够长的时间,如果是,则让出于调度器的代码。不幸的是,Tokio不能使用这种技术,因为Rust的异步生成器没有为执行器(如Tokio)提供任何机制来注入这种让出点。

每个任务的操作预算

尽管Tokio不能抢占,但仍有机会促使一个任务让出调度器中。从0.2.14开始,每个Tokio任务都有一个操作预算。这个预算在调度器切换到任务时被重置。每个 Tokio 资源(套接字、定时器、通道…)都知道这个预算。只要任务有剩余的预算,资源就会像以前那样运行。每个异步操作(用户必须等待的操作)都会减少任务的预算。一旦任务超出预算,所有Tokio资源将永远返回 “未准备好”,直到任务返回到调度器。在这一点上,预算被重置,未来的Tokio资源的.await 将再次正常运行。

让我们回到上面的 echo 服务器的例子。当任务被调度时,它被分配的预算是每 “tick” 128个操作。选择128这个数字主要是因为它感觉很好,而且在我们测试的情况下(Noria和HTTP)似乎效果很好。当 socket.read(..)socket.write(..) 被调用时,预算被递减。如果预算为零,任务就会返回到调度器。如果由于底层套接字没有准备好(没有待处理的数据或发送缓冲区已满),读或写都不能进行,那么任务也会返回到调度器。

这个想法源于我和 Ryan Dahl 的一次谈话。他正在使用 Tokio 作为 Deno 的底层运行时。在前段时间用 Hyper 做一些 HTTP 实验的时候,他在一些基准测试中看到一些高的尾部延迟。这个问题是由于一个循环在负载下没有让出给调度器。在这种情况下,Hyper最终通过手工解决了这个问题,但Ryan提到,当他在 node.js 工作时,他们通过增加每个资源的限制来处理这个问题。所以,如果一个TCP套接字总是准备好的,它就会每隔一段时间就强制让出一次。我向 Jon Gjenset 提到了这个对话,他想出了将限制放在任务本身而不是每个资源上的想法。

最终的结果是,Tokio 应该能够在负载下提供更一致的运行时行为。虽然确切的启发式方法很可能会随着时间的推移而调整,但最初的测量表明,在某些情况下,尾部延迟几乎减少了3倍。

73222456-4a103300-4131-11ea-9131-4e437ecb9a04 2

“master” 是在自动让出之前,“preempt” 是在之后。点击查看大图,更多细节请参见原始PR评论。

关于阻塞的说明

虽然自动合作任务的让出在许多情况下提高了性能,但它不能抢占任务。Tokio的用户仍然必须注意避免CPU密集型工作和阻塞的API。spawn_blocking 函数可以用来 “异步化” 这些任务,在允许阻塞的线程池中运行它们。

Tokio不会,也不会试图检测阻塞的任务,并通过在调度器中添加线程来自动进行补偿。这个问题在过去已经出现过很多次了,所以请允许我详细说明。

就背景而言,我们的想法是让调度器包括一个监控线程。这个线程每隔一段时间就会轮询调度器线程,并检查工作者是否在取得进展。如果一个工作者没有取得进展,就会认为该工作者正在执行一个阻塞任务,并应生成一个新的线程来进行补偿。

这个想法并不新鲜。据我所知,这种策略的第一次出现是在 .NET 线程池中,而且是在10多年前引入的。不幸的是,这个策略有很多问题,正因为如此,它没有在其他线程池/调度器(Go、Java、Erlang等)中出现。

第一个问题是很难定义 “进度”。对进度的一个天真的定义是,一个任务是否已经被调度到某个时间单位以上。例如,如果一个工作者在调度同一个任务时被卡住超过100ms,那么这个工作者就会被标记为阻塞,并生成一个新的线程。在这个定义中,如何检测催生新线程会降低吞吐量的情况?这可能发生在调度器普遍处于负载状态的时候,增加线程会使情况变得更加糟糕。为了解决这个问题,.NET线程池采用爬坡法。这篇文章对它的工作原理做了很好的概述。

第二个问题是,任何自动检测策略都容易受到突发性或其他不平衡工作负载的影响。这个具体问题一直是.NET线程池的祸根,被称为 “停顿” 问题。爬坡策略需要一定的时间(数百毫秒)来适应负载变化。这段时间的需要,部分是为了能够确定增加线程是在改善情况,而不是使情况恶化。

停顿问题可以用.NET线程池来管理,部分原因是线程池被设计用来安排粗略的任务,即执行时间在几百毫秒到几十秒的任务。然而,在Rust中,异步任务调度器被设计用来调度那些最多应该在微秒到几十毫秒内运行的任务。在这种情况下,任何基于启发式调度器的呆滞问题都会导致更大的延迟变化。

在这之后,我收到的最常见的后续问题是 “Go调度器不会自动检测阻塞的任务吗?"。简短的回答是:没有。这样做会导致上面提到的同样的卡顿问题。而且,Go没有必要进行通用的阻塞任务检测,因为Go能够抢占。Go调度器所做的是注释潜在的阻塞性系统调用。这大致等同于Tokio的 block_in_place

简而言之,截至目前,刚刚介绍的自动合作任务让出策略是我们发现的减少尾部延迟的最佳方法。因为这个策略只需要Tokio的类型选择加入,终端用户不需要改变任何东西就可以获得这个好处。只需升级Tokio的版本就可以包括这个新功能。另外,如果从Tokio运行时之外使用Tokio的类型,它们的行为将和以前一样。

在这个问题上还有很多工作要做。目前还不清楚任务预算应该如何与 “子表程序”(如 FuturesUnordered)一起工作。任务预算API最终应该公开,以便第三方软件可以与之集成。如果能想出一个办法来普及这个概念,那么不仅仅是Tokio的用户可以利用这个概念,那也是很好的。

我们希望你在这次发布后发现你的尾部延迟有所改善。无论怎样,我们都有兴趣听到这个变化对现实世界的部署有什么影响。欢迎对这个问题发表评论。

—Carl Lerche

内容出处: https://tokio.rs/blog/2020-04-preemption