这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Alice Ryhl个人博客

https://ryhl.io/

我的名字是Alice Ryhl。我是一名软件开发人员和数学家,目前是丹麦技术大学的学生。我是广泛使用的Rust库Tokio的主要维护者之一。此外,我在谷歌的安卓安全和隐私团队工作,我使其他团队能够在安卓中使用Rust。

Tokio和Actor

用Tower的 service trait 构建的HTTP特定中间件和实用程序的集合

这篇文章是关于直接用Tokio构建 Actor,而不使用任何 Actor 库,如Actix。这被证明是相当容易做到的,但有一些细节你应该注意。

  • 把tokio::spawn的调用放在哪里

  • 带有运行方法的结构体与裸函数

  • 对 Actor 的处理

  • 背压和有界通道

  • 优雅关闭

本文概述的技术应该适用于任何执行器,但为了简单起见,我们将只讨论Tokio。本文与Tokio教程中的 spawning 和通道章节有一些重叠,我建议也阅读这些章节。

在我们讨论如何编写一个行为体之前,我们需要知道什么是 Actor。Actor 背后的基本想法是催生一个独立的任务,独立于程序的其他部分执行一些工作。通常,这些 Actor 通过使用消息传递通道与程序的其他部分进行通信。由于每个 Actor 都是独立运行的,使用它们设计的程序自然是并行的。

Actor 的一个常见用例是将你想要共享的某些资源的独占所有权分配给 Actor,然后让其他任务通过与 Actor 对话来间接访问该资源。例如,如果你正在实现一个聊天服务器,你可以为每个连接生成一个任务,还有一个主任务在其他任务之间路由聊天信息。这很有用,因为主任务可以避免处理网络IO,而连接任务可以专注于处理网络IO。

方案

Actor 被分成两部分:任务(task)和处理器(handle)。任务(task)是独立产生的Tokio任务,它实际上是执行 actor 的职责,而处理器(handle)是一个结构,允许你与任务进行通信。

让我们考虑一个简单的 Actor。Actor 内部存储一个计数器,用来获得某种唯一的ID。该Actor的基本结构如下:

use tokio::sync::{oneshot, mpsc};

struct MyActor {
    receiver: mpsc::Receiver<ActorMessage>,
    next_id: u32,
}
enum ActorMessage {
    GetUniqueId {
        respond_to: oneshot::Sender<u32>,
    },
}

impl MyActor {
    fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
        MyActor {
            receiver,
            next_id: 0,
        }
    }
    fn handle_message(&mut self, msg: ActorMessage) {
        match msg {
            ActorMessage::GetUniqueId { respond_to } => {
                self.next_id += 1;

                // The `let _ =` ignores any errors when sending.
                //
                // This can happen if the `select!` macro is used
                // to cancel waiting for the response.
                let _ = respond_to.send(self.next_id);
            },
        }
    }
}

async fn run_my_actor(mut actor: MyActor) {
    while let Some(msg) = actor.receiver.recv().await {
        actor.handle_message(msg);
    }
}

现在我们有了 Actor 本身,我们还需要一个 Actor 的处理器(handle)。处理器(handle)是一个其他代码可以用来与 Actor 对话的对象,也是保持 Actor 活力的因素。

处理器(handle)看起来像这样:

#[derive(Clone)]
pub struct MyActorHandle {
    sender: mpsc::Sender<ActorMessage>,
}

impl MyActorHandle {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let actor = MyActor::new(receiver);
        tokio::spawn(run_my_actor(actor));

        Self { sender }
    }

    pub async fn get_unique_id(&self) -> u32 {
        let (send, recv) = oneshot::channel();
        let msg = ActorMessage::GetUniqueId {
            respond_to: send,
        };

        // Ignore send errors. If this send fails, so does the
        // recv.await below. There's no reason to check for the
        // same failure twice.
        let _ = self.sender.send(msg).await;
        recv.await.expect("Actor task has been killed")
    }
}

full example

让我们仔细看一下这个例子中的不同部分。

ActorMessage。ActorMessage枚举定义了我们可以向actor发送的信息种类。通过使用一个枚举,我们可以有许多不同的消息类型,而且每个消息类型都可以有自己的参数集。我们通过使用 onehot 通道向发送者返回一个值,这是一个允许精确发送一个消息的消息传递通道。

在上面的例子中,我们在 actor 结构体的 handle_message 方法中对枚举进行匹配,但这并不是唯一的结构体方式。我们也可以在 run_my_actor 函数中对枚举进行匹配。这个匹配中的每个分支可以调用各种方法,比如 actor 对象上的 get_unique_id

发送消息时的错误。在处理通道时,并非所有的错误都是致命的。正因为如此,这个例子有时使用 let _ = 来忽略错误。一般来说,如果接收器被丢弃,在通道上的发送操作就会失败。

在我们的例子中,这种情况的第一个例子是在 actor 中我们对我们被发送的消息做出反应的那一行。如果接收者对操作的结果不再感兴趣,例如,发送消息的任务可能已经被杀死,这种情况就会发生。

Actor的关闭。我们可以通过查看接收消息的失败来检测 actor 何时应该关闭。在我们的例子中,这发生在下面的while循环中。

while let Some(msg) = actor.receiver.recv().await {
    actor.handle_message(msg);
}

当所有发送到接收方的发送者都被放弃时,我们知道我们将永远不会收到另一个消息,因此可以关闭 actor 。当这种情况发生时,对 .recv() 的调用返回None,由于它不符合 Some(msg) 的模式,while循环退出,该函数返回。

#[derive(Clone)] MyActorHandle 结构体导出了 Clone 特性。它能这样做是因为 mpsc 意味着它是一个多生产者、单消费者的通道。由于该通道允许多个生产者,我们可以自由地克隆我们对 actor 的句柄,允许我们从多个地方与它对话。

结构体上的run方法

我在上面的例子中使用了一个没有定义在任何结构体上的顶级函数作为我们催生的 Tokio 任务,但是很多人发现直接在 MyActor 结构上定义一个run方法并催生它更自然。这当然也可以,但我之所以给出一个使用顶级函数的例子,是因为它更自然地引导你走向不会给你带来很多寿命问题的方法。

为了理解这个原因,我准备了一个不熟悉这个模式的人经常想出的例子:

impl MyActor {
    fn run(&mut self) {
        tokio::spawn(async move {
            while let Some(msg) = self.receiver.recv().await {
                self.handle_message(msg);
            }
        });
    }

    pub async fn get_unique_id(&self) -> u32 {
        let (send, recv) = oneshot::channel();
        let msg = ActorMessage::GetUniqueId {
            respond_to: send,
        };

        // Ignore send errors. If this send fails, so does the
        // recv.await below. There's no reason to check for the
        // same failure twice.
        let _ = self.sender.send(msg).await;
        recv.await.expect("Actor task has been killed")
    }
}

... and no separate MyActorHandle

这个例子中的两个麻烦来源是:

  • tokio::spawn的调用是在run里面。

  • actor和handle是同一个结构体。

第一个麻烦导致了问题,因为 tokio::spawn 函数要求参数是 'static。这意味着新任务必须拥有其内部的一切,这是一个问题,因为该方法借用了self,这意味着它不能将self的所有权交给新任务。

第二个麻烦导致了一些问题,因为 Rust 执行的是单一所有权原则。如果你把 actor 和 handle 合并成一个结构体,你(至少从编译器的角度来看)就会让每个 handle 访问 actor 的任务所拥有的字段。例如,next_id 整数应该只由 actor 的任务拥有,而不应该从任何处理器(handle)中直接访问。

这就是说,有一个版本是可行的。通过修复上述两个问题,你最终会得到以下结果:

impl MyActor {
    async fn run(&mut self) {
        while let Some(msg) = self.receiver.recv().await {
            self.handle_message(msg);
        }
    }
}

impl MyActorHandle {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let actor = MyActor::new(receiver);
        tokio::spawn(async move { actor.run().await });

        Self { sender }
    }
}

这与顶层函数的工作原理相同。注意,严格来说,可以写一个版本,其中tokio::spoon在里面运行,但我不推荐这种做法。

模式的变化

我在本文中用作例子的 actor 对消息使用了请求-响应的范式,但你不一定要这样做。在这一节中,我将给出一些灵感,告诉你如何改变这个想法。

不对消息进行响应

我用来介绍这个概念的例子包括了对通过 oneshot 发送的消息的响应,但你并不总是需要响应。在这些情况下,不在消息枚举中包括 oneshot 通道并无不妥。当通道有空间时,这甚至可以让你在消息被处理之前从发送中返回。

你还是应该确保使用一个有边界的通道,这样在通道中等待的消息数量就不会无限制地增长。在某些情况下,这意味着发送仍然需要是异步函数,以便处理发送操作需要在通道中等待更多空间的情况。

然而,有一个替代方法可以使发送成为异步方法。你可以使用 try_send 方法,并通过简单地杀死 actor 来处理发送失败。这在 actor 管理 TcpStream 的情况下很有用,它可以转发任何你发送到连接中的消息。在这种情况下,如果对 TcpStream 的写入速度跟不上,你可能想直接关闭连接。

一个Actor的多个处理器

如果一个 actor 需要从不同的地方发送消息,你可以使用多个处理器结构体来强制规定某些消息只能从某些地方发送。

这样做的时候,你仍然可以在内部重复使用同一个 mpsc 通道,用一个枚举来包含所有可能的消息类型。如果你确实想为此使用单独的通道,actor 可以使用tokio::select! 来同时接收多个通道。

loop {
    tokio::select! {
        Some(msg) = chan1.recv() => {
            // handle msg
        },
        Some(msg) = chan2.recv() => {
            // handle msg
        },
        else => break,
    }
}

你需要小心处理通道关闭时的处理方式,因为在这种情况下他们的recv方法会立即返回None。幸运的是 tokio::select! 宏让你通过提供 Some(msg) 模式来处理这种情况。如果只有一个通道被关闭,该分支被禁用,另一个通道仍然被接收。当两个都关闭时,else 分支运行并使用 break 来退出循环。

Actor向其他Actor发送消息

让 actor向其他 actor 发送消息并无不妥。要做到这一点,你可以简单地给一个 actor 以其他 actor 的处理器。

如果你的 actor 形成了一个循环,你需要小心一点,因为通过抓住对方的处理器结构体,最后的发送者永远不会被放弃,从而防止关闭。为了处理这种情况,你可以让其中一个 actor 有两个处理器结构体,有独立的mpsc通道,但有一个 tokio::select!,看起来像这样。

loop {
    tokio::select! {
        opt_msg = chan1.recv() => {
            let msg = match opt_msg {
                Some(msg) => msg,
                None => break,
            };
            // handle msg
        },
        Some(msg) = chan2.recv() => {
            // handle msg
        },
    }
}

如果 chan1 被关闭,上述循环将总是退出,即使 chan2 仍然开启。如果 chan2 是 actor 循环的一部分,这将打破循环,让 actor 关闭。

另一种方法是简单地在循环中的一个 actor 上调用abort。

多个actor共享一个处理器

就像每个 actor 可以有多个处理器一样,每个处理器也可以有多个actor。最常见的例子是在处理一个连接时,比如TcpStream,你通常会生成两个任务:一个用于读取,一个用于写入。当使用这种模式时,你让读写任务尽可能的简单–它们唯一的工作就是做IO。读取任务将只是把它收到的任何消息发送给其他任务,通常是另一个actor,而写入任务将只是把它收到的任何消息转发给连接。

这种模式非常有用,因为它隔离了与执行IO相关的复杂性,这意味着程序的其他部分可以假装向连接写东西是即时发生的,尽管实际的写东西是在 actor 处理消息的某个时间后发生的。

提防循环

我已经在 “Actor向其他Actor发送消息” 的标题下谈了一些关于循环的问题,在那里我讨论了形成循环的 actor 的关闭问题。然而,关闭并不是循环可能导致的唯一问题,因为循环也可能导致死锁,即循环中的每个 actor 都在等待下一个 actor 收到消息,但下一个角色不会收到该消息,直到它的下一个 actor 收到消息,如此循环。

为了避免这样的死锁,你必须确保不存在容量受限的通道循环。其原因是,有界通道的发送方法不会立即返回。发送方法总是立即返回的通道不属于这种循环,因为你不能在这样的发送中出现死锁。

请注意,这意味着 oneshot 通道不会成为死锁循环的一部分,因为它们的发送方法总是立即返回。还要注意的是,如果你使用 try_send 而不是 send 来发送消息,这也不能成为死锁循环的一部分。

感谢 matklad 指出了循环和死锁的问题。

内容出处: https://ryhl.io/blog/actors-with-tokio/

Tokio和Actor

用Tower的 service trait 构建的HTTP特定中间件和实用程序的集合

Rustasync/await 特性是通过一种称为协作式调度(cooperative scheduling)的机制来实现的,这对于编写异步Rust代码的人来说有一些重要的影响。

这篇博文的目标读者是异步 Rust 的新用户。我将使用 Tokio 运行时作为示例,但这里提出的观点适用于任何异步运行时。

如果你只从这篇文章中记住一件事,那应该是:

(注:指的是运行中)

Blocking vs. non-blocking code

编写一个可以同时处理许多事情的应用程序的质朴的方法是为每个任务生成一个新线程。如果任务的数量很少,这就是一个完美的解决方案,但是随着任务的数量变得越来越多,你最终会因为数量过多的线程而遇到问题。这个问题在不同的编程语言中有不同的解决方案,但它们都归结为同一件事: 非常快速地切换每个线程上当前运行的任务,这样所有的任务都有机会运行。在Rust中,这种切换发生在当你.await一些事上。

在写异步rust 时,短语 阻塞线程(blocking the thread) 意味着 阻止运行时切换当前任务。这可能是一个主要问题,这意味着同一运行时上的其他任务将停止运行,直到线程不再被阻塞。为了防止这种情况,我们应该编写能够快速切换的代码,也就是不要长时间不进行.await

让我们来举个例子:

use std::time::Duration;

#[tokio::main]
async fn main() {
    println!("Hello World!");

    // No .await here!
    std::thread::sleep(Duration::from_secs(5));

    println!("Five seconds later...");
}

上面的代码看起来是正确的,运行它将看起来正常工作。但它有一个致命的缺陷: 它阻塞了线程。上述场景没有其他任务,所以看不出问题,但在真正的程序中就不一样了。为了说明这一点,考虑下面的例子:

use std::time::Duration;

async fn sleep_then_print(timer: i32) {
    println!("Start timer {}.", timer);

    // No .await here!
    std::thread::sleep(Duration::from_secs(1));

    println!("Timer {} done.", timer);
}

#[tokio::main]
async fn main() {
    // The join! macro lets you run multiple things concurrently.
    tokio::join!(
        sleep_then_print(1),
        sleep_then_print(2),
        sleep_then_print(3),
    );
}
Start timer 1.
Timer 1 done.
Start timer 2.
Timer 2 done.
Start timer 3.
Timer 3 done.

这个例子将花费三秒钟运行,timer将在没有任何并发的情况下一个接一个地运行。原因很简单: Tokio运行时无法将一个任务切换为另一个任务,因为这种切换只能发生在.await。因为 sleep_then_print 中没有.await,在运行时就不会发生切换。

然而,如果我们使用Tokiosleep 函数,它使用一个.await来睡眠,那么这个函数就会正常工作:

use tokio::time::Duration;

async fn sleep_then_print(timer: i32) {
    println!("Start timer {}.", timer);

    tokio::time::sleep(Duration::from_secs(1)).await;
//                                            ^ execution can be paused here

    println!("Timer {} done.", timer);
}

#[tokio::main]
async fn main() {
    // The join! macro lets you run multiple things concurrently.
    tokio::join!(
        sleep_then_print(1),
        sleep_then_print(2),
        sleep_then_print(3),
    );
}
Start timer 1.
Start timer 2.
Start timer 3.
Timer 1 done.
Timer 2 done.
Timer 3 done.

这段代码只需一秒钟就可以运行,并且如预期地在同一时间正确地运行所有三个函数。

要知道,事情并不总是这么明显。通过使用 tokio::join!,这三个任务都保证在同一个线程上运行,但如果你用 tokio::spawn 替换它,并使用一个多线程的运行时,你将能够同时运行多个阻塞任务直到用完线程。默认的 Tokio 运行时按照核心数生成线程,你的电脑通常会有8个CPU。这足以使你在本地测试时忽略这个问题,并在真实场景下运行代码时非常快地耗尽线程。

为了给出多少时间算的上过久的定义,一个好的经验法则是每个 .await 的时间间隔不超过10到100微秒。这取决于你正在编写的应用程序的类型。

如果我想要阻塞呢?

有时候我们只是想阻塞线程。这是完全正常的。有两个常见的原因:

  1. 昂贵的CPU-Bound(受限于CPU资源的)计算
  2. Synchronous IO. 同步IO

在这两种情况下,我们都在处理一个会在一段时间内阻止任务 .await 的操作。为了解决这个问题,我们必须将阻塞操作移动到Tokio线程池外部的线程中去。关于这一点有三种形式可以使用:

  1. 使用 tokio::task::spawn_blocking 函数
  2. 使用 rayon crate
  3. 通过 std::thread::spawn 创建一个专门的线程

让我们仔细浏览下每个解决方案,看看什么时候应该使用它。

spawn_blocking函数

Tokio运行时包含一个单独的线程池,专门用于运行阻塞函数,你可以使用 spawn_blocking 在其上运行任务。这个线程池的上限是大约500个线程,因此可以在这个线程池上进行大量阻塞操作。

由于线程池有如此多的线程,它最适合用于阻塞 I/O (如与文件系统交互)或使用阻塞数据库库(如diesel)。

线程池不适合昂贵的CPU-bound计算,因为线程池的线程数量比计算机上的 CPU核心数量多得多。当线程数等于CPU核心数时,CPU-bound的计算运行效率最高。也就是说,如果只需要运行几个CPU-bound的计算,我不会责怪你用 spawn_blocking 运行它们,因为这样做非常简单。

#[tokio::main]
async fn main() {
    // This is running on Tokio. We may not block here.

    let blocking_task = tokio::task::spawn_blocking(|| {
        // This is running on a thread where blocking is fine.
        println!("Inside spawn_blocking");
    });

    // We can wait for the blocking task like this:
    // If the blocking task panics, the unwrap below will propagate the
    // panic.
    blocking_task.await.unwrap();
}

rayon crate

rayon 是一个著名的库,它提供了一个专门用于昂贵的CPU-bound计算的线程池,你可以将它与Tokio一起用于此目的。与spawn_blocking不同,rayon的线程池的最大线程数量很少,这就是为什么它适合进行昂贵计算的原因。

我们将计算大列表的和作为例子,但是请注意在实践中,除非数组非常非常大,否则只计算和是足够便宜的,可以直接在Tokio运行。

使用rayon的主要危险是,在等待rayon完成时,必须注意不要阻塞线程。要做到这一点,可以将 rayon::spawntokio::sync::oneshot 结合起来,就像这样:

async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (send, recv) = tokio::sync::oneshot::channel();

    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Perform an expensive computation.
        let mut sum = 0;
        for num in nums {
            sum += num;
        }

        // Send the result back to Tokio.
        let _ = send.send(sum);
    });

    // Wait for the rayon task.
    recv.await.expect("Panic in rayon::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}

上述例子使用了rayon的线程池来运行昂贵的操作。请注意,例子中对 parallel_sum 的每个调用只使用了rayon 线程池中的一个线程。如果你的应用程序中有很多对parallel_ sum的调用,那么这是有意义的,但是也可以使用rayon的并行迭代器在几个线程上计算和:

use rayon::prelude::*;

// Spawn a task on rayon.
rayon::spawn(move || {
    // Compute the sum on multiple threads.
    let sum = nums.par_iter().sum();

    // Send the result back to Tokio.
    let _ = send.send(sum);
});

注意,当使用并行迭代器时,仍然需要rayon::spawn调用,因为并行迭代器是阻塞的。

生成一个专用线程

如果阻塞操作是需要永久运行下去的,那么应该在专用线程上运行它。例如,考虑一个管理数据库连接的线程,他需要使用通道来接收要执行的操作。因为这个线程在循环中监听该通道,所以它永远不会退出。

在上述两个线程池中的任何一个上运行这样的任务都会是一个问题,因为它实际上从池中永久地带走了一个线程。一旦这样做了几次,线程池中就没有更多的线程了,所有其他阻塞任务都无法执行。

当然,如果你不在意每次启动一个新线程的成本,也可以使用专用线程来运行较短生命周期的任务。

总结

如果你忘了,以下是你需要记住的主要事情:

下面你将看到一张备忘单,上面列出了当你想要阻塞的时候可以使用的方法:

CPU-bound computation Synchronous IO Running forever
spawn_blocking 次优 OK No
rayon OK No No
专用线程 OK OK OK

最后,我建议阅读Tokio教程中关于 共享状态 的章节。本章将解释如何在异步代码中正确使用 std::sync::Mutex,并更深入地说明为什么即使锁定Mutex是阻塞的,也可以这样做。(剧透一下: 如果你的阻塞时间很短,它真的算阻塞吗?)

我还强烈推荐Tokio博客中的一篇文章: Reducing tail latencies with automatic cooperative task yielding

感谢 Chris Krychosnocl 阅读本文草稿并提供有用的建议。所有的错误都是我自己的。

内容出处