分帧

Tokio教程之分帧

https://tokio.rs/tokio/tutorial/framing

我们现在将应用我们刚刚学到的关于I/O的知识,实现 Mini-Redis 的分帧层。成帧是将一个字节流转换为一个分帧流的过程。一个帧是两个对等体之间传输的数据单位。Redis协议的帧定义如下。

use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

请注意,该分帧只由数据组成,没有任何语义。命令的解析和执行发生在更高的层次。

对于HTTP,分帧可能看起来像:

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

为了实现 Mini-Redis 的分帧,我们将实现一个 Connection 结构,它包裹着一个TcpStream并读写 mini_redis::Frame 值。

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... other fields here
}

impl Connection {
    /// Read a frame from the connection.
    /// 
    /// Returns `None` if EOF is reached
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // implementation here
    }

    /// Write a frame to the connection.
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // implementation here
    }
}

带缓冲的读取

read_frame 方法在返回之前会等待一整帧的接收。对 TcpStream::read() 的一次调用可以返回一个任意数量的数据。它可能包含一整个帧,一个部分帧,或者多个帧。如果收到一个部分帧,数据被缓冲,并从套接字中读取更多数据。如果收到多个帧,则返回第一个帧,其余的数据被缓冲,直到下次调用 read_frame。

为了实现这一点,Connection需要一个读取缓冲区字段。数据从套接字中读入读取缓冲区。当一个帧被解析后,相应的数据就会从缓冲区中移除。

我们将使用BytesMut作为缓冲区类型。这是Bytes的一个可变版本。

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4kb of capacity.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

以及Connection上的read_frame()函数:

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Ensure the buffer has capacity
        if self.buffer.len() == self.cursor {
            // Grow the buffer
            self.buffer.resize(self.cursor * 2, 0);
        }

        // Read into the buffer, tracking the number
        // of bytes read
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // Update our cursor
            self.cursor += n;
        }
    }
}

在处理字节数组和 read 时,我们还必须维护一个游标,跟踪有多少数据被缓冲。我们必须确保将缓冲区的空部分传递给 read()。否则,我们会覆盖缓冲区的数据。如果我们的缓冲区被填满了,我们必须增加缓冲区,以便继续读取。在 parse_frame() 中(不包括),我们需要解析 self.buffer[..self.cursor] 所包含的数据。

因为将字节数组与游标配对是非常常见的,bytes crate提供了一个代表字节数组和游标的抽象概念。Buf 特性是由可以读取数据的类型实现的。BufMut 特性是由可以写入数据的类型实现的。当把一个 T: BufMut 传递给 read_buf() 时,缓冲区的内部游标会被 read_buf 自动更新。正因为如此,在我们版本的read_frame中,我们不需要管理我们自己的游标。

此外,当使用 Vec<u8> 时,缓冲区必须被初始化。 vec![0; 4096] 分配了一个4096字节的数组,并将0写入每个条目。当调整缓冲区的大小时,新的容量也必须用零来初始化。初始化过程是不没有开销的。当使用 BytesMutBufMut 时,容量是不初始化的。BytesMut 的抽象防止我们读取未初始化的内存。这让我们避免了初始化的步骤。

解析

现在,让我们看一下parse_frame()函数。解析工作分两步进行。

  1. 确保一个完整的帧被缓冲,并找到该帧的结束索引。
  2. 解析该帧。

mini-redis crate 为我们提供了一个用于这两个步骤的函数:

  1. Frame::check
  2. 2Frame::parse

我们还将重用 Buf 的抽象来帮助。 Buf 被传入 Frame::check。当检查函数遍历传入的缓冲区时,内部游标将被推进。当check返回时,缓冲区的内部游标指向帧的末端。

对于Buf类型,我们将使用 std::io::Cursor<&[u8]>

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Create the `T: Buf` type.
    let mut buf = Cursor::new(&self.buffer[..]);

    // Check whether a full frame is available
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Get the byte length of the frame
            let len = buf.position() as usize;

            // Reset the internal cursor for the
            // call to `parse`.
            buf.set_position(0);

            // Parse the frame
            let frame = Frame::parse(&mut buf)?;

            // Discard the frame from the buffer
            self.buffer.advance(len);

            // Return the frame to the caller.
            Ok(Some(frame))
        }
        // Not enough data has been buffered
        Err(Incomplete) => Ok(None),
        // An error was encountered
        Err(e) => Err(e.into()),
    }
}

完整的 Frame::check 函数可以在这里找到。我们将不涉及它的全部内容。

需要注意的是,Buf的 “字节迭代器” 风格的API被使用。这些获取数据并推进内部游标。例如,为了解析一个帧,第一个字节被检查以确定该帧的类型。使用的函数是 Buf::get_u8。这个函数获取当前光标位置的字节并使光标前进一个。

在Buf特性上还有更多有用的方法。查看API文档以了解更多细节。

带缓冲的写入

分帧API的另一半是 write_frame(frame) 函数。这个函数将整个帧写到套接字中。为了尽量减少 write 系统调用,写将被缓冲。一个写缓冲区被维护,帧在被写入套接字之前被编码到这个缓冲区。然而,与 read_frame() 不同的是,在写入套接字之前,整个帧并不总是被缓冲到一个字节数组中。

考虑一个批量流帧。被写入的值是 Frame::Bulk(Bytes)。散装帧的线格式是一个帧头,它由$字符和以字节为单位的数据长度组成。帧的大部分是Bytes值的内容。如果数据很大,把它复制到一个中间缓冲区将是很昂贵的。

为了实现缓冲写入,我们将使用 BufWriter 结构。这个结构被初始化为一个 T: AsyncWrite 并实现 AsyncWrite 本身。当在 BufWriter 上调用写时,写不会直接进入内部写器,而是进入一个缓冲区。当缓冲区满的时候,内容会被刷到内部写入器上,内部缓冲区被清空。还有一些优化措施,允许在某些情况下绕过缓冲区。

作为教程的一部分,我们将不尝试完整地实现 write_frame()。请看这里的完整实现。

首先,Connection结构被更新。

use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

接下来,write_frame()被实现:

use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    self.stream.flush().await;

    Ok(())
}

这里使用的函数是由 AsyncWriteExt 提供的。 它们在 TcpStream 上也是可用的,但是在没有中间缓冲区的情况下发出单字节的写入是不可取的。

  • write_u8 向写程序写一个字节。
  • write_all 将整个片断写入写入器。
  • write_decimal 是由 mini-redis 实现的。

该函数以调用 self.stream.flush().await 结束。因为 BufWriter 将写入的数据存储在一个中间缓冲区中,对写入的调用并不能保证数据被写入套接字。在返回之前,我们希望帧被写到套接字中。对 flush() 的调用将缓冲区中的任何数据写到套接字。

另一个选择是不在 write_frame() 中调用 flush()。相反,在 Connection 上提供一个 flush() 函数。这将允许调用者在写缓冲区中写入多个小帧的队列,然后用一个写系统调用将它们全部写入套接字。这样做会使 Connection API 复杂化。简化是 Mini-Redis 的目标之一,所以我们决定将 flush().await 调用包含在 fn write_frame() 中。