发明Service特质

Console是一个Rust异步调试工具。它的目标是让你在试图更好地理解你的异步任务的行为方式时,成为你所要使用的工具。

Tower是一个模块化和可重复使用的组件库,用于构建强大的网络客户端和服务器。其核心是 Service 特性。Service 是一个异步函数,它接受一个请求并产生一个响应。然而,其设计的某些方面可能并不那么明显。与其解释今天存在于Tower中的 Service 特性,不如让我们想象一下如果你从头开始,你会如何发明 Service 背后的动机。

想象一下,你正在用Rust构建一个小小的HTTP框架。这个框架将允许用户通过提供接收请求并回复一些响应的代码来实现一个HTTP服务器。

你可能有一个这样的API:

// Create a server that listens on port 3000
let server = Server::new("127.0.0.1:3000").await?;

// Somehow run the user's application
server.run(the_users_application).await?;

问题是,the_users_application应该是什么?

最简单的可能是:

fn handle_request(request: HttpRequest) -> HttpResponse {
    // ...
}

其中 HttpRequestHttpResponse 是由我们的框架提供的一些结构体。

有了这个,我们可以像这样实现 Server::run :

impl Server {
    async fn run<F>(self, handler: F) -> Result<(), Error>
    where
        F: Fn(HttpRequest) -> HttpResponse,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Call the handler provided by the user
            let response = handler(request);

            write_http_response(connection, response).await?;
        }
    }
}

在这里,我们有一个异步函数 run,它接受一个接受 HttpRequest 并返回 HttpResponse 的闭包。

这意味着用户可以像这样使用我们的服务器:

fn handle_request(request: HttpRequest) -> HttpResponse {
    if request.path() == "/" {
        HttpResponse::ok("Hello, World!")
    } else {
        HttpResponse::not_found()
    }
}

// Run the server and handle requests using our `handle_request` function
server.run(handle_request).await?;

这并不坏。它使用户可以很容易地运行HTTP服务器,而不必担心任何低层次的细节问题。

然而,我们目前的设计有一个问题:我们不能异步地处理请求。想象一下,我们的用户在处理请求的同时需要查询数据库或向其他一些服务器发送请求。目前,这需要在我们等待处理程序产生响应时进行阻塞。如果我们希望我们的服务器能够处理大量的并发连接,我们需要在等待该请求异步完成的同时能够为其他请求提供服务。让我们通过让处理程序函数返回一个 future 来解决这个问题:

impl Server {
    async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
    where
        // `handler` now returns a generic type `Fut`...
        F: Fn(HttpRequest) -> Fut,
        // ...which is a `Future` whose `Output` is an `HttpResponse`
        Fut: Future<Output = HttpResponse>,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Await the future returned by `handler`
            let response = handler(request).await;

            write_http_response(connection, response).await?;
        }
    }
}

使用这个API和以前非常相似:

// Now an async function
async fn handle_request(request: HttpRequest) -> HttpResponse {
    if request.path() == "/" {
        HttpResponse::ok("Hello, World!")
    } else if request.path() == "/important-data" {
        // We can now do async stuff in here
        let some_data = fetch_data_from_database().await;
        make_response(some_data)
    } else {
        HttpResponse::not_found()
    }
}

// Running the server is the same
server.run(handle_request).await?;

这就好得多了,因为我们的请求处理现在可以调用其他异步函数。然而,仍然缺少一些东西。如果我们的处理程序遇到了错误,不能产生响应怎么办?让我们让它返回一个 Result:

impl Server {
    async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
    where
        F: Fn(HttpRequest) -> Fut,
        // The response future is now allowed to fail
        Fut: Future<Output = Result<HttpResponse, Error>>,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Pattern match on the result of the response future
            match handler(request).await {
                Ok(response) => write_http_response(connection, response).await?,
                Err(error) => handle_error_somehow(error, connection),
            }
        }
    }
}

添加更多的行为

现在,假设我们想确保所有的请求及时完成或失败,而不是让客户端无限期地等待一个可能永远不会到达的响应。我们可以通过给每个请求添加一个超时来做到这一点。超时设置了一个处理程序允许的最大持续时间的限制。如果它在这个时间内没有产生一个响应,就会返回一个错误。这允许客户端重试该请求或向用户报告一个错误,而不是永远等待。

你的第一个想法可能是修改Server,使其可以配置一个超时。然后它在每次调用处理程序时都会应用这个超时。然而,事实证明,你实际上可以在不修改Server的情况下添加一个超时。使用 tokio::time::timeout,我们可以做一个新的处理函数,调用之前的 handle_request,但超时时间为 30 秒。

async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> {
    let result = tokio::time::timeout(
        Duration::from_secs(30),
        handle_request(request)
    ).await;

    match result {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(error)) => Err(error),
        Err(_timeout_elapsed) => Err(Error::timeout()),
    }
}

这提供了一个相当好的关注点分离。我们能够在不改变任何现有代码的情况下增加一个超时功能。

让我们用这种方式再增加一个功能。想象一下,我们正在构建一个JSON API,因此希望在所有的响应上都有一个 Content-Type: application/json 头。我们可以用类似的方式包装 handler_with_timeout,并像这样修改响应:

async fn handler_with_timeout_and_content_type(
    request: HttpRequest,
) -> Result<HttpResponse, Error> {
    let mut response = handler_with_timeout(request).await?;
    response.set_header("Content-Type", "application/json");
    Ok(response)
}

我们现在有了一个处理程序,它可以处理一个HTTP请求,时间不超过30秒,并且总是有正确的Content-Type头,所有这些都不用修改我们原来的handle_request函数或Server结构。

设计可以以这种方式扩展的库是非常强大的,因为它允许用户通过分层的新行为来扩展库的功能,而不必等待库的维护者为其添加支持。

它也使测试更容易,因为你可以将你的代码分解成小的隔离单元,并为它们编写细粒度的测试,而不必担心所有其他的部分。

然而,有一个问题。我们目前的设计让我们通过将一个处理函数包裹在一个新的处理函数中来组成新的行为,该处理函数实现了该行为,然后调用内部函数。这很有效,但如果我们想增加很多额外的功能,它就不能很好地扩展。想象一下,我们有许多 handle_with_* 函数,每个函数都增加了一点新的行为。要硬编码哪一个中间处理程序调用哪一个,将变得很有挑战性。我们目前的链条是

  1. handler_with_timeout_and_content_type 调用
  2. handler_with_timeout 调用
  3. handle_request 实际处理请求

如果我们能以某种方式组合(compose)这三个函数而不需要硬编码确切的顺序,那就更好了。比如说:

let final_handler = with_content_type(with_timeout(handle_request));

同时仍然能够像以前一样运行我们的处理程序:

server.run(final_handler).await?;

你可以尝试将 with_content_typewith_timeout 作为函数来实现,该函数接受一个 F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>> 类型的参数,并返回一个像 impl Fn(HttpRequest) -> impl Future<Output = Result<HttpResponse, Error>> 的闭包,但由于Rust今天允许 impl Trait 的限制,这实际上不可能。特别是 impl Fn() -> impl Future 是不允许的。使用 Box 是可能的,但这有一个我们想避免的性能代价。

除了调用处理程序之外,你也不能为其添加其他行为,但为什么有必要这样做,我们会再讨论。

Handler特性

让我们尝试另一种方法。与其说 Server::run 接受一个闭包(Fn(HttpRequest) -> ...),不如说我们做一个新的trait来封装同样的 async fn(HttpRequest) -> Result<HttpResponse, Error>:

trait Handler {
    async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}

有了这样一个特性,我们就可以写出实现它的具体类型,这样我们就不用一直和 Fn 打交道了。

然而,Rust目前不支持异步trait方法,所以我们有两个选择:

  1. call 返回一个封箱(boxed)的 future,如 Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>。这就是 async-trait 箱的作用。

  2. Handler 添加一个相关类型的 Future,这样用户就可以选择自己的类型。

让我们选择方案二,因为它是最灵活的。有具体的 future 类型的用户可以使用该类型,而不需要付出Box的代价,不关心的用户仍然可以使用Pin<Box<…>。

trait Handler {
    type Future: Future<Output = Result<HttpResponse, Error>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future;
}

我们仍然必须要求 Handler::Future 实现 Future,其输出类型为 Result<HttpResponse, Error>,因为那是 Server::run 所要求的。

call 获取 &mut self 很有用,因为它允许处理程序在必要时更新其内部状态。

让我们把原来的 handle_request 函数转换成这个特性的实现。

struct RequestHandler;

impl Handler for RequestHandler {
    // We use `Pin<Box<...>>` here for simplicity, but could also define our
    // own `Future` type to avoid the overhead
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            // same implementation as we had before
            if request.path() == "/" {
                Ok(HttpResponse::ok("Hello, World!"))
            } else if request.path() == "/important-data" {
                let some_data = fetch_data_from_database().await?;
                Ok(make_response(some_data))
            } else {
                Ok(HttpResponse::not_found())
            }
        })
    }
}

支持超时如何?请记住,我们的目标是让我们能够将不同的功能组合在一起,而不需要修改每个单独的部分。

如果我们像这样定义一个通用的超时结构,会怎么样呢:

struct Timeout<T> {
    // T will be some type that implements `Handler`
    inner_handler: T,
    duration: Duration,
}

然后我们可以为 Timeout<T> 实现 Handler,并委托给 THandler 实现:

impl<T> Handler for Timeout<T>
where
    T: Handler,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            let result = tokio::time::timeout(
                self.duration,
                self.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}

这里重要的一行是 self.inner_handler.call(request)。这就是我们委托给内部处理程序并让它做它的事情的地方。我们不知道它是什么,我们只知道它完成后会产生一个 Result<HttpResponse, Error>

但这段代码并没有完全被编译。我们得到一个这样的错误:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/lib.rs:145:29
    |
144 |       fn call(&mut self, request: HttpRequest) -> Self::Future {
    |               --------- this data with an anonymous lifetime `'_`...
145 |           Box::pin(async move {
    |  _____________________________^
146 | |             let result = tokio::time::timeout(
147 | |                 self.duration,
148 | |                 self.inner_handler.call(request),
...   |
155 | |             }
156 | |         })
    | |_________^ ...is captured here, requiring it to live as long as `'static`

问题是,我们正在捕获一个 &mut self,并将其移入一个异步块。这意味着我们的 future 的生命周期与 &mut self 的生命周期相关。这对我们来说并不适用,因为我们可能想在多个线程上运行我们的响应 future 以获得更好的性能,或者产生多个响应 future 并将它们全部并行运行。如果对处理程序的引用存在于futures中,这是不可能的。

相反,我们需要将 &mut self 转换为拥有的 self。这正是 Clone 所做的。

// this must be `Clone` for `Timeout<T>` to be `Clone`
#[derive(Clone)]
struct RequestHandler;

impl Handler for RequestHandler {
    // ...
}

#[derive(Clone)]
struct Timeout<T> {
    inner_handler: T,
    duration: Duration,
}

impl<T> Handler for Timeout<T>
where
    T: Handler + Clone,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        // Get an owned clone of `&mut self`
        let mut this = self.clone();

        Box::pin(async move {
            let result = tokio::time::timeout(
                this.duration,
                this.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}

注意,在这种情况下,克隆是非常廉价的,因为 RequestHandler 没有任何数据,而 Timeout<T> 只增加了一个 Duration(这个会被 Copy)。

又近了一步。我们现在得到一个不同的错误:

error[E0310]: the parameter type `T` may not live long enough
   --> src/lib.rs:149:9
    |
140 |   impl<T> Handler for Timeout<T>
    |        - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | /         Box::pin(async move {
150 | |             let result = tokio::time::timeout(
151 | |                 this.duration,
152 | |                 this.inner_handler.call(request),
...   |
159 | |             }
160 | |         })
    | |__________^ ...so that the type `impl Future` will meet its required lifetime bounds

TBD

内容出处: https://tokio.rs/blog/2021-05-14-inventing-the-service-trait