Tokio和Actor
这篇文章是关于直接用Tokio构建 Actor,而不使用任何 Actor 库,如Actix。这被证明是相当容易做到的,但有一些细节你应该注意。
-
把tokio::spawn的调用放在哪里
-
带有运行方法的结构体与裸函数
-
对 Actor 的处理
-
背压和有界通道
-
优雅关闭
本文概述的技术应该适用于任何执行器,但为了简单起见,我们将只讨论Tokio。本文与Tokio教程中的 spawning 和通道章节有一些重叠,我建议也阅读这些章节。
在我们讨论如何编写一个行为体之前,我们需要知道什么是 Actor。Actor 背后的基本想法是催生一个独立的任务,独立于程序的其他部分执行一些工作。通常,这些 Actor 通过使用消息传递通道与程序的其他部分进行通信。由于每个 Actor 都是独立运行的,使用它们设计的程序自然是并行的。
Actor 的一个常见用例是将你想要共享的某些资源的独占所有权分配给 Actor,然后让其他任务通过与 Actor 对话来间接访问该资源。例如,如果你正在实现一个聊天服务器,你可以为每个连接生成一个任务,还有一个主任务在其他任务之间路由聊天信息。这很有用,因为主任务可以避免处理网络IO,而连接任务可以专注于处理网络IO。
方案
Actor 被分成两部分:任务(task)和处理器(handle)。任务(task)是独立产生的Tokio任务,它实际上是执行 actor 的职责,而处理器(handle)是一个结构,允许你与任务进行通信。
让我们考虑一个简单的 Actor。Actor 内部存储一个计数器,用来获得某种唯一的ID。该Actor的基本结构如下:
use tokio::sync::{oneshot, mpsc};
struct MyActor {
receiver: mpsc::Receiver<ActorMessage>,
next_id: u32,
}
enum ActorMessage {
GetUniqueId {
respond_to: oneshot::Sender<u32>,
},
}
impl MyActor {
fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
MyActor {
receiver,
next_id: 0,
}
}
fn handle_message(&mut self, msg: ActorMessage) {
match msg {
ActorMessage::GetUniqueId { respond_to } => {
self.next_id += 1;
// The `let _ =` ignores any errors when sending.
//
// This can happen if the `select!` macro is used
// to cancel waiting for the response.
let _ = respond_to.send(self.next_id);
},
}
}
}
async fn run_my_actor(mut actor: MyActor) {
while let Some(msg) = actor.receiver.recv().await {
actor.handle_message(msg);
}
}
现在我们有了 Actor 本身,我们还需要一个 Actor 的处理器(handle)。处理器(handle)是一个其他代码可以用来与 Actor 对话的对象,也是保持 Actor 活力的因素。
处理器(handle)看起来像这样:
#[derive(Clone)]
pub struct MyActorHandle {
sender: mpsc::Sender<ActorMessage>,
}
impl MyActorHandle {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = MyActor::new(receiver);
tokio::spawn(run_my_actor(actor));
Self { sender }
}
pub async fn get_unique_id(&self) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ActorMessage::GetUniqueId {
respond_to: send,
};
// Ignore send errors. If this send fails, so does the
// recv.await below. There's no reason to check for the
// same failure twice.
let _ = self.sender.send(msg).await;
recv.await.expect("Actor task has been killed")
}
}
让我们仔细看一下这个例子中的不同部分。
ActorMessage。ActorMessage枚举定义了我们可以向actor发送的信息种类。通过使用一个枚举,我们可以有许多不同的消息类型,而且每个消息类型都可以有自己的参数集。我们通过使用 onehot 通道向发送者返回一个值,这是一个允许精确发送一个消息的消息传递通道。
在上面的例子中,我们在 actor 结构体的 handle_message
方法中对枚举进行匹配,但这并不是唯一的结构体方式。我们也可以在 run_my_actor
函数中对枚举进行匹配。这个匹配中的每个分支可以调用各种方法,比如 actor 对象上的 get_unique_id
。
发送消息时的错误。在处理通道时,并非所有的错误都是致命的。正因为如此,这个例子有时使用 let _ =
来忽略错误。一般来说,如果接收器被丢弃,在通道上的发送操作就会失败。
在我们的例子中,这种情况的第一个例子是在 actor 中我们对我们被发送的消息做出反应的那一行。如果接收者对操作的结果不再感兴趣,例如,发送消息的任务可能已经被杀死,这种情况就会发生。
Actor的关闭。我们可以通过查看接收消息的失败来检测 actor 何时应该关闭。在我们的例子中,这发生在下面的while循环中。
while let Some(msg) = actor.receiver.recv().await {
actor.handle_message(msg);
}
当所有发送到接收方的发送者都被放弃时,我们知道我们将永远不会收到另一个消息,因此可以关闭 actor 。当这种情况发生时,对 .recv()
的调用返回None
,由于它不符合 Some(msg)
的模式,while循环退出,该函数返回。
#[derive(Clone)]
MyActorHandle 结构体导出了 Clone 特性。它能这样做是因为 mpsc 意味着它是一个多生产者、单消费者的通道。由于该通道允许多个生产者,我们可以自由地克隆我们对 actor 的句柄,允许我们从多个地方与它对话。
结构体上的run方法
我在上面的例子中使用了一个没有定义在任何结构体上的顶级函数作为我们催生的 Tokio 任务,但是很多人发现直接在 MyActor 结构上定义一个run方法并催生它更自然。这当然也可以,但我之所以给出一个使用顶级函数的例子,是因为它更自然地引导你走向不会给你带来很多寿命问题的方法。
为了理解这个原因,我准备了一个不熟悉这个模式的人经常想出的例子:
impl MyActor {
fn run(&mut self) {
tokio::spawn(async move {
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);
}
});
}
pub async fn get_unique_id(&self) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ActorMessage::GetUniqueId {
respond_to: send,
};
// Ignore send errors. If this send fails, so does the
// recv.await below. There's no reason to check for the
// same failure twice.
let _ = self.sender.send(msg).await;
recv.await.expect("Actor task has been killed")
}
}
... and no separate MyActorHandle
这个例子中的两个麻烦来源是:
-
tokio::spawn的调用是在run里面。
-
actor和handle是同一个结构体。
第一个麻烦导致了问题,因为 tokio::spawn
函数要求参数是 'static
。这意味着新任务必须拥有其内部的一切,这是一个问题,因为该方法借用了self,这意味着它不能将self的所有权交给新任务。
第二个麻烦导致了一些问题,因为 Rust 执行的是单一所有权原则。如果你把 actor 和 handle 合并成一个结构体,你(至少从编译器的角度来看)就会让每个 handle 访问 actor 的任务所拥有的字段。例如,next_id
整数应该只由 actor 的任务拥有,而不应该从任何处理器(handle)中直接访问。
这就是说,有一个版本是可行的。通过修复上述两个问题,你最终会得到以下结果:
impl MyActor {
async fn run(&mut self) {
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);
}
}
}
impl MyActorHandle {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = MyActor::new(receiver);
tokio::spawn(async move { actor.run().await });
Self { sender }
}
}
这与顶层函数的工作原理相同。注意,严格来说,可以写一个版本,其中tokio::spoon在里面运行,但我不推荐这种做法。
模式的变化
我在本文中用作例子的 actor 对消息使用了请求-响应的范式,但你不一定要这样做。在这一节中,我将给出一些灵感,告诉你如何改变这个想法。
不对消息进行响应
我用来介绍这个概念的例子包括了对通过 oneshot 发送的消息的响应,但你并不总是需要响应。在这些情况下,不在消息枚举中包括 oneshot 通道并无不妥。当通道有空间时,这甚至可以让你在消息被处理之前从发送中返回。
你还是应该确保使用一个有边界的通道,这样在通道中等待的消息数量就不会无限制地增长。在某些情况下,这意味着发送仍然需要是异步函数,以便处理发送操作需要在通道中等待更多空间的情况。
然而,有一个替代方法可以使发送成为异步方法。你可以使用 try_send
方法,并通过简单地杀死 actor 来处理发送失败。这在 actor 管理 TcpStream 的情况下很有用,它可以转发任何你发送到连接中的消息。在这种情况下,如果对 TcpStream 的写入速度跟不上,你可能想直接关闭连接。
一个Actor的多个处理器
如果一个 actor 需要从不同的地方发送消息,你可以使用多个处理器结构体来强制规定某些消息只能从某些地方发送。
这样做的时候,你仍然可以在内部重复使用同一个 mpsc 通道,用一个枚举来包含所有可能的消息类型。如果你确实想为此使用单独的通道,actor 可以使用tokio::select!
来同时接收多个通道。
loop {
tokio::select! {
Some(msg) = chan1.recv() => {
// handle msg
},
Some(msg) = chan2.recv() => {
// handle msg
},
else => break,
}
}
你需要小心处理通道关闭时的处理方式,因为在这种情况下他们的recv方法会立即返回None。幸运的是 tokio::select!
宏让你通过提供 Some(msg) 模式来处理这种情况。如果只有一个通道被关闭,该分支被禁用,另一个通道仍然被接收。当两个都关闭时,else 分支运行并使用 break 来退出循环。
Actor向其他Actor发送消息
让 actor向其他 actor 发送消息并无不妥。要做到这一点,你可以简单地给一个 actor 以其他 actor 的处理器。
如果你的 actor 形成了一个循环,你需要小心一点,因为通过抓住对方的处理器结构体,最后的发送者永远不会被放弃,从而防止关闭。为了处理这种情况,你可以让其中一个 actor 有两个处理器结构体,有独立的mpsc通道,但有一个 tokio::select!
,看起来像这样。
loop {
tokio::select! {
opt_msg = chan1.recv() => {
let msg = match opt_msg {
Some(msg) => msg,
None => break,
};
// handle msg
},
Some(msg) = chan2.recv() => {
// handle msg
},
}
}
如果 chan1 被关闭,上述循环将总是退出,即使 chan2 仍然开启。如果 chan2 是 actor 循环的一部分,这将打破循环,让 actor 关闭。
另一种方法是简单地在循环中的一个 actor 上调用abort。
多个actor共享一个处理器
就像每个 actor 可以有多个处理器一样,每个处理器也可以有多个actor。最常见的例子是在处理一个连接时,比如TcpStream,你通常会生成两个任务:一个用于读取,一个用于写入。当使用这种模式时,你让读写任务尽可能的简单–它们唯一的工作就是做IO。读取任务将只是把它收到的任何消息发送给其他任务,通常是另一个actor,而写入任务将只是把它收到的任何消息转发给连接。
这种模式非常有用,因为它隔离了与执行IO相关的复杂性,这意味着程序的其他部分可以假装向连接写东西是即时发生的,尽管实际的写东西是在 actor 处理消息的某个时间后发生的。
提防循环
我已经在 “Actor向其他Actor发送消息” 的标题下谈了一些关于循环的问题,在那里我讨论了形成循环的 actor 的关闭问题。然而,关闭并不是循环可能导致的唯一问题,因为循环也可能导致死锁,即循环中的每个 actor 都在等待下一个 actor 收到消息,但下一个角色不会收到该消息,直到它的下一个 actor 收到消息,如此循环。
为了避免这样的死锁,你必须确保不存在容量受限的通道循环。其原因是,有界通道的发送方法不会立即返回。发送方法总是立即返回的通道不属于这种循环,因为你不能在这样的发送中出现死锁。
请注意,这意味着 oneshot 通道不会成为死锁循环的一部分,因为它们的发送方法总是立即返回。还要注意的是,如果你使用 try_send
而不是 send
来发送消息,这也不能成为死锁循环的一部分。
感谢 matklad 指出了循环和死锁的问题。