这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Tokio技术栈

介绍Tokio的技术栈:Runtime/Bytes/Mio/Tower/Tonic/Hyper/Tracing

1 - Tokio Runtime

Tokio技术栈之Runtime

2 - Tokio Bytes

Tokio技术栈之Bytes

2.1 - Tokio Bytes概述

Tokio Bytes概述

crate 介绍

Tokio Bytes crate 为处理 byte 的工作提供抽象。

bytes crate 提供了一个高效的字节缓冲结构(Bytes)和用于处理缓 buffer 实现的特征(Buf, BufMut)。

资料:

Bytes

Bytes 是一个高效的容器,用于存储和操作连续的内存片段。它主要用于网络代码中,但也可以应用于其他地方。

Bytes 通过允许多个 Bytes 对象指向相同的底层内存来促进零拷贝(zero-copy)网络编程。这是通过使用一个引用计数来跟踪内存何时不再需要并可以被释放来进行管理的。

Bytes 句柄可以直接从现有的字节存储(如 &[u8]Vec<u8> )中创建,但通常先使用 BytesMut 并写入。比如说。

use bytes::{BytesMut, BufMut};

let mut buf = BytesMut::with_capacity(1024);
buf.put(&b"hello world"[..]);
buf.put_u16(1234);

let a = buf.split();
assert_eq!(a, b"hello world\x04\xD2"[..]);

buf.put(&b"goodbye world"[..]);

let b = buf.split();
assert_eq!(b, b"goodbye world"[..]);

assert_eq!(buf.capacity(), 998);

在上面的例子中,只分配了一个 1024 的缓冲区。句柄 a 和 b 将共享底层的 buffer,并维护跟踪进入句柄所代表的缓冲区的视图的索引。

Buf, BufMut

这两个特征提供了对缓冲区的读和写访问。底层存储可能是也可能不是连续的内存。例如,Bytes 是一个保证连续内存的缓冲区,但是 rope 将字节存储在不连续的块中。Buf 和 BufMut 维护跟踪底层字节存储的当前位置的游标。当字节被读取或写入时,游标被推进。

读和写的关系

乍一看,BufBufMut 似乎与 std::io::Readstd::io::Write 在功能上有重合。然而,它们有不同的用途。buffer 是作为参数提供给 Read::readWrite::write 的值。然后,ReadWrite可能会执行一个系统调用,这有可能会失败。对BufBufMut的操作是不是失败的。

crate的内容

结构体:

  • Bytes :廉价的可克隆和可切分的连续内存块。
  • BytesMut: 对连续的内存片的唯一引用。

模块:

  • buf: 用于处理缓冲区的实用程序。

trait:

  • Buf : 从 buffer 中读取字节。
  • BufMut : 一种用于提供对 bytes 连续写访问的特性

2.2 - Bytes 结构体

Tokio Bytes 结构体,廉价的可克隆和可切分的连续内存块。

https://docs.rs/bytes/1.0.1/bytes/struct.Bytes.html

文档

介绍

廉价的可克隆和可切分的连续内存块。

Bytes 是一个高效的容器,用于存储和操作连续的内存片段。它主要用于网络代码中,但也可以应用于其他地方。

通过允许多个 Bytes 对象指向相同的底层内存,Bytes 有助于零拷贝(zero-copy )网络编程。

Bytes 没有单一的实现。它是一个接口,其确切的行为是通过 Bytes 的几个底层实现中的动态调度来实现的。

所有 Bytes 的实现必须满足以下要求:

  • 它们是廉价的可克隆的,因此可以在无限多的组件之间共享,例如通过修改引用计数。
  • 实例可以被切分以引用原始缓冲区的一个子集。
use bytes::Bytes;

let mut mem = Bytes::from("Hello world");
let a = mem.slice(0..5);

assert_eq!(a, "Hello");

let b = mem.split_to(6);

assert_eq!(mem, "world");
assert_eq!(b, "Hello ");

内存布局

Bytes 结构本身相当小,仅限于4个usize字段,用于跟踪 Bytes 句柄可以访问的底层内存段的信息。

Bytes 同时保留了一个指向包含完整内存片的共享状态的指针和一个指向手柄可见区域的起始点的指针。字节还跟踪其进入内存的长度。

共享

Bytes 包含一个vtable,它允许 Bytes 的实现者详细定义共享/克隆的实现方式。当 Bytes::clone() 被调用时,Bytes 将调用 vtable 函数来克隆后备存储,以便在多个Bytes实例之间共享它。

对于指向恒定内存的 Bytes 实现(例如,通过 Bytes::from_static() 创建),克隆实现将是无消耗的。

对于指向引用计数的共享存储的字节实现(例如 Arc<[u8]>),共享将通过增加引用计数来实现。

由于这种机制,多个 Bytes 实例可以指向同一个共享内存区域。每个 Bytes 实例可以指向该内存区域中的不同部分,而且 Bytes 实例可能有也可能没有对内存的重叠视图。

下图直观地展示了这样一个场景:2个 Bytes 实例利用基于 Arc 的后端存储,并提供对不同视图的访问。

   Arc ptrs                   +---------+
   ________________________ / | Bytes 2 |
  /                           +---------+
 /          +-----------+     |         |
|_________/ |  Bytes 1  |     |         |
|           +-----------+     |         |
|           |           | ___/ data     | tail
|      data |      tail |/              |
v           v           v               v
+-----+---------------------------------+-----+
| Arc |     |           |               |     |
+-----+---------------------------------+-----+

源码

结构体定义

pub struct Bytes {
    ptr: *const u8,
    len: usize,
    // inlined "trait object"
    data: AtomicPtr<()>,
    vtable: &'static Vtable,
}

pub(crate) struct Vtable {
    /// fn(data, ptr, len)
    pub clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Bytes,
    /// fn(data, ptr, len)
    pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}

构造方法

Bytes::new() 方法创建一个新的空Bytes。这将不会分配,返回的字节句柄将是空的。

pub const fn new() -> Bytes {
    // Make it a named const to work around
    // "unsizing casts are not allowed in const fn"
    const EMPTY: &[u8] = &[];
    Bytes::from_static(EMPTY)
}

Bytes::from_static 从一个静态片断创建一个新的Bytes。返回的 Bytes 将直接指向静态片断。无需分配或复制。

pub const fn from_static(bytes: &'static [u8]) -> Bytes {
    Bytes {
        ptr: bytes.as_ptr(),
        len: bytes.len(),
        data: AtomicPtr::new(ptr::null_mut()),
        vtable: &STATIC_VTABLE,
    }
}

Bytes::copy_from_slice 通过复制,从 slice 中创建 Bytes 实例。

pub fn copy_from_slice(data: &[u8]) -> Self {
    data.to_vec().into()
}

slice()

slice()

返回所提供范围内的自己的一个片断。

这将增加底层内存的引用计数,并返回一个新的字节句柄,设置为该分片。

这个操作是O(1)。

use bytes::Bytes;

let a = Bytes::from(&b"hello world"[..]);
let b = a.slice(2..5);

assert_eq!(&b[..], b"llo");

代码实现:

pub fn slice(&self, range: impl RangeBounds<usize>) -> Bytes {
    use core::ops::Bound;

    let len = self.len();

    let begin = match range.start_bound() {
        Bound::Included(&n) => n,
        Bound::Excluded(&n) => n + 1,
        Bound::Unbounded => 0,
    };

    let end = match range.end_bound() {
        Bound::Included(&n) => n.checked_add(1).expect("out of range"),
        Bound::Excluded(&n) => n,
        Bound::Unbounded => len,
    };

    assert!(
        begin <= end,
        "range start must not be greater than end: {:?} <= {:?}",
        begin,
        end,
    );
    assert!(
        end <= len,
        "range end out of bounds: {:?} <= {:?}",
        end,
        len,
    );

    if end == begin {
        return Bytes::new();
    }

    // clone一个新 Bytes
    let mut ret = self.clone();

    // 新 Bytes 的长度为 end - begin
    ret.len = end - begin;
    // 新 Bytes 的 ptr 指向原来的 prt + 偏移量begin
    ret.ptr = unsafe { ret.ptr.offset(begin as isize) };

    ret
}

slice_ref()

返回一个相当于给定子集的自我片断。

当用其他工具处理 Bytes 缓冲区时,通常会得到一个 &[u8],它实际上是 Bytes 的一个slice,也就是它的一个子集。这个函数将 &[u8] 变成另一个 Bytes,就像我们调用 self.slice() 的偏移量来对应子集一样。

这个操作是O(1)。

use bytes::Bytes;

let bytes = Bytes::from(&b"012345678"[..]);
let as_slice = bytes.as_ref();
let subset = &as_slice[2..6];
let subslice = bytes.slice_ref(&subset);
assert_eq!(&subslice[..], b"2345");

代码实现:

pub fn slice_ref(&self, subset: &[u8]) -> Bytes {
    // Empty slice and empty Bytes may have their pointers reset
    // so explicitly allow empty slice to be a subslice of any slice.
    if subset.is_empty() {
        return Bytes::new();
    }

    let bytes_p = self.as_ptr() as usize;
    let bytes_len = self.len();

    let sub_p = subset.as_ptr() as usize;
    let sub_len = subset.len();

    assert!(
        sub_p >= bytes_p,
        "subset pointer ({:p}) is smaller than self pointer ({:p})",
        sub_p as *const u8,
        bytes_p as *const u8,
    );
    assert!(
        sub_p + sub_len <= bytes_p + bytes_len,
        "subset is out of bounds: self = ({:p}, {}), subset = ({:p}, {})",
        bytes_p as *const u8,
        bytes_len,
        sub_p as *const u8,
        sub_len,
    );

    let sub_offset = sub_p - bytes_p;

    self.slice(sub_offset..(sub_offset + sub_len))
}

split()

split_off()

在给定的索引处将字节分成两个。

之后self包含元素 [0, at),而返回的Bytes包含元素 [at, len)

这是一个O(1)操作,只是增加了引用计数并设置了一些索引。

use bytes::Bytes;

let mut a = Bytes::from(&b"hello world"[..]);
let b = a.split_off(5);

assert_eq!(&a[..], b"hello");
assert_eq!(&b[..], b" world");

代码实现:

pub fn split_off(&mut self, at: usize) -> Bytes {
    assert!(
        at <= self.len(),
        "split_off out of bounds: {:?} <= {:?}",
        at,
        self.len(),
    );

    if at == self.len() {
        return Bytes::new();
    }

    if at == 0 {
        return mem::replace(self, Bytes::new());
    }

    let mut ret = self.clone();

    self.len = at;

    unsafe { ret.inc_start(at) };

    ret
}

split_to()

在给定的索引处将字节分成两个。

之后self包含元素 [at, len),而返回的 Bytes包含元素 [0, at)

这是一个O(1)操作,只是增加了引用计数和设置了一些索引。

use bytes::Bytes;

let mut a = Bytes::from(&b"hello world"[..]);
let b = a.split_to(5);

assert_eq!(&a[..], b" world");
assert_eq!(&b[..], b"hello");

代码实现:

pub fn split_to(&mut self, at: usize) -> Bytes {
    assert!(
        at <= self.len(),
        "split_to out of bounds: {:?} <= {:?}",
        at,
        self.len(),
    );

    if at == self.len() {
        return mem::replace(self, Bytes::new());
    }

    if at == 0 {
        return Bytes::new();
    }

    let mut ret = self.clone();

    unsafe { self.inc_start(at) };

    ret.len = at;
    ret
}

truncate()

缩短缓冲区的长度,保留第一个len字节,放弃其余的字节。

如果len大于缓冲区的当前长度,则没有影响。

split_off 方法可以模拟Truncate,但是这导致多余的字节被返回而不是丢弃。

use bytes::Bytes;

let mut buf = Bytes::from(&b"hello world"[..]);
buf.truncate(5);
assert_eq!(buf, b"hello"[..]);

代码实现:

pub fn truncate(&mut self, len: usize) {
    if len < self.len {
        // The Vec "promotable" vtables do not store the capacity,
        // so we cannot truncate while using this repr. We *have* to
        // promote using `split_off` so the capacity can be stored.
        if self.vtable as *const Vtable == &PROMOTABLE_EVEN_VTABLE
            || self.vtable as *const Vtable == &PROMOTABLE_ODD_VTABLE
        {
            drop(self.split_off(len));
        } else {
            self.len = len;
        }
    }
}

clear()

清除缓冲区,删除所有数据。

use bytes::Bytes;

let mut buf = Bytes::from(&b"hello world"[..]);
buf.clear();
assert!(buf.is_empty());

代码实现:

pub fn clear(&mut self) {
    self.truncate(0);
}

2.3 - BytesMul 结构体

Tokio BytesMul 结构体,对连续的内存片段的唯一引用

https://docs.rs/bytes/1.0.1/bytes/struct.BytesMut.html

文档

介绍

对连续的内存片段的唯一引用。

BytesMut 代表了对一个潜在的共享内存区域的唯一视图。鉴于唯一性的保证,BytesMut句柄的拥有者能够改变内存。

BytesMut 可以被认为是包含一个 Buf:Arc<Vec<u8>>,一个Buf偏移量,一个分片长度,以及一个保证,即同一 Buf 的其他 BytesMut 不会与它的分片重叠。这个保证意味着不需要写锁。

增长: BytesMut 的 BufMut 实现将在必要时隐式地增长其缓冲区。然而,在一系列插入之前明确地预留所需的空间会更有效率。

use bytes::{BytesMut, BufMut};

let mut buf = BytesMut::with_capacity(64);

buf.put_u8(b'h');
buf.put_u8(b'e');
buf.put(&b"llo"[..]);

assert_eq!(&buf[..], b"hello");

// Freeze the buffer so that it can be shared
let a = buf.freeze();

// This does not allocate, instead `b` points to the same memory.
let b = a.clone();

assert_eq!(&a[..], b"hello");
assert_eq!(&b[..], b"hello");

源码

结构体定义

pub struct BytesMut {
    ptr: NonNull<u8>,
    len: usize,
    cap: usize,
    data: *mut Shared,
}

构造函数

with_capacity()

创建一个具有指定容量的新的BytesMut。

返回的BytesMut将能够容纳至少容量的字节而不需要重新分配。

值得注意的是,这个函数没有指定返回的BytesMut的长度,而只是指定了容量。

use bytes::{BytesMut, BufMut};

let mut bytes = BytesMut::with_capacity(64);

// `bytes` contains no data, even though there is capacity
assert_eq!(bytes.len(), 0);

bytes.put(&b"hello world"[..]);

assert_eq!(&bytes[..], b"hello world");

代码实现:

pub fn with_capacity(capacity: usize) -> BytesMut {
    BytesMut::from_vec(Vec::with_capacity(capacity))
}

pub(crate) fn from_vec(mut vec: Vec<u8>) -> BytesMut {
  let ptr = vptr(vec.as_mut_ptr());
  let len = vec.len();
  let cap = vec.capacity();
  mem::forget(vec);

  let original_capacity_repr = original_capacity_to_repr(cap);
  let data = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC;

  BytesMut {
    ptr,
    len,
    cap,
    data: data as *mut _,
  }
}

new()

创建一个新的具有默认容量的BytesMut。

结果对象的长度为0,容量未指定。该函数不进行分配。

use bytes::{BytesMut, BufMut};

let mut bytes = BytesMut::new();

assert_eq!(0, bytes.len());

bytes.reserve(2);
bytes.put_slice(b"xy");

assert_eq!(&b"xy"[..], &bytes[..]);

代码实现:

pub fn new() -> BytesMut {
    // 直接 capacity = 0
    BytesMut::with_capacity(0)
}

freeze()

将自己转换为一个不可变的 Bytes。

这个转换是零成本的,用来表示由句柄引用的片断将不再被改变。一旦转换完成,句柄可以被克隆并在线程间共享。

use bytes::{BytesMut, BufMut};
use std::thread;

let mut b = BytesMut::with_capacity(64);
b.put(&b"hello world"[..]);
let b1 = b.freeze();
let b2 = b1.clone();

let th = thread::spawn(move || {
    assert_eq!(&b1[..], b"hello world");
});

assert_eq!(&b2[..], b"hello world");
th.join().unwrap();

split()

split_off()

在给定的索引处将字节分成两个。

之后 self包含元素 [0, at),而返回的 BytesMut 包含元素 [at, capacity)

这是一个O(1)操作,只是增加了引用计数并设置了一些索引。

use bytes::BytesMut;

let mut a = BytesMut::from(&b"hello world"[..]);
let mut b = a.split_off(5);

a[0] = b'j';
b[0] = b'!';

assert_eq!(&a[..], b"jello");
assert_eq!(&b[..], b"!world");

代码实现:

pub fn split_off(&mut self, at: usize) -> BytesMut {
    assert!(
        at <= self.capacity(),
        "split_off out of bounds: {:?} <= {:?}",
        at,
        self.capacity(),
    );
    unsafe {
        let mut other = self.shallow_clone();
        other.set_start(at);
        self.set_end(at);
        other
    }
}

split()

移除当前视图中的字节,在一个新的 BytesMut 句柄中返回。

之后,self 将是空的,但会保留操作前的任何额外容量。这与 self.split_to(self.len()) 是相同的。

这是一个O(1)操作,只是增加了引用计数并设置了一些索引。

use bytes::{BytesMut, BufMut};

let mut buf = BytesMut::with_capacity(1024);
buf.put(&b"hello world"[..]);

let other = buf.split();

assert!(buf.is_empty());
assert_eq!(1013, buf.capacity());

assert_eq!(other, b"hello world"[..]);

代码实现:

pub fn split(&mut self) -> BytesMut {
    let len = self.len();
    self.split_to(len)
}

split_to()

在给定的索引处将缓冲区一分为二。

之后self包含元素 [at, len),而返回的 BytesMut 包含元素 [0, at)

这是一个O(1)操作,只是增加了引用计数和设置了一些索引。

use bytes::BytesMut;

let mut a = BytesMut::from(&b"hello world"[..]);
let mut b = a.split_to(5);

a[0] = b'!';
b[0] = b'j';

assert_eq!(&a[..], b"!world");
assert_eq!(&b[..], b"jello");

代码实现:

pub fn split_to(&mut self, at: usize) -> BytesMut {
    assert!(
        at <= self.len(),
        "split_to out of bounds: {:?} <= {:?}",
        at,
        self.len(),
    );

    unsafe {
        let mut other = self.shallow_clone();
        other.set_end(at);
        self.set_start(at);
        other
    }
}

truncate()

缩短缓冲区的长度,保留第一个 len 字节,放弃其余的字节。

如果 len 大于缓冲区的当前长度,则没有影响。

split_off 方法可以模拟 Truncate,但是这导致多余的字节被返回而不是丢弃。

use bytes::BytesMut;

let mut buf = BytesMut::from(&b"hello world"[..]);
buf.truncate(5);
assert_eq!(buf, b"hello"[..]);

代码实现:

pub fn truncate(&mut self, len: usize) {
    if len <= self.len() {
        unsafe {
            self.set_len(len);
        }
    }
}

pub unsafe fn set_len(&mut self, len: usize) {
  debug_assert!(len <= self.cap, "set_len out of bounds");
  self.len = len;
}

2.4 - trait Buf

Tokio trait Buf,从缓冲区读取字节。

https://docs.rs/bytes/1.0.1/bytes/trait.Buf.html

文档

介绍

从缓冲区读取字节。

缓冲区将字节存储在内存中,这样的读取操作是无懈可击的。底层存储可能是也可能不是在连续的内存中。一个Buf值是一个进入缓冲区的光标。从Buf中读出的数据会使游标位置前进。它可以被认为是一个有效的字节集合的Iterator。

最简单的Buf是 &[u8]

use bytes::Buf;

let mut buf = &b"hello world"[..];

assert_eq!(b'h', buf.get_u8());
assert_eq!(b'e', buf.get_u8());
assert_eq!(b'l', buf.get_u8());

let mut rest = [0; 8];
buf.copy_to_slice(&mut rest);

assert_eq!(&rest[..], &b"lo world"[..]);

源码

remaining()

返回当前位置和缓冲区末端之间的字节数。

这个值大于或等于 chunk() 返回的 slice 的长度。

use bytes::Buf;

let mut buf = &b"hello world"[..];

assert_eq!(buf.remaining(), 11);

buf.get_u8();

assert_eq!(buf.remaining(), 10);

实现注意: remaining的实现应该确保返回值不发生变化,除非调用 advance 或其他任何有记录的函数来改变Buf的当前位置。

方法定义为:

fn remaining(&self) -> usize;

chunk()

返回一个从当前位置开始,长度在 0 和 Buf::remaining() 之间的片断。注意,这可以返回更短的片断(这允许非连续的内部表示)。

这是一个较低级别的函数。大多数操作是由其他函数完成的。

use bytes::Buf;

let mut buf = &b"hello world"[..];

assert_eq!(buf.chunk(), &b"hello world"[..]);

buf.advance(6);

assert_eq!(buf.chunk(), &b"world"[..]);

实现说明:

这个函数不应该panic。一旦到达缓冲区的末端,即 Buf::remaining 返回0,对 chunk() 的调用应该返回一个空片段。

方法定义为:

fn chunk(&self) -> &[u8];

advance()

推进 Buf 的内部游标。

下一次对 chunk() 的调用将返回一个开始于底层缓冲区的 cnt 个字节的分片。

use bytes::Buf;

let mut buf = &b"hello world"[..];

assert_eq!(buf.chunk(), &b"hello world"[..]);

buf.advance(6);

assert_eq!(buf.chunk(), &b"world"[..]);

实现须知:

建议 advance 的实现在 cnt > self.relative() 的情况下进行 panic。如果实现没有panic,调用必须表现为 cnt == self.restaining()

一个 cnt == 0 的调用不应该panic,而应该是 no-op。

方法定义为:

fn advance(&mut self, cnt: usize);

chunks_vectored()

从自己的当前位置开始,用潜在的多个片断填充dst。

如果 Buf 是由不相干的字节片断支持的,chunk_vectored 可以一次获取多个片断。 dst是一个 IoSlice 的引用,使得该片可以直接被 writev 使用,而不需要进一步转换。dst 中所有缓冲区的长度之和将小于或等于 Buf::reserved()

dst 中的条目将被覆盖,但分片所包含的数据不会被修改。如果 chunk_vectored 没有填满dst中的每一个条目,那么dst将保证包含 ``self`中的所有剩余片断。

这是一个较低级别的函数。大多数操作是由其他函数完成的。

实现说明:

这个函数不应该panic。一旦到达缓冲区的末端,即 Buf::remaining 返回0,对 chunk_vectored 的调用必须返回0而不改变dst。

实现者还应该注意正确处理dst为零长度片断时的调用。

代码实现为:

fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
    if dst.is_empty() {
        return 0;
    }

    if self.has_remaining() {
        dst[0] = IoSlice::new(self.chunk());
        1
    } else {
        0
    }
}

remaining()

如果还有更多的字节需要消耗,则返回true。

这等同于 self.remaining() != 0

use bytes::Buf;

let mut buf = &b"a"[..];

assert!(buf.has_remaining());

buf.get_u8();

assert!(!buf.has_remaining());

代码实现:

fn has_remaining(&self) -> bool {
    self.remaining() > 0
}

copy_to_slice()

pub fn copy_to_slice(&mut self, dst: &mut [u8])

从 self 复制字节到 dst。

光标按复制的字节数前进,self 必须有足够的剩余字节来填充dst。

use bytes::Buf;

let mut buf = &b"hello world"[..];
let mut dst = [0; 5];

buf.copy_to_slice(&mut dst);
assert_eq!(&b"hello"[..], &dst);
assert_eq!(6, buf.remaining());

代码实现:

fn copy_to_slice(&mut self, dst: &mut [u8]) {
    let mut off = 0;

    assert!(self.remaining() >= dst.len());

    while off < dst.len() {
        let cnt;

        unsafe {
            let src = self.chunk();
            cnt = cmp::min(src.len(), dst.len() - off);

            ptr::copy_nonoverlapping(src.as_ptr(), dst[off..].as_mut_ptr(), cnt);

            off += cnt;
        }

        self.advance(cnt);
    }
}

get_u8()

pub fn get_u8(&mut self) -> u8

从自身获取一个无符号的8位整数。

当前位置推进 1。

代码实现:

fn get_u8(&mut self) -> u8 {
    assert!(self.remaining() >= 1);
    let ret = self.chunk()[0];
    self.advance(1);
    ret
}

get_i8()

pub fn get_i8(&mut self) -> i8

从自身获取一个有符号的8位整数。

当前位置推进 1。

copy_to_bytes()

pub fn copy_to_bytes(&mut self, len: usize) -> Bytes

消耗 self 内部的 len 个字节,并返回带有这些数据的新 Bytes 实例。

这个函数可能被底层类型优化以避免实际的拷贝。例如,Bytes 的实现会做一个浅层拷贝(ref-count增量)。

use bytes::Buf;

let bytes = (&b"hello world"[..]).copy_to_bytes(5);
assert_eq!(&bytes[..], &b"hello"[..]);

代码实现为:

fn copy_to_bytes(&mut self, len: usize) -> crate::Bytes {
    use super::BufMut;

    assert!(len <= self.remaining(), "`len` greater than remaining");

    let mut ret = crate::BytesMut::with_capacity(len);
    ret.put(self.take(len));
    ret.freeze()
}

take()

pub fn take(self, limit: usize) -> Take<Self>

创建一个适配器,它将从 self 读取最多 limit 个字节。

这个函数返回新的 Buf 实例,它将最多读取 limit 个字节。

use bytes::{Buf, BufMut};

let mut buf = b"hello world"[..].take(5);
let mut dst = vec![];

dst.put(&mut buf);
assert_eq!(dst, b"hello");

let mut buf = buf.into_inner();
dst.clear();
dst.put(&mut buf);
assert_eq!(dst, b" world");

代码实现为:

fn take(self, limit: usize) -> Take<Self>
where
    Self: Sized,
{
    take::new(self, limit)
}

chain()

pub fn chain<U: Buf>(self, next: U) -> Chain<Self, U>

创建一个适配器,将这个缓冲区与另一个缓冲区连接起来。

返回的 Buf 实例将首先消耗来自 self 的所有字节。之后的输出相当于Next的输出。

use bytes::Buf;

let mut chain = b"hello "[..].chain(&b"world"[..]);

let full = chain.copy_to_bytes(11);
assert_eq!(full.chunk(), b"hello world");

3 - Tokio Mio

Tokio技术栈之Mio

4 - Tokio Tower

Tokio技术栈之Tower

5 - Tokio Hyper

Tokio技术栈之Hyper

6 - Tokio Tonic

Tokio技术栈之Tonic

7 - Tokio Tracing

Tokio技术栈之Tracing