I/O
Tokio中的 I/O 操作方式与 std 中大致相同,但是是异步的。有一个特质用于读取(AsyncRead)和一个特质用于写入(AsyncWrite)。特定的类型根据情况实现这些特性(TcpStream、File、Stdout)。AsyncRead 和 AsyncWrite 也由一些数据结构实现,如 Vec<u8>
和 &[u8]
。这允许在期望有读写器的地方使用字节数组。
本页将介绍 Tokio 的基本 I/O 读取和写入,并通过几个例子进行说明。下一页将介绍一个更高级的 I/O 例子。
AsyncRead 和 AsyncWrite
这两个特性提供了异步读取和写入字节流的设施。这些特质上的方法通常不被直接调用,类似于你不手动调用 Future 特质的 poll 方法。相反,你将通过AsyncReadExt
和 AsyncWriteExt
提供的实用方法使用它们。
让我们简单地看看这些方法中的几个。所有这些函数都是异步的,必须与 .await
一起使用。
async fn read()
AsyncReadExt::read
提供了一个向缓冲区读取数据的异步方法,返回读取的字节数。
注意:当 read() 返回 Ok(0) 时,这表示流已经关闭。任何对 read() 的进一步调用将立即以 Ok(0) 完成。对于TcpStream实例,这意味着套接字的读取部分被关闭。
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
async fn read_to_end()
AsyncReadExt::read_to_end
从流中读取所有字节直到EOF。
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
}
async fn write()
AsyncWriteExt::write 将缓冲区写入写入器,返回写入的字节数。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
async fn write_all()
AsyncWriteExt::write_all 将整个缓冲区写入写入器。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = File::create("foo.txt").await?;
buffer.write_all(b"some bytes").await?;
Ok(())
}
这两个特性都包括许多其他有用的方法。请参阅API文档以获得一个全面的列表。
辅助函数
此外,就像std一样,tokio::io模块包含一些有用的实用函数,以及用于处理标准输入、标准输出和标准错误的API。例如,tokio::io::copy 异步地将一个 reader 的全部内容复制到一个 writer。
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
注意,这使用了字节数组也实现了 AsyncRead 这一事实。
Echo server
让我们练习做一些异步I/O。我们将编写一个 echo 服务器。
echo服务器绑定了一个 TcpListener 并在一个循环中接受入站连接。对于每个入站连接,数据被从套接字中读取并立即写回套接字。客户端向服务器发送数据,并接收完全相同的数据回来。
我们将使用稍微不同的策略实现两次 echo 服务器。
使用io::copy()
这是一个TCP服务器,需要一个接受循环。一个新的任务被生成以处理每个被接受的套接字。
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Copy data here
});
}
}
正如前面所看到的,这个实用程序需要一个 reader 和一个 writer,并将数据从一个复制到另一个。然而,我们只有一个单一的TcpStream。这个单一的值同时实现了 AsyncRead 和 AsyncWrite。因为 io::copy
对 reader 和 writer 两者都要求&mut,所以socket不能用于两个参数。
// This fails to compile
io::copy(&mut socket, &mut socket).await
拆分reader + writer
为了解决这个问题,我们必须把套接字分成一个reader句柄和一个writer句柄。拆分读写器组合的最佳方法取决于具体的类型。
任何读写器类型都可以使用 io::split
函数进行分割。这个函数接受一个单一的值,并返回单独的 reader 和 writer 句柄。这两个句柄可以独立使用,包括来自不同的任务。
例如,echo客户端可以这样处理并发的读和写。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// Write data in the background
let write_task = tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Sometimes, the rust type inferencer needs
// a little help
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
因为 io::split
支持任何实现 AsyncRead + AsyncWrite
并返回独立句柄的值,在内部 io::split
使用一个Arc和一个Mutex。这种开销可以通过TcpStream来避免。TcpStream提供了两个专门的分割函数。
TcpStream::split
接收流的 reference ,并返回 reader 和 writer 句柄。因为使用了reference,所以这两个句柄必须留在 split() 被调用的同一个任务上。这种专门的 split 是零成本的。不需要Arc或Mutex。TcpStream 还提供了 into_split,它支持可以跨任务移动的句柄,但只需要一个Arc。
因为 io::copy()
是在拥有TcpStream的同一个任务中调用的,我们可以使用 TcpStream::split
。处理 echo 逻辑的任务变成了。
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("failed to copy");
}
});
手动复制
现在让我们来看看我们如何通过手动复制数据来编写 echo 服务器。为了做到这一点,我们使用 AsyncReadExt::read
和 AsyncWriteExt::write_all
。
完整的 echo 服务器如下:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
Ok(n) => {
// Copy the data back to socket
if socket.write_all(&buf[..n]).await.is_err() {
// Unexpected socket error. There isn't much we can
// do here so just stop processing.
return;
}
}
Err(_) => {
// Unexpected socket error. There isn't much we can do
// here so just stop processing.
return;
}
}
}
});
}
}
让我们把它分解一下。首先,由于使用了AsyncRead 和 AsyncWrite 工具,扩展特性必须被带入范围。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
分配缓冲区
该策略是将一些数据从套接字中读入一个缓冲区,然后将缓冲区的内容写回套接字。
let mut buf = vec![0; 1024];
堆栈缓冲区被明确地避免了。回顾前面,我们注意到,所有跨调用 .await
的任务数据都必须由任务存储。在这种情况下,buf 被用于跨越 .await
的调用。所有的任务数据被存储在一个单一的分配中。你可以把它看作是一个枚举,每个变量都是需要为特定的 .await
调用存储的数据。
如果缓冲区由堆栈数组表示,每个接受的套接字所产生的任务的内部结构可能看起来像。
struct Task {
// internal task fields here
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}
}
}
如果使用堆栈数组作为缓冲区类型,它将被内联存储在任务结构中。这将使任务结构变得非常大。此外,缓冲区的大小通常是以页为单位的。这将反过来使任务成为一个尴尬的大小:$page-size + a-few-bytes。
编译器对异步块的布局的优化比基本枚举更进一步。在实践中,变量不会像枚举所要求的那样在变体之间移动。然而,任务结构的大小至少和最大的变量一样大。
正因为如此,为缓冲区使用专门的分配通常更有效率。
处理EOF
当TCP流的读取部分被关闭时,对 read() 的调用返回 Ok(0) 。在这一点上退出读循环是很重要的。忘记在EOF时退出读循环是一个常见的错误来源。
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
// ... other cases handled here
}
}
忘记从读循环中断开,通常会导致100%的CPU无限循环情况。由于套接字被关闭,socket.read()立即返回。循环就会永远重复下去。