I/O

Tokio教程之I/O

https://tokio.rs/tokio/tutorial/io

Tokio中的 I/O 操作方式与 std 中大致相同,但是是异步的。有一个特质用于读取(AsyncRead)和一个特质用于写入(AsyncWrite)。特定的类型根据情况实现这些特性(TcpStream、File、Stdout)。AsyncRead 和 AsyncWrite 也由一些数据结构实现,如 Vec<u8>&[u8]。这允许在期望有读写器的地方使用字节数组。

本页将介绍 Tokio 的基本 I/O 读取和写入,并通过几个例子进行说明。下一页将介绍一个更高级的 I/O 例子。

AsyncRead 和 AsyncWrite

这两个特性提供了异步读取和写入字节流的设施。这些特质上的方法通常不被直接调用,类似于你不手动调用 Future 特质的 poll 方法。相反,你将通过AsyncReadExtAsyncWriteExt 提供的实用方法使用它们。

让我们简单地看看这些方法中的几个。所有这些函数都是异步的,必须与 .await 一起使用。

async fn read()

AsyncReadExt::read 提供了一个向缓冲区读取数据的异步方法,返回读取的字节数。

注意:当 read() 返回 Ok(0) 时,这表示流已经关闭。任何对 read() 的进一步调用将立即以 Ok(0) 完成。对于TcpStream实例,这意味着套接字的读取部分被关闭。

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;

    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end 从流中读取所有字节直到EOF。

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = Vec::new();

    // read the whole file
    f.read_to_end(&mut buffer).await?;
    Ok(())
}

async fn write()

AsyncWriteExt::write 将缓冲区写入写入器,返回写入的字节数。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    // Writes some prefix of the byte string, but not necessarily all of it.
    let n = file.write(b"some bytes").await?;

    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

async fn write_all()

AsyncWriteExt::write_all 将整个缓冲区写入写入器。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut buffer = File::create("foo.txt").await?;

    buffer.write_all(b"some bytes").await?;
    Ok(())
}

这两个特性都包括许多其他有用的方法。请参阅API文档以获得一个全面的列表。

辅助函数

此外,就像std一样,tokio::io模块包含一些有用的实用函数,以及用于处理标准输入、标准输出和标准错误的API。例如,tokio::io::copy 异步地将一个 reader 的全部内容复制到一个 writer。

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}

注意,这使用了字节数组也实现了 AsyncRead 这一事实。

Echo server

让我们练习做一些异步I/O。我们将编写一个 echo 服务器。

echo服务器绑定了一个 TcpListener 并在一个循环中接受入站连接。对于每个入站连接,数据被从套接字中读取并立即写回套接字。客户端向服务器发送数据,并接收完全相同的数据回来。

我们将使用稍微不同的策略实现两次 echo 服务器。

使用io::copy()

这是一个TCP服务器,需要一个接受循环。一个新的任务被生成以处理每个被接受的套接字。

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Copy data here
        });
    }
}

正如前面所看到的,这个实用程序需要一个 reader 和一个 writer,并将数据从一个复制到另一个。然而,我们只有一个单一的TcpStream。这个单一的值同时实现了 AsyncRead 和 AsyncWrite。因为 io::copy 对 reader 和 writer 两者都要求&mut,所以socket不能用于两个参数。

// This fails to compile
io::copy(&mut socket, &mut socket).await

拆分reader + writer

为了解决这个问题,我们必须把套接字分成一个reader句柄和一个writer句柄。拆分读写器组合的最佳方法取决于具体的类型。

任何读写器类型都可以使用 io::split 函数进行分割。这个函数接受一个单一的值,并返回单独的 reader 和 writer 句柄。这两个句柄可以独立使用,包括来自不同的任务。

例如,echo客户端可以这样处理并发的读和写。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // Write data in the background
    let write_task = tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;

        // Sometimes, the rust type inferencer needs
        // a little help
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];

    loop {
        let n = rd.read(&mut buf).await?;

        if n == 0 {
            break;
        }

        println!("GOT {:?}", &buf[..n]);
    }

    Ok(())
}

因为 io::split 支持任何实现 AsyncRead + AsyncWrite 并返回独立句柄的值,在内部 io::split 使用一个Arc和一个Mutex。这种开销可以通过TcpStream来避免。TcpStream提供了两个专门的分割函数。

TcpStream::split 接收流的 reference ,并返回 reader 和 writer 句柄。因为使用了reference,所以这两个句柄必须留在 split() 被调用的同一个任务上。这种专门的 split 是零成本的。不需要Arc或Mutex。TcpStream 还提供了 into_split,它支持可以跨任务移动的句柄,但只需要一个Arc。

因为 io::copy() 是在拥有TcpStream的同一个任务中调用的,我们可以使用 TcpStream::split。处理 echo 逻辑的任务变成了。

tokio::spawn(async move {
    let (mut rd, mut wr) = socket.split();
    
    if io::copy(&mut rd, &mut wr).await.is_err() {
        eprintln!("failed to copy");
    }
});

手动复制

现在让我们来看看我们如何通过手动复制数据来编写 echo 服务器。为了做到这一点,我们使用 AsyncReadExt::readAsyncWriteExt::write_all

完整的 echo 服务器如下:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await.unwrap();

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => return,
                    Ok(n) => {
                        // Copy the data back to socket
                        if socket.write_all(&buf[..n]).await.is_err() {
                            // Unexpected socket error. There isn't much we can
                            // do here so just stop processing.
                            return;
                        }
                    }
                    Err(_) => {
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
}

让我们把它分解一下。首先,由于使用了AsyncRead 和 AsyncWrite 工具,扩展特性必须被带入范围。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

分配缓冲区

该策略是将一些数据从套接字中读入一个缓冲区,然后将缓冲区的内容写回套接字。

let mut buf = vec![0; 1024];

堆栈缓冲区被明确地避免了。回顾前面,我们注意到,所有跨调用 .await 的任务数据都必须由任务存储。在这种情况下,buf 被用于跨越 .await 的调用。所有的任务数据被存储在一个单一的分配中。你可以把它看作是一个枚举,每个变量都是需要为特定的 .await 调用存储的数据。

如果缓冲区由堆栈数组表示,每个接受的套接字所产生的任务的内部结构可能看起来像。

struct Task {
    // internal task fields here
    task: enum {
        AwaitingRead {
            socket: TcpStream,
            buf: [BufferType],
        },
        AwaitingWriteAll {
            socket: TcpStream,
            buf: [BufferType],
        }

    }
}

如果使用堆栈数组作为缓冲区类型,它将被内联存储在任务结构中。这将使任务结构变得非常大。此外,缓冲区的大小通常是以页为单位的。这将反过来使任务成为一个尴尬的大小:$page-size + a-few-bytes。

编译器对异步块的布局的优化比基本枚举更进一步。在实践中,变量不会像枚举所要求的那样在变体之间移动。然而,任务结构的大小至少和最大的变量一样大。

正因为如此,为缓冲区使用专门的分配通常更有效率。

处理EOF

当TCP流的读取部分被关闭时,对 read() 的调用返回 Ok(0) 。在这一点上退出读循环是很重要的。忘记在EOF时退出读循环是一个常见的错误来源。

loop {
    match socket.read(&mut buf).await {
        // Return value of `Ok(0)` signifies that the remote has
        // closed
        Ok(0) => return,
        // ... other cases handled here
    }
}

忘记从读循环中断开,通常会导致100%的CPU无限循环情况。由于套接字被关闭,socket.read()立即返回。循环就会永远重复下去。