追踪Tokio动态
Tokio动态
- Tokio官方博客
- 控制台开发日记(1)
- Axum宣布
- 宣布 tokio-uring:Tokio 的 io-uring 支持
- 宣布Tonic 0.5版本
- 宣布tower-http:HTTP特定中间件和实用程序的集合
- 宣布Valuable:一个提供对象安全的值检查的类库
- 发明Service特质
- 欢迎Alice Ryhl成为Tokio首位付费贡献者
- 通过自动合作任务的产生来减少尾部延迟
- Alice Ryhl个人博客
Tokio官方博客
控制台开发日记(1)
自从几个月前开始制作 Tokio Console 原型以来,我们一直在努力工作,把它变成一个很棒的异步任务调试器。我们想围绕它的进展情况提供一些集中的更新。
控制台是什么?
Console 是一个Rust异步调试工具。它的目标是让你在试图更好地理解你的异步任务的行为方式时,成为你所要使用的工具。它利用了 tracing
event 和 span,因此旨在实现运行时的无关性。
更新
易于启动和生成器: 我们使向你的应用程序添加仪器变得更加容易。对于大多数人来说,只需在主函数的顶部添加 console_subscriber::init()
就足够了!它将使用合理的默认值,检查一些环境变量进行定制,并建立一个独立的订阅器。如果你需要更多的控制与现有的跟踪或运行时集成,也有一个方便的 console_subscriber::Builder
API。
一切都更漂亮了! 主列表视图看起来好多了。我们把任务的 “name” 变成了一等一的东西,在列表里有了自己的栏目。任务ID变得更漂亮,与用户期望的更一致。颜色和UTF-8的使用更好了,它默认检查终端支持的内容。例如,显示持续时间的字段对不同的量级(纳秒与毫秒与秒)使用微妙的不同颜色。
**选择任务来查看"任务细节"视图。**这包括关于该任务的更多细节和指标。有一个任务所有轮询时间的柱状图,让你看到你的任务需要多长时间才能完成工作。还有关于一个任务被唤醒多少次的信息,你可以将其与轮询次数以及自上次唤醒后的时间进行比较。
时间性。在与用户交流后,我们优先考虑了控制台的一些 “时间控制” 功能。到目前为止,我们已经实现了暂停控制台的功能(仍然可以探索现有的任务),然后恢复到 “活动”。现在有一个选项可以将所有相关事件记录到磁盘上的一个文件中,目的是能够在控制台中重放该文件。
视频演示。我们制作了一个演示,展示了控制台,以及如何使用它来调试一些常见的任务错误行为。
感谢:
- Eliza Weisman
- Sean McArthur
- Zahari Dichev
- Oğuz Bilgener
- @gneito
- @memoryruins
- Jacob Rothstein
- Artem Vorotnikov
- David Barsky
- Wu Aoxiang
我们也要感谢你们在你们的应用中一直在尝试它,并给我们提供了宝贵的反馈意见。
Axum宣布
今天,我们很高兴地宣布 axum
:一个易于使用,但功能强大的网络框架,旨在充分利用 Tokio 的生态系统。
高级特性
- 通过一个不使用宏(macro free?)的API将请求路由到处理程序
- 使用提取器(extractor)对请求进行声明式的解析
- 简单和可预测的错误处理模式。
- 用最少的模板生成响应。
- 充分利用
tower
和tower-http
的中间件、服务和工具的生态系统
特别是最后一点,是 axum 与现有框架不同的地方。axum 没有自己的中间件系统,而是使用tower::Service
。这意味着 axum 可以免费获得超时、跟踪、压缩、授权等功能。它还可以让你与使用 hyper
或 tonic
编写的应用程序共享中间件。
使用示例
axum 的 “hello world” 是这样的:
use axum::prelude::*;
use std::net::SocketAddr;
#[tokio::main]
async fn main() {
let app = route("/", get(root));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
hyper::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
async fn root() -> &'static str {
"Hello, World!"
}
对 GET/
的请求响应是 200 OK
,其中正文是 Hello, World!
。任何其他请求将导致 404 Not Found
响应。
提取器
请求可以使用 “提取器/extractor” 进行声明式的解析。提取器是一个实现了 FromRequest
的类型。提取器可以作为处理程序的参数,如果请求的URI匹配,就会运行。
例如,Json
是一个提取器,它消耗请求主体并将其解析为JSON
:
use axum::{prelude::*, extract::Json};
use serde::Deserialize;
#[derive(Deserialize)]
struct CreateUser {
username: String,
}
async fn create_user(Json(payload): Json<CreateUser>) {
// `payload` is a `CreateUser`
}
let app = route("/users", post(create_user));
axum
提供了许多有用的提取器,例如:
Bytes
,String
,Body
, 和BodyStream
用于获取请求正文Method
,HeaderMap
, 和Uri
用于获取请求的特定部分Form
,Query
,UrlParams
, 和UrlParamsMap
用于更高级别的请求解析Extension
用于跨处理程序共享状态的扩展Request<hyper::Body>
如果你想完全控制Result<T, E>
andOption<T>
使提取器成为可选
你也可以通过实现 FromRequest
来定义你自己的提取器。
构建响应
处理程序可以返回任何实现了 IntoResponse
的东西,它将被自动转换为响应:
use http::StatusCode;
use axum::response::{Html, Json};
use serde_json::{json, Value};
// We've already seen returning &'static str
async fn text() -> &'static str {
"Hello, World!"
}
// String works too
async fn string() -> String {
"Hello, World!".to_string()
}
// Returning a tuple of `StatusCode` and another `IntoResponse` will
// change the status code
async fn not_found() -> (StatusCode, &'static str) {
(StatusCode::NOT_FOUND, "not found")
}
// `Html` gives a content-type of `text/html`
async fn html() -> Html<&'static str> {
Html("<h1>Hello, World!</h1>")
}
// `Json` gives a content-type of `application/json` and works with any type
// that implements `serde::Serialize`
async fn json() -> Json<Value> {
Json(json!({ "data": 42 }))
}
这意味着在实践中,你很少需要建立你自己的响应。你也可以实现 IntoResponse
来创建你自己的特定领域响应。
路由
可以使用一个简单的 DSL 来组合多个路由。
use axum::prelude::*;
let app = route("/", get(root))
.route("/users", get(list_users).post(create_user))
.route("/users/:id", get(show_user).delete(delete_user));
中间件
axum 支持来自 tower
和 tower-http
的中间件。
use axum::prelude::*;
use tower_http::{compression::CompressionLayer, trace::TraceLayer};
use tower::ServiceBuilder;
use std::time::Duration;
let middleware_stack = ServiceBuilder::new()
// timeout all requests after 10 seconds
.timeout(Duration::from_secs(10))
// add high level tracing of requests and responses
.layer(TraceLayer::new_for_http())
// compression responses
.layer(CompressionLayer::new())
// convert the `ServiceBuilder` into a `tower::Layer`
.into_inner();
let app = route("/", get(|| async { "Hello, World!" }))
// wrap our application in the middleware stack
.layer(middleware_stack);
这个功能很关键,因为它允许我们只写一次中间件,并在不同的应用中分享它们。例如,axum
不需要提供自己的 tracing/logging 中间件,可以直接使用来自 tower-http
的 TraceLayer
。同样的中间件也可以用于用 tonic 制作的客户端或服务器。
路由到任何 tower::Service
axum
也可以将请求路由到任何 tower 服务。可以是你用 service_fn
编写的服务,也可以是来自其他 crate 的东西,比如来自 tower-http
的ServeFile
:
use axum::{service, prelude::*};
use http::Response;
use std::convert::Infallible;
use tower::{service_fn, BoxError};
use tower_http::services::ServeFile;
let app = route(
// Any request to `/` goes to a some `Service`
"/",
service::any(service_fn(|_: Request<Body>| async {
let res = Response::new(Body::from("Hi from `GET /`"));
Ok::<_, Infallible>(res)
}))
).route(
// GET `/static/Cargo.toml` goes to a service from tower-http
"/static/Cargo.toml",
service::get(ServeFile::new("Cargo.toml"))
);
了解更多
这只是 axum
提供的一个小例子。错误处理、网络套接字和解析 multipart/form-data
请求是这里没有显示的一些功能。请参阅文档以了解更多细节。
我们也鼓励您查看软件库中的例子,看看一些用 axum
编写的稍大的应用程序。
宣布 tokio-uring:Tokio 的 io-uring 支持
今天,我们发布了 “tokio-uring” crate的第一个版本,为Linux上的 io-uring
系统API提供支持。这个版本提供了异步文件操作,我们将在后续版本中增加对更多操作的支持。
要使用 tokio-uring
,首先要在 crate 上添加一个依赖项:
tokio-uring = "0.1.0"
然后,启动 tokio-uring
运行时 ,并从文件中读取:
use tokio_uring::fs::File;
fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio_uring::start(async {
// Open a file
let file = File::open("hello.txt").await?;
let buf = vec![0; 4096];
// Read some data, the buffer is passed by ownership and
// submitted to the kernel. When the operation completes,
// we get the buffer back.
let (res, buf) = file.read_at(buf, 0).await;
let n = res?;
// Display the contents
println!("{:?}", &buf[..n]);
Ok(())
})
}
tokio-uring
运行时在底下使用Tokio运行时,所以它与Tokio类型和库(如hyper和tonic)兼容。下面是和上面一样的例子,但我们不是写到STDOUT,而是写到一个Tokio TCP套接字。
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio_uring::fs::File;
fn main() {
tokio_uring::start(async {
// Start a TCP listener
let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
// Accept new sockets
loop {
let (mut socket, _) = listener.accept().await.unwrap();
// Spawn a task to send the file back to the socket
tokio_uring::spawn(async move {
// Open the file without blocking
let file = File::open("hello.txt").await.unwrap();
let mut buf = vec![0; 16 * 1_024];
// Track the current position in the file;
let mut pos = 0;
loop {
// Read a chunk
let (res, b) = file.read_at(buf, pos).await;
let n = res.unwrap();
if n == 0 {
break;
}
socket.write_all(&b[..n]).await.unwrap();
pos += n as u64;
buf = b;
}
});
}
});
}
所有的 tokio-uring
操作都是真正的异步,与 tokio::fs
提供的API不同,后者在线程池上运行。从线程池中使用同步的文件系统操作会增加大量的开销。有了io-uring
,我们可以在同一个线程中异步地执行网络和文件系统操作。但是,io-uring
的内容很多。
Tokio目前的Linux实现使用非阻塞系统调用和 epoll
来进行事件通知。使用 epoll
,一个经过调整的TCP代理将花费70%到80%的CPU周期在用户空间之外,包括执行系统调用和在内核和用户空间之间复制数据的周期。Io-uring
通过消除大多数系统调用来减少开销,对于某些操作,提前映射用于字节缓冲区的内存区域。早期将 io-uring
与 epoll
进行比较的基准是有希望的;用C语言实现的TCP echo客户端和服务器显示了高达60%的改进。
最初的 tokio-uring
版本提供了一套适度的API,但我们计划在未来的版本中增加对 io-uring
的所有功能的支持。请看设计文件以了解我们的发展方向。
所以,请尝试一下这个 crate,并随时提出问题或报告问题。
另外,我们要感谢所有在这一过程中提供帮助的人,特别是Glauber Costa(Glommio的作者),他耐心地回答了我的许多问题,withoutboats最初的探索(Ringbahn)和花时间与我讨论设计问题,以及 quininer
在纯Rust `io-uring 绑定上的出色工作。
宣布Tonic 0.5版本
我们很高兴地宣布 Tonic 的0.5版本,它是 Rust 的原生 gRPC 实现。0.5是一个重要的版本,已经酝酿了一段时间。
一些关键的新功能是:
gRPC-Web
gRPC-Web 是一个协议,允许客户端通过 HTTP/1.1 连接到 gRPC 服务,而不是通常的HTTP/2。gRPC-Web 的一个常见用例是在浏览器中运行的JavaScript客户端。以前,这需要使用一个代理来将 HTTP/2 请求翻译成 HTTP/1.1。
然而,新的 crate tonic-web
允许普通的 Tonic 服务器接受 gRPC-Web
请求,而不需要外部代理。
启用gRPC-Web支持很简单:
use tonic::transport::Server;
// code generated by tonic-build
use hello_world::greeter_server::{GreeterServer, Greeter};
struct MyGreeter;
#[tonic::async_trait]
impl Greeter for MyGreeter {
// ...
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:3000".parse().unwrap();
let greeter = GreeterServer::new(MyGreeter);
// enable grpc-web support for our `greeter` service
let service = tonic_web::enable(greeter);
Server::builder()
// by default, tonic servers only accept http2 requests
// so we have to enable receiving http1 as well
.accept_http1(true)
.add_service(service)
.serve(addr)
.await?;
Ok(())
}
更多细节请见tonic-web crate。
压缩
Tonic现在可以透明地压缩和解压请求、响应和数据流。
在客户端上启用压缩功能是这样做的:
let client = GreeterClient::new(channel)
// compress requests
.send_gzip()
/// accept compressed responses
.accept_gzip();
而在服务器上:
let service = GreeterServer::new(greeter)
// accept compressed requests
.accept_gzip()
// compress responses, if supported by the client
.send_gzip();
注意这需要在 Tonic
和 tonic-build
上启用 compression
功能。更多细节请参见文档。
改进的Tower集成
Tonic 一直支持通过 Tower 的 Service trait 来扩展客户端和服务器,但在0.5版本中,新的 Server::layer
方法使其更加简单。例如,我们可以通过使用来自 tower-http
的中间件来为一个服务添加追踪和授权:
use tonic::transport::Server;
use tower_http::{
auth::RequireAuthorizationLayer,
trace::TraceLayer,
};
// The stack of middleware our service will be wrapped in
let layer = tower::ServiceBuilder::new()
// High level tracing of requests and responses
.layer(TraceLayer::new_for_grpc())
// Authorize all requests using a token
.layer(RequireAuthorizationLayer::bearer("my-secret-token"))
// Convert our `ServiceBuilder` into a `tower::Layer`
.into_inner();
Server::builder()
// Apply our middleware stack to the server
.layer(layer)
.add_service(GreeterServer::new(MyGreeter)
.serve(addr)
.await?;
更灵活的拦截器
拦截器是轻量级的中间件,除其他外,可以用来修改传入请求的元数据,也可以选择用状态拒绝它们。
然而,Tonic对拦截器的支持一直是相当有限的。例如,你不能结合多个拦截器,而必须使用Tower的 service
抽象,它更强大,但也需要更多的代码来设置。
在Tonic 0.5中,我们有一个新的拦截器API,和以前一样容易使用,但现在内部使用Tower。这意味着拦截器可以像其他Tower中间件一样被应用。例如,在一个客户端添加多个拦截器,现在可以用:
use tonic::{
Request, Status,
service::interceptor_fn,
transport::Endpoint
};
fn intercept_one(req: Request<()>) -> Result<Request<()>, Status> {
// ...
}
fn intercept_two(req: Request<()>) -> Result<Request<()>, Status> {
// ...
}
let channel = Endpoint::from_static("http://[::1]:50051").connect_lazy()?;
let intercepted_channel = tower::ServiceBuilder::new()
.layer(interceptor_fn(intercept_one))
.layer(interceptor_fn(intercept_two))
.service(channel);
let client = GreeterClient::new(intercepted_channel);
0.5包括许多其他较小的功能和改进。更新日志里有所有的细节。
如同以往,如果你有问题,你可以在Tokio Discord服务器的#tonic找到我们。
宣布tower-http:HTTP特定中间件和实用程序的集合
今天我很高兴地宣布tower-http,它是一个用Tower的 service trait 构建的HTTP特定中间件和实用程序的集合。
Tower本身包含的中间件都是与协议无关的。例如,它的超时中间件与任何服务实现兼容,无论它使用哪种协议。这很好,因为它意味着中间件更可重用,但也意味着你不能使用协议的特定功能。在HTTP的情况下,这意味着Tower中的中间件不知道状态码、头信息或其他HTTP的特定功能。
另一方面,tower-http
包含了针对HTTP的中间件。它使用 http
和 http-body
crate,这意味着它与任何使用这些板块的板块兼容,例如 hyper、tonic 和 warp。
tower-http 的目标是提供一套丰富的中间件来解决构建 HTTP 客户端和服务器时的常见问题。一些亮点是:
-
追踪:轻松地在你的应用程序中添加高级别的 tracing/logging。支持通过状态代码以及 gRPC 特定的头信息来确定成功或失败。有很好的默认值,但也支持深度定制。
-
压缩和解压:自动压缩或解压响应体。这与使用
ServeDir
的静态文件的服务非常吻合。 -
FollowRedirect:自动跟踪重定向响应。
还有一些小工具,如设置请求和响应头,从日志中隐藏敏感头,授权等。
用tower-http中的东西构建一个小服务器看起来像这样:
use tower_http::{
compression::CompressionLayer,
auth::RequireAuthorizationLayer,
trace::TraceLayer,
};
use tower::{ServiceBuilder, make::Shared};
use http::{Request, Response};
use hyper::{Body, Error, server::Server};
use std::net::SocketAddr;
// Our request handler. This is where we would implement the application logic
// for responding to HTTP requests...
async fn handler(request: Request<Body>) -> Result<Response<Body>, Error> {
Ok(Response::new(Body::from("Hello, World!")))
}
#[tokio::main]
async fn main() {
// Use `tower`'s `ServiceBuilder` API to build a stack of middleware
// wrapping our request handler.
let service = ServiceBuilder::new()
// High level tracing of requests and responses.
.layer(TraceLayer::new_for_http())
// Compress responses.
.layer(CompressionLayer::new())
// Authorize requests using a token.
.layer(RequireAuthorizationLayer::bearer("tower-is-cool"))
// Wrap a `Service` in our middleware stack.
.service_fn(handler);
// And run our service using `hyper`.
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
Server::bind(&addr)
.serve(Shared::new(service))
.await
.expect("server error");
}
而建立一个看起来是这样的客户:
use tower_http::{
decompression::DecompressionLayer,
set_header::SetRequestHeaderLayer,
};
use tower::ServiceBuilder;
use hyper::Body;
use http::{Request, Response, HeaderValue, header::USER_AGENT};
let client = ServiceBuilder::new()
// Log failed requests
.layer(
TraceLayer::new_for_http()
.on_request(())
.on_response(())
.on_body_chunk(())
.on_eos(())
// leave the `on_failure` callback as the default
)
// Set a `User-Agent` header on all requests
.layer(SetRequestHeaderLayer::<_, Body>::overriding(
USER_AGENT,
HeaderValue::from_static("my-app")
))
// Decompress response bodies
.layer(DecompressionLayer::new())
// Wrap a `hyper::Client` in our middleware stack
.service(hyper::Client::new());
我们在文档中投入了大量的精力,确保每件事都有易于理解的例子。我们还建立了两个例子,一个用warp,另一个用tonic,向你展示如何把所有东西放在一起。
我们非常欢迎你的贡献,如果你有问题,你可以在Tokio Discord服务器找到我们。
宣布Valuable:一个提供对象安全的值检查的类库
在过去的几周里,我们一直在开发Valuable,一个提供对象安全的值检查的新crate。它几乎已经准备好发布了,所以我想我应该写一篇文章来介绍它。这个 crate 提供了一个对象安全的trait–Valuable,它允许调用者在不知道其类型的情况下检查值的内容,无论是字段、枚举变体,还是基础类型。最初,我们写Valuable是为了支持Tracing;然而,它在几种情况下是有帮助的。对象安全的值检查有点拗口,所以让我们先看看Tracing,以及为什么那里需要它。
Tracing 是一个用于检测Rust程序的框架,收集结构化的、基于事件的诊断信息。有些人认为它是一个结构化的日志框架,虽然它可以满足这个用例,但它可以做更多。例如,Console 的目标是成为调试异步Rust应用程序的强大工具,并使用 Tracing 作为其支柱。Tokio 和其他库通过 Tracing 发出仪表信息。Console 将这些事件聚合到应用程序的执行模型中,使开发者能够深入了解错误和其他问题。
仪器化的应用程序发出带有丰富结构化数据的事件,而收集器则接收这些事件。当然,在编译时,被探测的应用程序和事件收集器并不了解对方。一个 trait 对象将仪器化的一半与收集器的一半连接起来,使收集器能够动态地注册自己。因此,将丰富的、结构化的数据从仪器化的一半传递给收集器需要通过 trait 对象的边界传递。今天,Tracing 在最低水平上支持这个,但不支持传递嵌套数据。
让我们来看看一个实际的用例。给定一个HTTP服务,在一个HTTP请求开始的时候,我们想发出一个包括相关HTTP头的追踪事件。这些数据可能看起来像这样:
{
user_agent: "Mozilla/4.0 (compatible; MSIE5.01; Windows NT)",
host: "www.example.com",
content_type: {
mime: "text/xml",
charset: "utf-8",
},
accept_encoding: ["gzip", "deflate"],
}
在应用程序中,Rust结构体存储头文件:
struct Headers {
user_agent: String,
host: String,
content_type: ContentType,
accept_encoding: Vec<String>,
}
struct ContentType {
mime: String,
charset: String,
}
我们想把这些数据传递给事件收集器,但如何传递呢?事件收集器不知道 Headers
结构,所以我们不能只是定义一个接收 &Headers
的方法。我们可以使用像 serde_json::Value
这样的类型来传递任意的结构化数据,但这需要从我们应用程序的结构体中分配和复制数据来交给收集器。
Valuable crate的目的就是要解决这个问题。在HTTP头的案例中,首先,我们要为我们的 Headers
类型实现 Valuable
。然后,我们可以将一个 &dyn Valuable
引用传递给事件收集器。采集器可以使用 Valuable
的访问者API来检查该值,并提取与其使用情况相关的数据。
// Visit the root of the Headers struct. This visitor will find the
// `accept_encoding` field on `Headers` and extract the contents. All other
// fields are ignored.
struct VisitHeaders {
/// The extracted `accept-encoding` header values.
accept_encoding: Vec<String>,
}
// Visit the `accept-encoding` `Vec`. This visitor iterates the items in
// the list and pushes it into its `accept_encoding` vector.
struct VisitAcceptEncoding<'a> {
accept_encoding: &'a mut Vec<String>,
}
impl Visit for VisitHeaders {
fn visit_value(&mut self, value: Value<'_>) {
// We expect a `Structable` representing the `Headers` struct.
match value {
// Visiting the struct will call `visit_named_fields`.
Value::Structable(v) => v.visit(self),
// Ignore other patterns
_ => {}
}
}
fn visit_named_fields(&mut self, named_values: &NamedValues<'_>) {
// We only care about `accept_encoding`
match named_values.get_by_name("accept_encoding") {
Some(Value::Listable(accept_encoding)) => {
// Create the `VisitAcceptEncoding` instance to visit
// the items in `Listable`.
let mut visit = VisitAcceptEncoding {
accept_encoding: &mut self.accept_encoding,
};
accept_encoding.visit(&mut visit);
}
_ => {}
}
}
}
// Extract the "accept-encoding" headers
let mut visit = VisitHeaders { accept_encoding: vec![] };
valuable::visit(&my_headers, &mut visit);
assert_eq!(&["gzip", "deflate"], &visit.accept_encoding[..]);
注意访问者API如何让我们选择检查哪些数据。我们只关心 accept_encoding
值,所以这是我们唯一访问的字段。我们不访问 content_type
字段。
Valuable crate将每个值表示为 Value
枚举的一个实例。原始的 rust 类型被枚举出来,其他类型被分为可结构化(Structable)、可枚举(Enumerable)、可列表(Listable)或可映射(Mappable),由同名的 trait 表示。实现一个结构体或枚举体的 traits 通常是使用程序性宏来完成的;然而,它可能看起来像这样:
static FIELDS: &[NamedField<'static>] = &[
NamedField::new("user_agent"),
NamedField::new("host"),
NamedField::new("content_type"),
NamedField::new("accept_encoding"),
];
impl Valuable for Headers {
fn as_value(&self) -> Value<'_> {
Value::Structable(self)
}
fn visit(&self, visit: &mut dyn Visit) {
visit.visit_named_fields(&NamedValues::new(
FIELDS,
&[
Value::String(&self.user_agent),
Value::String(&self.host),
Value::Structable(&self.content_type),
Value::Listable(&self.accept_encoding),
]
));
}
}
impl Structable for Headers {
fn definition(&self) -> StructDef<'_> {
StructDef::new_static("Headers", Fields::Named(FIELDS))
}
}
请注意 visit 实现除了原始类型外没有复制任何数据。如果访问者不需要检查子字段,就不需要进一步的工作。
我们期望 Valuable
的作用不仅仅是跟踪。例如,在需要对象安全的时候,它对任何序列化都是有帮助的。Valuable
不是 Serde
的替代品,也不会提供反序列化的API。然而,Valuable
可以补充 Serde
,因为 Serde
的序列化API由于trait的相关类型而不是 trait-object
安全的(erased-serde
的存在可以解决这个问题,但需要为每个嵌套的数据结构分配)。一个 valuable-serde
crate 已经在进行中(感谢 taiki-e),为实现 Valuable 和 Serialize 的类型提供了一个桥梁。为了获得对象安全的序列化,派生 Valuable
而不是 Serialize
,并序列化 Valuable
trait 对象。
作为另一个潜在的用例,Valuable 可以在渲染模板时有效地提供数据。模板引擎在渲染模板时必须按需访问数据字段。例如,Handlebars
crate目前使用serde_json::Value
作为渲染时的参数类型,要求调用者将数据复制到 serde_json::Value
实例中。相反,如果 Handlebars 使用 Valuable,就会跳过复制的步骤。
现在我们需要你试一试Valuable,让我们知道它是否能满足你的使用情况。因为 Tracing 1.0 将依赖于Valuable,我们希望在2022年初稳定发布 Valuable 的1.0版本。这并没有给我们很多时间,所以我们需要尽早找到API漏洞。尝试使用 Valuable 编写库,特别是模板引擎或本帖所暗示的其他用例。我们还可以在 “桥梁” crate(例如 valuable-http)方面得到帮助,这些板块为生态系统中常见的数据类型提供宝贵的实现。我们还有很多工作要做,以扩展配置选项和其他功能的派生宏,所以请到Tokio discord服务器上的#valuable频道打招呼。
发明Service特质
Tower是一个模块化和可重复使用的组件库,用于构建强大的网络客户端和服务器。其核心是 Service
特性。Service
是一个异步函数,它接受一个请求并产生一个响应。然而,其设计的某些方面可能并不那么明显。与其解释今天存在于Tower中的 Service
特性,不如让我们想象一下如果你从头开始,你会如何发明 Service
背后的动机。
想象一下,你正在用Rust构建一个小小的HTTP框架。这个框架将允许用户通过提供接收请求并回复一些响应的代码来实现一个HTTP服务器。
你可能有一个这样的API:
// Create a server that listens on port 3000
let server = Server::new("127.0.0.1:3000").await?;
// Somehow run the user's application
server.run(the_users_application).await?;
问题是,the_users_application应该是什么?
最简单的可能是:
fn handle_request(request: HttpRequest) -> HttpResponse {
// ...
}
其中 HttpRequest
和 HttpResponse
是由我们的框架提供的一些结构体。
有了这个,我们可以像这样实现 Server::run
:
impl Server {
async fn run<F>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> HttpResponse,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
// Call the handler provided by the user
let response = handler(request);
write_http_response(connection, response).await?;
}
}
}
在这里,我们有一个异步函数 run
,它接受一个接受 HttpRequest 并返回 HttpResponse 的闭包。
这意味着用户可以像这样使用我们的服务器:
fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!")
} else {
HttpResponse::not_found()
}
}
// Run the server and handle requests using our `handle_request` function
server.run(handle_request).await?;
这并不坏。它使用户可以很容易地运行HTTP服务器,而不必担心任何低层次的细节问题。
然而,我们目前的设计有一个问题:我们不能异步地处理请求。想象一下,我们的用户在处理请求的同时需要查询数据库或向其他一些服务器发送请求。目前,这需要在我们等待处理程序产生响应时进行阻塞。如果我们希望我们的服务器能够处理大量的并发连接,我们需要在等待该请求异步完成的同时能够为其他请求提供服务。让我们通过让处理程序函数返回一个 future 来解决这个问题:
impl Server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
// `handler` now returns a generic type `Fut`...
F: Fn(HttpRequest) -> Fut,
// ...which is a `Future` whose `Output` is an `HttpResponse`
Fut: Future<Output = HttpResponse>,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
// Await the future returned by `handler`
let response = handler(request).await;
write_http_response(connection, response).await?;
}
}
}
使用这个API和以前非常相似:
// Now an async function
async fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!")
} else if request.path() == "/important-data" {
// We can now do async stuff in here
let some_data = fetch_data_from_database().await;
make_response(some_data)
} else {
HttpResponse::not_found()
}
}
// Running the server is the same
server.run(handle_request).await?;
这就好得多了,因为我们的请求处理现在可以调用其他异步函数。然而,仍然缺少一些东西。如果我们的处理程序遇到了错误,不能产生响应怎么办?让我们让它返回一个 Result
:
impl Server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> Fut,
// The response future is now allowed to fail
Fut: Future<Output = Result<HttpResponse, Error>>,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
// Pattern match on the result of the response future
match handler(request).await {
Ok(response) => write_http_response(connection, response).await?,
Err(error) => handle_error_somehow(error, connection),
}
}
}
}
添加更多的行为
现在,假设我们想确保所有的请求及时完成或失败,而不是让客户端无限期地等待一个可能永远不会到达的响应。我们可以通过给每个请求添加一个超时来做到这一点。超时设置了一个处理程序允许的最大持续时间的限制。如果它在这个时间内没有产生一个响应,就会返回一个错误。这允许客户端重试该请求或向用户报告一个错误,而不是永远等待。
你的第一个想法可能是修改Server,使其可以配置一个超时。然后它在每次调用处理程序时都会应用这个超时。然而,事实证明,你实际上可以在不修改Server的情况下添加一个超时。使用 tokio::time::timeout
,我们可以做一个新的处理函数,调用之前的 handle_request
,但超时时间为 30 秒。
async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> {
let result = tokio::time::timeout(
Duration::from_secs(30),
handle_request(request)
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout_elapsed) => Err(Error::timeout()),
}
}
这提供了一个相当好的关注点分离。我们能够在不改变任何现有代码的情况下增加一个超时功能。
让我们用这种方式再增加一个功能。想象一下,我们正在构建一个JSON API,因此希望在所有的响应上都有一个 Content-Type: application/json
头。我们可以用类似的方式包装 handler_with_timeout
,并像这样修改响应:
async fn handler_with_timeout_and_content_type(
request: HttpRequest,
) -> Result<HttpResponse, Error> {
let mut response = handler_with_timeout(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
}
我们现在有了一个处理程序,它可以处理一个HTTP请求,时间不超过30秒,并且总是有正确的Content-Type头,所有这些都不用修改我们原来的handle_request函数或Server结构。
设计可以以这种方式扩展的库是非常强大的,因为它允许用户通过分层的新行为来扩展库的功能,而不必等待库的维护者为其添加支持。
它也使测试更容易,因为你可以将你的代码分解成小的隔离单元,并为它们编写细粒度的测试,而不必担心所有其他的部分。
然而,有一个问题。我们目前的设计让我们通过将一个处理函数包裹在一个新的处理函数中来组成新的行为,该处理函数实现了该行为,然后调用内部函数。这很有效,但如果我们想增加很多额外的功能,它就不能很好地扩展。想象一下,我们有许多 handle_with_* 函数,每个函数都增加了一点新的行为。要硬编码哪一个中间处理程序调用哪一个,将变得很有挑战性。我们目前的链条是
handler_with_timeout_and_content_type
调用handler_with_timeout
调用handle_request
实际处理请求
如果我们能以某种方式组合(compose)这三个函数而不需要硬编码确切的顺序,那就更好了。比如说:
let final_handler = with_content_type(with_timeout(handle_request));
同时仍然能够像以前一样运行我们的处理程序:
server.run(final_handler).await?;
你可以尝试将 with_content_type
和 with_timeout
作为函数来实现,该函数接受一个 F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>>
类型的参数,并返回一个像 impl Fn(HttpRequest) -> impl Future<Output = Result<HttpResponse, Error>>
的闭包,但由于Rust今天允许 impl Trait
的限制,这实际上不可能。特别是 impl Fn() -> impl Future
是不允许的。使用 Box
是可能的,但这有一个我们想避免的性能代价。
除了调用处理程序之外,你也不能为其添加其他行为,但为什么有必要这样做,我们会再讨论。
Handler特性
让我们尝试另一种方法。与其说 Server::run
接受一个闭包(Fn(HttpRequest) -> ...
),不如说我们做一个新的trait来封装同样的 async fn(HttpRequest) -> Result<HttpResponse, Error>
:
trait Handler {
async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}
有了这样一个特性,我们就可以写出实现它的具体类型,这样我们就不用一直和 Fn
打交道了。
然而,Rust目前不支持异步trait方法,所以我们有两个选择:
-
让
call
返回一个封箱(boxed)的 future,如Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>
。这就是async-trait
箱的作用。 -
给
Handler
添加一个相关类型的Future
,这样用户就可以选择自己的类型。
让我们选择方案二,因为它是最灵活的。有具体的 future 类型的用户可以使用该类型,而不需要付出Box的代价,不关心的用户仍然可以使用Pin<Box<…>。
trait Handler {
type Future: Future<Output = Result<HttpResponse, Error>>;
fn call(&mut self, request: HttpRequest) -> Self::Future;
}
我们仍然必须要求 Handler::Future
实现 Future
,其输出类型为 Result<HttpResponse, Error>
,因为那是 Server::run
所要求的。
让 call
获取 &mut self
很有用,因为它允许处理程序在必要时更新其内部状态。
让我们把原来的 handle_request
函数转换成这个特性的实现。
struct RequestHandler;
impl Handler for RequestHandler {
// We use `Pin<Box<...>>` here for simplicity, but could also define our
// own `Future` type to avoid the overhead
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
// same implementation as we had before
if request.path() == "/" {
Ok(HttpResponse::ok("Hello, World!"))
} else if request.path() == "/important-data" {
let some_data = fetch_data_from_database().await?;
Ok(make_response(some_data))
} else {
Ok(HttpResponse::not_found())
}
})
}
}
支持超时如何?请记住,我们的目标是让我们能够将不同的功能组合在一起,而不需要修改每个单独的部分。
如果我们像这样定义一个通用的超时结构,会怎么样呢:
struct Timeout<T> {
// T will be some type that implements `Handler`
inner_handler: T,
duration: Duration,
}
然后我们可以为 Timeout<T>
实现 Handler
,并委托给 T
的 Handler
实现:
impl<T> Handler for Timeout<T>
where
T: Handler,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
let result = tokio::time::timeout(
self.duration,
self.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}
这里重要的一行是 self.inner_handler.call(request)
。这就是我们委托给内部处理程序并让它做它的事情的地方。我们不知道它是什么,我们只知道它完成后会产生一个 Result<HttpResponse, Error>
。
但这段代码并没有完全被编译。我们得到一个这样的错误:
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/lib.rs:145:29
|
144 | fn call(&mut self, request: HttpRequest) -> Self::Future {
| --------- this data with an anonymous lifetime `'_`...
145 | Box::pin(async move {
| _____________________________^
146 | | let result = tokio::time::timeout(
147 | | self.duration,
148 | | self.inner_handler.call(request),
... |
155 | | }
156 | | })
| |_________^ ...is captured here, requiring it to live as long as `'static`
问题是,我们正在捕获一个 &mut self
,并将其移入一个异步块。这意味着我们的 future 的生命周期与 &mut self
的生命周期相关。这对我们来说并不适用,因为我们可能想在多个线程上运行我们的响应 future 以获得更好的性能,或者产生多个响应 future 并将它们全部并行运行。如果对处理程序的引用存在于futures中,这是不可能的。
相反,我们需要将 &mut self
转换为拥有的 self
。这正是 Clone
所做的。
// this must be `Clone` for `Timeout<T>` to be `Clone`
#[derive(Clone)]
struct RequestHandler;
impl Handler for RequestHandler {
// ...
}
#[derive(Clone)]
struct Timeout<T> {
inner_handler: T,
duration: Duration,
}
impl<T> Handler for Timeout<T>
where
T: Handler + Clone,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
// Get an owned clone of `&mut self`
let mut this = self.clone();
Box::pin(async move {
let result = tokio::time::timeout(
this.duration,
this.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}
注意,在这种情况下,克隆是非常廉价的,因为 RequestHandler
没有任何数据,而 Timeout<T>
只增加了一个 Duration
(这个会被 Copy
)。
又近了一步。我们现在得到一个不同的错误:
error[E0310]: the parameter type `T` may not live long enough
--> src/lib.rs:149:9
|
140 | impl<T> Handler for Timeout<T>
| - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | / Box::pin(async move {
150 | | let result = tokio::time::timeout(
151 | | this.duration,
152 | | this.inner_handler.call(request),
... |
159 | | }
160 | | })
| |__________^ ...so that the type `impl Future` will meet its required lifetime bounds
TBD
内容出处: https://tokio.rs/blog/2021-05-14-inventing-the-service-trait
欢迎Alice Ryhl成为Tokio首位付费贡献者
感谢 Fly.io、Discord、Embark 等赞助商,我们很高兴地宣布,Tokio将资助 Alice Ryhl 将更多时间投入到Tokio。这是一个支付贡献者为Tokio工作的计划的开始,通过Github赞助的捐款资助。
建立一个像Tokio这样的库是一项不朽的任务。自从2016年发布该项目以来,已经花了很多时间来实验、完善和优化API和运行时间。如果没有数以百计的志愿者和公司,如Mozilla、Dropbox、Buoyant和AWS,资助工程时间来建立Tokio,这项工作是不可能的。然而,我们只是处在Rust和异步I/O的旅程的开始。我们希望增加对 io-uring 的支持,改善对windows的支持,并增加功能以改善调试、分析和测试Tokio应用程序。
为了实现我们的目标并确保 Tokio 的长寿,我们必须建立和支持一个健康和多样化的贡献者基础,这些贡献者涵盖了广泛的用例、技能和分支机构。开源的可持续性是一个热门话题。今天,AWS和Buoyant雇用工程师为Tokio工作,并希望看到这个数字的增长。我们重视这种企业贡献,希望看到其他公司加入。不过,我们也知道,雇佣工程师全职专注于开源工作,对很多人来说是成本高昂的。我们需要增加为Tokio定期奉献时间的贡献者的数量,以及致力于支持Tokio的公司的数量,以跟上一个不断增长的生态系统所带来的维护费用,同时也提供新的功能。
在这种情况下,Tokio项目将开始向选定的贡献者支付报酬,使他们能够花更多的时间为Tokio作出贡献。这个项目将从小处着手,因为这对我们来说是个新事物,我们希望能解决一些问题。Alice Ryhl将兼职为Tokio工作。如果你参加过我们的问题跟踪器或对话频道,你可能已经熟悉Alice了。一年多来,她一直是回答问题、回应问题、审查和合并拉动请求的第一线。她还不断地修复错误,增加用户要求的新功能。作为这个项目的参与者,她将做更多同样的工作,并有更多的时间来帮助培养新的贡献者。我们认为这是一个发展贡献者社区的机会。
那么,我们将如何资助这个项目呢?这就是你们的作用! 我们要求公司通过Github赞助来捐款。如果你工作的地方依赖Tokio,请向你的团队提出这个项目并考虑参与。这些捐款将有助于维持你所使用的项目。
我们已经筹集到足够的资金开始支付Alice,这要感谢我们最初的捐助者。我们扩大这个项目的速度将取决于可用的资金,以及能够找到合适的候选人。候选人将是那些已经成功为Tokio做出贡献的人,并且在资金的支持下能够做出更大的贡献。
我们对这个项目感到非常兴奋。我们认为它是一个使贡献更容易获得的机制,这将导致所有用户的Tokio更加可持续。如果你有任何问题或想参与,请在Tokio Discord频道与我们联系,并请考虑赞助Tokio。
通过自动合作任务的产生来减少尾部延迟
Tokio
是一个异步 Rust 应用程序的运行时。它允许使用 async
& await
语法编写代码。比如说:
let mut listener = TcpListener::bind(&addr).await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// handle socket
});
}
Rust编译器将这些代码转换为状态机。Tokio 运行时执行这些状态机,在少数线程上复用许多任务。Tokio的调度器要求生成任务的状态机将控制权交还给调度器,以便复用任务。每个 .await 调用都是一个向调度器回馈(yield back)的机会。在上面的例子中,listener.accept().await
将返回一个等待中的套接字。如果没有待处理的套接字,控制权就会交还给调度器。
这个系统在大多数情况下运行良好。然而,当系统处于负载状态时,异步资源有可能始终处于准备状态。例如,考虑一个 echo 服务器:
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).await?;
if n == 0 {
break;
}
// Write the data back
socket.write(buf[..n]).await?;
}
});
如果收到的数据比处理的速度快,那么当一个数据块的处理完成时,有可能已经收到更多的数据了。在这种情况下,.await
将永远不会把控制权交还给调度器,其他任务将不会被调度,导致饥饿和大的延迟差异。
目前,这个问题的答案是,Tokio的用户要负责在应用程序和库中添加让出点(yield points)。在实践中,很少有人真正这样做,最终容易出现这种问题。
解决这个问题的一个常见办法是抢占(preemption)。对于正常的操作系统线程,内核会每隔一段时间就中断执行,以确保所有线程的公平调度。对执行有完全控制权的运行时(Go、Erlang等)也会使用抢占来确保任务的公平调度。这是通过在编译时注入让出点来实现的,让出点是指检查任务是否已经执行了足够长的时间,如果是,则让出于调度器的代码。不幸的是,Tokio不能使用这种技术,因为Rust的异步生成器没有为执行器(如Tokio)提供任何机制来注入这种让出点。
每个任务的操作预算
尽管Tokio不能抢占,但仍有机会促使一个任务让出调度器中。从0.2.14开始,每个Tokio任务都有一个操作预算。这个预算在调度器切换到任务时被重置。每个 Tokio 资源(套接字、定时器、通道…)都知道这个预算。只要任务有剩余的预算,资源就会像以前那样运行。每个异步操作(用户必须等待的操作)都会减少任务的预算。一旦任务超出预算,所有Tokio资源将永远返回 “未准备好”,直到任务返回到调度器。在这一点上,预算被重置,未来的Tokio资源的.await
将再次正常运行。
让我们回到上面的 echo 服务器的例子。当任务被调度时,它被分配的预算是每 “tick” 128个操作。选择128这个数字主要是因为它感觉很好,而且在我们测试的情况下(Noria和HTTP)似乎效果很好。当 socket.read(..)
和 socket.write(..)
被调用时,预算被递减。如果预算为零,任务就会返回到调度器。如果由于底层套接字没有准备好(没有待处理的数据或发送缓冲区已满),读或写都不能进行,那么任务也会返回到调度器。
这个想法源于我和 Ryan Dahl 的一次谈话。他正在使用 Tokio 作为 Deno 的底层运行时。在前段时间用 Hyper 做一些 HTTP 实验的时候,他在一些基准测试中看到一些高的尾部延迟。这个问题是由于一个循环在负载下没有让出给调度器。在这种情况下,Hyper最终通过手工解决了这个问题,但Ryan提到,当他在 node.js 工作时,他们通过增加每个资源的限制来处理这个问题。所以,如果一个TCP套接字总是准备好的,它就会每隔一段时间就强制让出一次。我向 Jon Gjenset 提到了这个对话,他想出了将限制放在任务本身而不是每个资源上的想法。
最终的结果是,Tokio 应该能够在负载下提供更一致的运行时行为。虽然确切的启发式方法很可能会随着时间的推移而调整,但最初的测量表明,在某些情况下,尾部延迟几乎减少了3倍。
“master” 是在自动让出之前,“preempt” 是在之后。点击查看大图,更多细节请参见原始PR评论。
关于阻塞的说明
虽然自动合作任务的让出在许多情况下提高了性能,但它不能抢占任务。Tokio的用户仍然必须注意避免CPU密集型工作和阻塞的API。spawn_blocking
函数可以用来 “异步化” 这些任务,在允许阻塞的线程池中运行它们。
Tokio不会,也不会试图检测阻塞的任务,并通过在调度器中添加线程来自动进行补偿。这个问题在过去已经出现过很多次了,所以请允许我详细说明。
就背景而言,我们的想法是让调度器包括一个监控线程。这个线程每隔一段时间就会轮询调度器线程,并检查工作者是否在取得进展。如果一个工作者没有取得进展,就会认为该工作者正在执行一个阻塞任务,并应生成一个新的线程来进行补偿。
这个想法并不新鲜。据我所知,这种策略的第一次出现是在 .NET 线程池中,而且是在10多年前引入的。不幸的是,这个策略有很多问题,正因为如此,它没有在其他线程池/调度器(Go、Java、Erlang等)中出现。
第一个问题是很难定义 “进度”。对进度的一个天真的定义是,一个任务是否已经被调度到某个时间单位以上。例如,如果一个工作者在调度同一个任务时被卡住超过100ms,那么这个工作者就会被标记为阻塞,并生成一个新的线程。在这个定义中,如何检测催生新线程会降低吞吐量的情况?这可能发生在调度器普遍处于负载状态的时候,增加线程会使情况变得更加糟糕。为了解决这个问题,.NET线程池采用爬坡法。这篇文章对它的工作原理做了很好的概述。
第二个问题是,任何自动检测策略都容易受到突发性或其他不平衡工作负载的影响。这个具体问题一直是.NET线程池的祸根,被称为 “停顿” 问题。爬坡策略需要一定的时间(数百毫秒)来适应负载变化。这段时间的需要,部分是为了能够确定增加线程是在改善情况,而不是使情况恶化。
停顿问题可以用.NET线程池来管理,部分原因是线程池被设计用来安排粗略的任务,即执行时间在几百毫秒到几十秒的任务。然而,在Rust中,异步任务调度器被设计用来调度那些最多应该在微秒到几十毫秒内运行的任务。在这种情况下,任何基于启发式调度器的呆滞问题都会导致更大的延迟变化。
在这之后,我收到的最常见的后续问题是 “Go调度器不会自动检测阻塞的任务吗?"。简短的回答是:没有。这样做会导致上面提到的同样的卡顿问题。而且,Go没有必要进行通用的阻塞任务检测,因为Go能够抢占。Go调度器所做的是注释潜在的阻塞性系统调用。这大致等同于Tokio的 block_in_place
。
简而言之,截至目前,刚刚介绍的自动合作任务让出策略是我们发现的减少尾部延迟的最佳方法。因为这个策略只需要Tokio的类型选择加入,终端用户不需要改变任何东西就可以获得这个好处。只需升级Tokio的版本就可以包括这个新功能。另外,如果从Tokio运行时之外使用Tokio的类型,它们的行为将和以前一样。
在这个问题上还有很多工作要做。目前还不清楚任务预算应该如何与 “子表程序”(如 FuturesUnordered
)一起工作。任务预算API最终应该公开,以便第三方软件可以与之集成。如果能想出一个办法来普及这个概念,那么不仅仅是Tokio的用户可以利用这个概念,那也是很好的。
我们希望你在这次发布后发现你的尾部延迟有所改善。无论怎样,我们都有兴趣听到这个变化对现实世界的部署有什么影响。欢迎对这个问题发表评论。
—Carl Lerche
Alice Ryhl个人博客
我的名字是Alice Ryhl。我是一名软件开发人员和数学家,目前是丹麦技术大学的学生。我是广泛使用的Rust库Tokio的主要维护者之一。此外,我在谷歌的安卓安全和隐私团队工作,我使其他团队能够在安卓中使用Rust。
Tokio和Actor
这篇文章是关于直接用Tokio构建 Actor,而不使用任何 Actor 库,如Actix。这被证明是相当容易做到的,但有一些细节你应该注意。
-
把tokio::spawn的调用放在哪里
-
带有运行方法的结构体与裸函数
-
对 Actor 的处理
-
背压和有界通道
-
优雅关闭
本文概述的技术应该适用于任何执行器,但为了简单起见,我们将只讨论Tokio。本文与Tokio教程中的 spawning 和通道章节有一些重叠,我建议也阅读这些章节。
在我们讨论如何编写一个行为体之前,我们需要知道什么是 Actor。Actor 背后的基本想法是催生一个独立的任务,独立于程序的其他部分执行一些工作。通常,这些 Actor 通过使用消息传递通道与程序的其他部分进行通信。由于每个 Actor 都是独立运行的,使用它们设计的程序自然是并行的。
Actor 的一个常见用例是将你想要共享的某些资源的独占所有权分配给 Actor,然后让其他任务通过与 Actor 对话来间接访问该资源。例如,如果你正在实现一个聊天服务器,你可以为每个连接生成一个任务,还有一个主任务在其他任务之间路由聊天信息。这很有用,因为主任务可以避免处理网络IO,而连接任务可以专注于处理网络IO。
方案
Actor 被分成两部分:任务(task)和处理器(handle)。任务(task)是独立产生的Tokio任务,它实际上是执行 actor 的职责,而处理器(handle)是一个结构,允许你与任务进行通信。
让我们考虑一个简单的 Actor。Actor 内部存储一个计数器,用来获得某种唯一的ID。该Actor的基本结构如下:
use tokio::sync::{oneshot, mpsc};
struct MyActor {
receiver: mpsc::Receiver<ActorMessage>,
next_id: u32,
}
enum ActorMessage {
GetUniqueId {
respond_to: oneshot::Sender<u32>,
},
}
impl MyActor {
fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
MyActor {
receiver,
next_id: 0,
}
}
fn handle_message(&mut self, msg: ActorMessage) {
match msg {
ActorMessage::GetUniqueId { respond_to } => {
self.next_id += 1;
// The `let _ =` ignores any errors when sending.
//
// This can happen if the `select!` macro is used
// to cancel waiting for the response.
let _ = respond_to.send(self.next_id);
},
}
}
}
async fn run_my_actor(mut actor: MyActor) {
while let Some(msg) = actor.receiver.recv().await {
actor.handle_message(msg);
}
}
现在我们有了 Actor 本身,我们还需要一个 Actor 的处理器(handle)。处理器(handle)是一个其他代码可以用来与 Actor 对话的对象,也是保持 Actor 活力的因素。
处理器(handle)看起来像这样:
#[derive(Clone)]
pub struct MyActorHandle {
sender: mpsc::Sender<ActorMessage>,
}
impl MyActorHandle {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = MyActor::new(receiver);
tokio::spawn(run_my_actor(actor));
Self { sender }
}
pub async fn get_unique_id(&self) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ActorMessage::GetUniqueId {
respond_to: send,
};
// Ignore send errors. If this send fails, so does the
// recv.await below. There's no reason to check for the
// same failure twice.
let _ = self.sender.send(msg).await;
recv.await.expect("Actor task has been killed")
}
}
让我们仔细看一下这个例子中的不同部分。
ActorMessage。ActorMessage枚举定义了我们可以向actor发送的信息种类。通过使用一个枚举,我们可以有许多不同的消息类型,而且每个消息类型都可以有自己的参数集。我们通过使用 onehot 通道向发送者返回一个值,这是一个允许精确发送一个消息的消息传递通道。
在上面的例子中,我们在 actor 结构体的 handle_message
方法中对枚举进行匹配,但这并不是唯一的结构体方式。我们也可以在 run_my_actor
函数中对枚举进行匹配。这个匹配中的每个分支可以调用各种方法,比如 actor 对象上的 get_unique_id
。
发送消息时的错误。在处理通道时,并非所有的错误都是致命的。正因为如此,这个例子有时使用 let _ =
来忽略错误。一般来说,如果接收器被丢弃,在通道上的发送操作就会失败。
在我们的例子中,这种情况的第一个例子是在 actor 中我们对我们被发送的消息做出反应的那一行。如果接收者对操作的结果不再感兴趣,例如,发送消息的任务可能已经被杀死,这种情况就会发生。
Actor的关闭。我们可以通过查看接收消息的失败来检测 actor 何时应该关闭。在我们的例子中,这发生在下面的while循环中。
while let Some(msg) = actor.receiver.recv().await {
actor.handle_message(msg);
}
当所有发送到接收方的发送者都被放弃时,我们知道我们将永远不会收到另一个消息,因此可以关闭 actor 。当这种情况发生时,对 .recv()
的调用返回None
,由于它不符合 Some(msg)
的模式,while循环退出,该函数返回。
#[derive(Clone)]
MyActorHandle 结构体导出了 Clone 特性。它能这样做是因为 mpsc 意味着它是一个多生产者、单消费者的通道。由于该通道允许多个生产者,我们可以自由地克隆我们对 actor 的句柄,允许我们从多个地方与它对话。
结构体上的run方法
我在上面的例子中使用了一个没有定义在任何结构体上的顶级函数作为我们催生的 Tokio 任务,但是很多人发现直接在 MyActor 结构上定义一个run方法并催生它更自然。这当然也可以,但我之所以给出一个使用顶级函数的例子,是因为它更自然地引导你走向不会给你带来很多寿命问题的方法。
为了理解这个原因,我准备了一个不熟悉这个模式的人经常想出的例子:
impl MyActor {
fn run(&mut self) {
tokio::spawn(async move {
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);
}
});
}
pub async fn get_unique_id(&self) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ActorMessage::GetUniqueId {
respond_to: send,
};
// Ignore send errors. If this send fails, so does the
// recv.await below. There's no reason to check for the
// same failure twice.
let _ = self.sender.send(msg).await;
recv.await.expect("Actor task has been killed")
}
}
... and no separate MyActorHandle
这个例子中的两个麻烦来源是:
-
tokio::spawn的调用是在run里面。
-
actor和handle是同一个结构体。
第一个麻烦导致了问题,因为 tokio::spawn
函数要求参数是 'static
。这意味着新任务必须拥有其内部的一切,这是一个问题,因为该方法借用了self,这意味着它不能将self的所有权交给新任务。
第二个麻烦导致了一些问题,因为 Rust 执行的是单一所有权原则。如果你把 actor 和 handle 合并成一个结构体,你(至少从编译器的角度来看)就会让每个 handle 访问 actor 的任务所拥有的字段。例如,next_id
整数应该只由 actor 的任务拥有,而不应该从任何处理器(handle)中直接访问。
这就是说,有一个版本是可行的。通过修复上述两个问题,你最终会得到以下结果:
impl MyActor {
async fn run(&mut self) {
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);
}
}
}
impl MyActorHandle {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = MyActor::new(receiver);
tokio::spawn(async move { actor.run().await });
Self { sender }
}
}
这与顶层函数的工作原理相同。注意,严格来说,可以写一个版本,其中tokio::spoon在里面运行,但我不推荐这种做法。
模式的变化
我在本文中用作例子的 actor 对消息使用了请求-响应的范式,但你不一定要这样做。在这一节中,我将给出一些灵感,告诉你如何改变这个想法。
不对消息进行响应
我用来介绍这个概念的例子包括了对通过 oneshot 发送的消息的响应,但你并不总是需要响应。在这些情况下,不在消息枚举中包括 oneshot 通道并无不妥。当通道有空间时,这甚至可以让你在消息被处理之前从发送中返回。
你还是应该确保使用一个有边界的通道,这样在通道中等待的消息数量就不会无限制地增长。在某些情况下,这意味着发送仍然需要是异步函数,以便处理发送操作需要在通道中等待更多空间的情况。
然而,有一个替代方法可以使发送成为异步方法。你可以使用 try_send
方法,并通过简单地杀死 actor 来处理发送失败。这在 actor 管理 TcpStream 的情况下很有用,它可以转发任何你发送到连接中的消息。在这种情况下,如果对 TcpStream 的写入速度跟不上,你可能想直接关闭连接。
一个Actor的多个处理器
如果一个 actor 需要从不同的地方发送消息,你可以使用多个处理器结构体来强制规定某些消息只能从某些地方发送。
这样做的时候,你仍然可以在内部重复使用同一个 mpsc 通道,用一个枚举来包含所有可能的消息类型。如果你确实想为此使用单独的通道,actor 可以使用tokio::select!
来同时接收多个通道。
loop {
tokio::select! {
Some(msg) = chan1.recv() => {
// handle msg
},
Some(msg) = chan2.recv() => {
// handle msg
},
else => break,
}
}
你需要小心处理通道关闭时的处理方式,因为在这种情况下他们的recv方法会立即返回None。幸运的是 tokio::select!
宏让你通过提供 Some(msg) 模式来处理这种情况。如果只有一个通道被关闭,该分支被禁用,另一个通道仍然被接收。当两个都关闭时,else 分支运行并使用 break 来退出循环。
Actor向其他Actor发送消息
让 actor向其他 actor 发送消息并无不妥。要做到这一点,你可以简单地给一个 actor 以其他 actor 的处理器。
如果你的 actor 形成了一个循环,你需要小心一点,因为通过抓住对方的处理器结构体,最后的发送者永远不会被放弃,从而防止关闭。为了处理这种情况,你可以让其中一个 actor 有两个处理器结构体,有独立的mpsc通道,但有一个 tokio::select!
,看起来像这样。
loop {
tokio::select! {
opt_msg = chan1.recv() => {
let msg = match opt_msg {
Some(msg) => msg,
None => break,
};
// handle msg
},
Some(msg) = chan2.recv() => {
// handle msg
},
}
}
如果 chan1 被关闭,上述循环将总是退出,即使 chan2 仍然开启。如果 chan2 是 actor 循环的一部分,这将打破循环,让 actor 关闭。
另一种方法是简单地在循环中的一个 actor 上调用abort。
多个actor共享一个处理器
就像每个 actor 可以有多个处理器一样,每个处理器也可以有多个actor。最常见的例子是在处理一个连接时,比如TcpStream,你通常会生成两个任务:一个用于读取,一个用于写入。当使用这种模式时,你让读写任务尽可能的简单–它们唯一的工作就是做IO。读取任务将只是把它收到的任何消息发送给其他任务,通常是另一个actor,而写入任务将只是把它收到的任何消息转发给连接。
这种模式非常有用,因为它隔离了与执行IO相关的复杂性,这意味着程序的其他部分可以假装向连接写东西是即时发生的,尽管实际的写东西是在 actor 处理消息的某个时间后发生的。
提防循环
我已经在 “Actor向其他Actor发送消息” 的标题下谈了一些关于循环的问题,在那里我讨论了形成循环的 actor 的关闭问题。然而,关闭并不是循环可能导致的唯一问题,因为循环也可能导致死锁,即循环中的每个 actor 都在等待下一个 actor 收到消息,但下一个角色不会收到该消息,直到它的下一个 actor 收到消息,如此循环。
为了避免这样的死锁,你必须确保不存在容量受限的通道循环。其原因是,有界通道的发送方法不会立即返回。发送方法总是立即返回的通道不属于这种循环,因为你不能在这样的发送中出现死锁。
请注意,这意味着 oneshot 通道不会成为死锁循环的一部分,因为它们的发送方法总是立即返回。还要注意的是,如果你使用 try_send
而不是 send
来发送消息,这也不能成为死锁循环的一部分。
感谢 matklad 指出了循环和死锁的问题。
Tokio和Actor
Rust
的 async
/await
特性是通过一种称为协作式调度(cooperative scheduling)的机制来实现的,这对于编写异步Rust
代码的人来说有一些重要的影响。
这篇博文的目标读者是异步 Rust
的新用户。我将使用 Tokio
运行时作为示例,但这里提出的观点适用于任何异步运行时。
如果你只从这篇文章中记住一件事,那应该是:
切记
异步代码不应该长时间不到达.await
。
(注:指的是运行中)
Blocking vs. non-blocking code
编写一个可以同时处理许多事情的应用程序的质朴的方法是为每个任务生成一个新线程。如果任务的数量很少,这就是一个完美的解决方案,但是随着任务的数量变得越来越多,你最终会因为数量过多的线程而遇到问题。这个问题在不同的编程语言中有不同的解决方案,但它们都归结为同一件事: 非常快速地切换每个线程上当前运行的任务,这样所有的任务都有机会运行。在Rust
中,这种切换发生在当你.await
一些事上。
在写异步rust 时,短语 阻塞线程(blocking the thread)
意味着 阻止运行时切换当前任务
。这可能是一个主要问题,这意味着同一运行时上的其他任务将停止运行,直到线程不再被阻塞。为了防止这种情况,我们应该编写能够快速切换的代码,也就是不要长时间不进行.await
。
让我们来举个例子:
use std::time::Duration;
#[tokio::main]
async fn main() {
println!("Hello World!");
// No .await here!
std::thread::sleep(Duration::from_secs(5));
println!("Five seconds later...");
}
上面的代码看起来是正确的,运行它将看起来正常工作。但它有一个致命的缺陷: 它阻塞了线程。上述场景没有其他任务,所以看不出问题,但在真正的程序中就不一样了。为了说明这一点,考虑下面的例子:
use std::time::Duration;
async fn sleep_then_print(timer: i32) {
println!("Start timer {}.", timer);
// No .await here!
std::thread::sleep(Duration::from_secs(1));
println!("Timer {} done.", timer);
}
#[tokio::main]
async fn main() {
// The join! macro lets you run multiple things concurrently.
tokio::join!(
sleep_then_print(1),
sleep_then_print(2),
sleep_then_print(3),
);
}
Start timer 1.
Timer 1 done.
Start timer 2.
Timer 2 done.
Start timer 3.
Timer 3 done.
这个例子将花费三秒钟运行,timer将在没有任何并发的情况下一个接一个地运行。原因很简单: Tokio
运行时无法将一个任务切换为另一个任务,因为这种切换只能发生在.await
。因为 sleep_then_print
中没有.await
,在运行时就不会发生切换。
然而,如果我们使用Tokio
的 sleep 函数,它使用一个.await
来睡眠,那么这个函数就会正常工作:
use tokio::time::Duration;
async fn sleep_then_print(timer: i32) {
println!("Start timer {}.", timer);
tokio::time::sleep(Duration::from_secs(1)).await;
// ^ execution can be paused here
println!("Timer {} done.", timer);
}
#[tokio::main]
async fn main() {
// The join! macro lets you run multiple things concurrently.
tokio::join!(
sleep_then_print(1),
sleep_then_print(2),
sleep_then_print(3),
);
}
Start timer 1.
Start timer 2.
Start timer 3.
Timer 1 done.
Timer 2 done.
Timer 3 done.
这段代码只需一秒钟就可以运行,并且如预期地在同一时间正确地运行所有三个函数。
要知道,事情并不总是这么明显。通过使用 tokio::join!,这三个任务都保证在同一个线程上运行,但如果你用 tokio::spawn 替换它,并使用一个多线程的运行时,你将能够同时运行多个阻塞任务直到用完线程。默认的 Tokio
运行时按照核心数生成线程,你的电脑通常会有8个CPU
。这足以使你在本地测试时忽略这个问题,并在真实场景下运行代码时非常快地耗尽线程。
为了给出多少时间算的上过久的定义,一个好的经验法则是每个 .await
的时间间隔不超过10到100微秒。这取决于你正在编写的应用程序的类型。
如果我想要阻塞呢?
有时候我们只是想阻塞线程。这是完全正常的。有两个常见的原因:
- 昂贵的
CPU-Bound
(受限于CPU资源的)计算 - Synchronous IO. 同步IO
在这两种情况下,我们都在处理一个会在一段时间内阻止任务 .await
的操作。为了解决这个问题,我们必须将阻塞操作移动到Tokio
线程池外部的线程中去。关于这一点有三种形式可以使用:
- 使用 tokio::task::spawn_blocking 函数
- 使用 rayon crate
- 通过 std::thread::spawn 创建一个专门的线程
让我们仔细浏览下每个解决方案,看看什么时候应该使用它。
spawn_blocking函数
Tokio
运行时包含一个单独的线程池,专门用于运行阻塞函数,你可以使用 spawn_blocking 在其上运行任务。这个线程池的上限是大约500个线程,因此可以在这个线程池上进行大量阻塞操作。
由于线程池有如此多的线程,它最适合用于阻塞 I/O
(如与文件系统交互)或使用阻塞数据库库(如diesel)。
线程池不适合昂贵的CPU-bound
计算,因为线程池的线程数量比计算机上的 CPU核心数量多得多。当线程数等于CPU核心数时,CPU-bound
的计算运行效率最高。也就是说,如果只需要运行几个CPU-bound
的计算,我不会责怪你用 spawn_blocking
运行它们,因为这样做非常简单。
#[tokio::main]
async fn main() {
// This is running on Tokio. We may not block here.
let blocking_task = tokio::task::spawn_blocking(|| {
// This is running on a thread where blocking is fine.
println!("Inside spawn_blocking");
});
// We can wait for the blocking task like this:
// If the blocking task panics, the unwrap below will propagate the
// panic.
blocking_task.await.unwrap();
}
rayon crate
rayon 是一个著名的库,它提供了一个专门用于昂贵的CPU-bound
计算的线程池,你可以将它与Tokio
一起用于此目的。与spawn_blocking
不同,rayon
的线程池的最大线程数量很少,这就是为什么它适合进行昂贵计算的原因。
我们将计算大列表的和作为例子,但是请注意在实践中,除非数组非常非常大,否则只计算和是足够便宜的,可以直接在Tokio
运行。
使用rayon
的主要危险是,在等待rayon
完成时,必须注意不要阻塞线程。要做到这一点,可以将 rayon::spawn 和 tokio::sync::oneshot 结合起来,就像这样:
async fn parallel_sum(nums: Vec<i32>) -> i32 {
let (send, recv) = tokio::sync::oneshot::channel();
// Spawn a task on rayon.
rayon::spawn(move || {
// Perform an expensive computation.
let mut sum = 0;
for num in nums {
sum += num;
}
// Send the result back to Tokio.
let _ = send.send(sum);
});
// Wait for the rayon task.
recv.await.expect("Panic in rayon::spawn")
}
#[tokio::main]
async fn main() {
let nums = vec![1; 1024 * 1024];
println!("{}", parallel_sum(nums).await);
}
上述例子使用了rayon
的线程池来运行昂贵的操作。请注意,例子中对 parallel_sum
的每个调用只使用了rayon
线程池中的一个线程。如果你的应用程序中有很多对parallel_ sum
的调用,那么这是有意义的,但是也可以使用rayon
的并行迭代器在几个线程上计算和:
use rayon::prelude::*;
// Spawn a task on rayon.
rayon::spawn(move || {
// Compute the sum on multiple threads.
let sum = nums.par_iter().sum();
// Send the result back to Tokio.
let _ = send.send(sum);
});
注意,当使用并行迭代器时,仍然需要rayon::spawn
调用,因为并行迭代器是阻塞的。
生成一个专用线程
如果阻塞操作是需要永久运行下去的,那么应该在专用线程上运行它。例如,考虑一个管理数据库连接的线程,他需要使用通道来接收要执行的操作。因为这个线程在循环中监听该通道,所以它永远不会退出。
在上述两个线程池中的任何一个上运行这样的任务都会是一个问题,因为它实际上从池中永久地带走了一个线程。一旦这样做了几次,线程池中就没有更多的线程了,所有其他阻塞任务都无法执行。
当然,如果你不在意每次启动一个新线程的成本,也可以使用专用线程来运行较短生命周期的任务。
总结
如果你忘了,以下是你需要记住的主要事情:
切记
异步代码不应该长时间不到达.await
。
下面你将看到一张备忘单,上面列出了当你想要阻塞的时候可以使用的方法:
CPU-bound computation | Synchronous IO | Running forever | |
---|---|---|---|
spawn_blocking |
次优 | OK | No |
rayon |
OK | No | No |
专用线程 | OK | OK | OK |
最后,我建议阅读Tokio
教程中关于 共享状态 的章节。本章将解释如何在异步代码中正确使用 std::sync::Mutex,并更深入地说明为什么即使锁定Mutex
是阻塞的,也可以这样做。(剧透一下: 如果你的阻塞时间很短,它真的算阻塞吗?)
我还强烈推荐Tokio
博客中的一篇文章: Reducing tail latencies with automatic cooperative task yielding。
感谢 Chris Krycho 和 snocl 阅读本文草稿并提供有用的建议。所有的错误都是我自己的。
内容出处
- 原文: https://ryhl.io/blog/actors-with-tokio/
- 中文翻译:https://bingowith.me/2021/05/09/translation-async-what-is-blocking/