Rust标准库中的同步
- 1: Rust标准库中的同步介绍
- 2: Rust标准库中的原子类型
- 3: Rust标准库中的原子引用计数(Arc)
- 4: Rust标准库中的屏障(Barrier)
- 5: Rust标准库中的条件变量(Condvar)
- 6: Rust标准库中的多生产者单消费者队列(mpsc)
- 7: Rust标准库中的互斥(Mutex)
- 8: Rust标准库中的一次性(Once)
- 9: Rust标准库中的读写锁(RwLock)
1 - Rust标准库中的同步介绍
https://doc.rust-lang.org/std/sync/index.html
有用的同步原语。
同步化的需要
从概念上讲,Rust程序是指在计算机上执行的一系列操作。程序中发生事件的时间线与代码中的操作顺序是一致的。
考虑一下下面的代码,在一些全局静态变量上进行操作。
static mut A: u32 = 0;
static mut B: u32 = 0;
static mut C: u32 = 0;
fn main() {
unsafe {
A = 3;
B = 4;
A = A + B;
C = B;
println!("{} {} {}", A, B, C);
C = A;
}
}
看起来好像是改变了一些存储在内存中的变量,进行了一次加法,结果存储在A中,变量C被修改了两次。
当只涉及到一个线程时,结果和预期的一样:7 4 4 4行被打印出来。
至于幕后会发生什么,当启用优化后,最终生成的机器代码可能会和代码中的样子大不相同。
-
第一个存储到C的存储可能会被移到A或B之前,就像我们写的C = 4; A = 3; B = 4一样。
-
A+B的赋值可能会被删除,因为和可以存储在一个临时位置,直到它被打印出来,而全局变量永远不会被更新。
-
最终的结果可以通过在编译时查看代码来确定,所以恒定折叠可能会把整个代码块变成一个简单的println!(“7 4 4 4”)。
编译器可以执行这些优化的任意组合,只要最终优化后的代码在执行时,产生的结果与没有优化的代码相同。
由于现代计算机的并发性,对程序执行顺序的假设往往是错误的。对全局变量的访问可能会导致非确定性的结果,即使是在编译器优化被禁用的情况下,仍然有可能引入同步bug。
需要注意的是,由于Rust的安全保证,访问全局(静态)变量需要不安全的代码,假设我们没有使用本模块中的任何同步基元,那么访问全局(静态)变量就需要不安全的代码。
越级执行
由于各种原因,指令的执行顺序可能与我们定义的顺序不同。
-
编译器重排序:如果编译器可以在更早的时候发出指令,它就会尝试这样做。例如,它可能会在代码块的顶部吊起内存负载,这样CPU就可以开始从内存中预取值。
-
在单线程场景中,当编写信号处理程序或某些类型的低级代码时,这可能会引起问题。使用编译器栅栏来防止这种重新排序。
-
单个处理器执行指令的顺序失序。现代CPU能够超尺度执行,即可能同时执行多条指令,即使机器代码描述的是一个顺序过程。
这种重排序是由CPU透明地处理的。
- 一个多处理器系统同时执行多个硬件线程的情况。在多线程场景中,可以使用两种基元来处理同步问题。
- 内存栅栏,确保内存访问以正确的顺序被其他CPU看到。
- 二是原子操作,以确保同时访问同一内存位置不会导致非定义行为。
更高层次的同步对象
大多数低级的同步原语都相当容易出错,使用起来也不方便,这也是为什么标准库也暴露了一些更高级别的同步对象的原因。
这些抽象对象可以从低级原语中构建出来。为了提高效率,标准库中的同步对象通常是在操作系统内核的帮助下实现的,当线程在获取锁时被阻塞时,内核能够对线程进行重新安排。
下面是可用的同步对象的概述:
-
Arc:原子引用计数(Atomically Reference-Counted)指针,可以在多线程环境下使用,以延长某些数据的使用寿命,直到所有线程都使用完为止。
-
Barrier:屏障。确保多个线程相互等待对方到达程序中的某个点,然后再一起继续执行。
-
Condvar:条件变量,提供在等待事件发生时阻止一个线程的能力。
-
mpsc:多生产者,单消费(Multi-producer, single-consumer)队列,用于基于消息的通信。可以提供轻量级的线程间同步机制,代价是增加一些额外的内存。
-
互斥机制(Mutex)。互斥机制,确保每次最多只有一个线程能够访问一些数据。
-
Once:用于全局变量的线程安全的一次性初始化。
-
RwLock:用于全局变量的初始化。提供了一个相互排斥机制,允许多个读同时访问,同时一次只允许一个写。在某些情况下,这可能比mutex更有效。
2 - Rust标准库中的原子类型
https://doc.rust-lang.org/std/sync/atomic/index.html
原子类型
原子类型提供了线程之间的原始共享内存通信,也是其他并发类型的构件。
这个模块定义了一些精选的原始类型的原子版本,包括AtomicBool、AtomicIsize、AtomicUsize、AtomicI8、AtomicU16等。Atomic类型呈现的操作,正确使用时,可以在线程之间同步更新。
每个方法都取 Ordering
,它代表该操作的内存屏障强度。这些顺序与C++20的原子顺序相同。更多信息,请参见nomicon。
原子变量在线程之间共享是安全的(它们实现了Sync),但它们本身并没有提供共享的机制,遵循Rust的线程模型。共享原子变量最常见的方式是将其放入Arc(原子引用计数的共享指针)。
原子类型可以存储在静态变量中,使用像 AtomicBool:::new 这样的常量初始化器进行初始化。原子静态常量经常被用于延迟的全局初始化。
可移植性
这个模块中的所有原子类型都保证是无锁的,只要可用。这意味着它们不会在内部使用全局mutex。原子类型和操作不保证是无等待的。这意味着像 fetch_or 这样的操作可以通过比较和交换循环(compare-and-swap)来实现。
原子操作可以在指令层用较大尺寸的原子类型来实现。例如有些平台使用4字节的原子指令来实现AtomicI8。注意,这种模拟应该不会对代码的正确性产生影响,只是需要注意的地方。
这个模块中的原子类型可能并不是所有平台都能使用。但是,这里的原子类型都是广泛可用的,一般来说,可以依赖现有的原子类型。一些值得注意的例外是。
-
具有32位指针的PowerPC和MIPS平台没有AtomicU64或AtomicI64类型。
-
非Linux的ARM平台,如armv5te,根本就没有AtomicU64或AtomicI64类型。
-
采用thumbv6m的ARM目标完全没有原子操作。
需要注意的是,未来可能会加入同样不支持某些原子操作的平台。最大限度的可移植代码要注意使用哪些原子类型。一般来说,AtomicUsize和AtomicIsize是最可移植的,但即便如此,也不是到处都有。作为参考,std库需要指针大小的原子类型,虽然core不需要。
目前你需要使用#[[cfg(target_arch)]]主要是为了有条件地在代码中编译原子体。还有一个不稳定的#[cfg(target_has_atomic)]也是不稳定的,将来可能会稳定下来。
例子
一个简单的自旋锁:
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
fn main() {
let spinlock = Arc::new(AtomicUsize::new(1));
let spinlock_clone = spinlock.clone();
let thread = thread::spawn(move|| {
spinlock_clone.store(0, Ordering::SeqCst);
});
// Wait for the other thread to release the lock
while spinlock.load(Ordering::SeqCst) != 0 {}
if let Err(panic) = thread.join() {
println!("Thread had an error: {:?}", panic);
}
}
保持全局的活跃线程数:
use std::sync::atomic::{AtomicUsize, Ordering};
static GLOBAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
let old_thread_count = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::SeqCst);
println!("live threads: {}", old_thread_count + 1);
Ordering枚举
https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}
内存顺序指定了原子操作同步内存的方式。在其最弱的Relaxed中,只有操作直接接触到的内存才会被同步。另一方面,SeqCst操作的存储-加载对在同步其他内存的同时,还额外保留了所有线程中此类操作的总顺序。
Rust的内存顺序与C++20的内存顺序相同。
更多的信息请参见nomicon。
-
Relaxed
没有排序约束,只有原子操作。
对应于C++20中的 memory_order_relaxed。
-
Release
当与存储结合在一起时,所有之前的操作都会在加载该值的任何 Acquire(或更强的)排序之前变得有序。特别是,所有之前的所有写操作都会对执行该值的Acquire(或更强)加载的线程可见。
请注意,对结合了加载和存储的操作使用此命令会导致 Relaxed 加载操作!
这个命令只适用于可以执行存储的操作。
对应于C++20中的 memory_order_release。
-
Acquire
当与加载结合在一起时,如果加载的值是由具有Release(或更强的)排序的存储操作写入的,那么后续的所有操作都会在该存储之后成为排序。特别是,所有后续的加载操作都会看到在存储操作之前写入的数据。
请注意,对一个结合了加载和存储的操作使用这种排序会导致一个Relaxed存储操作!
这个命令只适用于可以执行加载的操作。
对应于C++20中的memory_order_acquire。
-
AcqRel
同时具有Acquire和Release的效果。对于负载,它使用的是Acquire命令。对于存储,它使用的是Release命令。
注意,在compare_and_swap的情况下,有可能操作最终没有执行任何存储,因此它只执行Acquire命令。然而,AcqRel永远不会执行Relaxed访问。
这种命令只适用于同时执行加载和存储的操作。
对应于C++20中的memory_order_acq_rel。
-
SeqCst
类似于Acquire/Release/AcqRel(分别用于加载、存储和加载与存储操作),额外保证了所有线程以相同的顺序查看所有顺序一致的操作。
对应于C++20中的memory_order_seq_cst。
compiler_fence函数
pub fn compiler_fence(order: Ordering)
编译器的内存围栏。
compiler_fence不发出任何机器代码,但限制了编译器允许做的内存重排序的种类。具体来说,根据给定的 Ordering 语义,编译器可能会被禁止从调用 compiler_fence 之前或之后的读或写移动到调用 compiler_fence 的另一端。注意,这并不妨碍硬件进行这种重新排序。这在单线程、执行上下文中不是问题,但当其他线程可能同时修改内存时,需要更强的同步基元(如fence)。
不同的排序语义所阻止的重排序是。
- 用SeqCst,不允许跨这个点的读和写的重新排序。
- 使用Release,前面的读和写不能越过后续的写。
- 使用Acquire,后续的读和写不能在前面的读之前移动。
- 使用AcqRel,以上两条规则都会被执行。
compiler_fence通常只对防止线程与自己赛跑有用。也就是说,如果一个给定的线程正在执行一段代码,然后被中断,并开始执行其他地方的代码(同时仍然在同一个线程中,并且在概念上仍然在同一个内核上)。在传统程序中,这种情况只有在注册了信号处理程序后才会出现。在更多的底层代码中,在处理中断时、实现绿色线程预置时等也会出现这种情况。鼓励好奇的读者阅读Linux内核中关于内存障碍的讨论。
例子:
如果没有 compiler_fence,下面的代码中的assert_eq!中的assert_eq!不能保证成功,尽管一切都发生在一个线程中。要了解原因,请记住,编译器可以自由地将存储空间调换成import_variable和is_read,因为它们都是Ording:::Relaxed。如果它这样做了,并且信号处理程序在IS_READY被更新后立即被调用,那么信号处理程序将看到IS_READY=1,但IMPORTANT_VARIABLE=0。 使用compiler_fence可以纠正这种情况。
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::Ordering;
use std::sync::atomic::compiler_fence;
static IMPORTANT_VARIABLE: AtomicUsize = AtomicUsize::new(0);
static IS_READY: AtomicBool = AtomicBool::new(false);
fn main() {
IMPORTANT_VARIABLE.store(42, Ordering::Relaxed);
// prevent earlier writes from being moved beyond this point
compiler_fence(Ordering::Release);
IS_READY.store(true, Ordering::Relaxed);
}
fn signal_handler() {
if IS_READY.load(Ordering::Relaxed) {
assert_eq!(IMPORTANT_VARIABLE.load(Ordering::Relaxed), 42);
}
}
fence函数
https://doc.rust-lang.org/std/sync/atomic/fn.fence.html
pub fn fence(order: Ordering)
一个原子栅栏。
根据指定的顺序,栅栏可以防止编译器和CPU围绕着它重新排序某些类型的内存操作。这就在它和其他线程中的原子操作或栅栏之间建立了同步关系。
一个具有(至少)释放排序语义的栅栏’A’与具有(至少)获得语义的栅栏’B’同步,如果且仅当存在操作X和Y,都在某些原子对象’M’上操作,那么A在X之前被排序,Y在B之前被同步,而Y观察到M的变化。
Thread 1 Thread 2
fence(Release); A --------------
x.store(3, Relaxed); X --------- |
| |
| |
-------------> Y if x.load(Relaxed) == 3 {
|-------> B fence(Acquire);
...
}
具有Release或Acquire语义的原子操作也可以与栅栏同步。
一个具有SeqCst排序的栅栏,除了具有Acquire和Release语义外,还可以参与其他SeqCst操作和/或栅栏的全局程序排序。
接受Acquire、Release、AcqRel和SeqCst命令。
例子:
use std::sync::atomic::AtomicBool;
use std::sync::atomic::fence;
use std::sync::atomic::Ordering;
// 基于自旋锁的互斥基元。
pub struct Mutex {
flag: AtomicBool,
}
impl Mutex {
pub fn new() -> Mutex {
Mutex {
flag: AtomicBool::new(false),
}
}
pub fn lock(&self) {
while !self.flag.compare_and_swap(false, true, Ordering::Relaxed) {}
// This fence synchronizes-with store in `unlock`.
fence(Ordering::Acquire);
}
pub fn unlock(&self) {
self.flag.store(false, Ordering::Release);
}
}
spin_loop_hint函数
https://doc.rust-lang.org/std/sync/atomic/fn.spin_loop_hint.html
pub fn spin_loop_hint()
向处理器发出信号,表示它处于忙等待的自旋循环(“spin lock”)中。
接收到自旋循环信号后,处理器可以通过节省功耗或切换超线程等方式优化自己的行为。
这个函数不同于 std::thread::yield_now,后者直接向系统的调度器输出,而spin_loop_hint不与操作系统交互。
spin_loop_hint的一个常见用例是在同步基元中实现CAS循环中的约束优化旋转。为了避免优先级反转等问题,强烈建议在有限的迭代次数后终止自旋循环,并进行适当的阻塞syscall。
注意:在不支持接收自旋循环提示的平台上,这个函数根本不做任何事情。
3 - Rust标准库中的原子引用计数(Arc)
https://doc.rust-lang.org/std/sync/struct.Arc.html
线程安全的引用计数指针。‘Arc’代表 “Atomically Reference Counted/原子引用计数”。
类型 Arc
Rust中的共享引用默认不允许改变,Arc也不例外:一般情况下,你无法获得Arc内部的东西的可变引用。如果你需要通过Arc进行改变,请使用Mutex、RwLock或Atomic类型。
线程安全
与 Rc
只要T实现了Send和Sync,ArcArc<RefCell<T>>
。RefCellArc<RefCell<T>
也会是Send。但这样一来,我们就有问题了。RefCell
最后,这意味着你可能需要将Arc
用 weak 打破循环
downgrade方法可以用来创建一个无主的Weak指针。Weak指针可以升级为Arc,但是如果存储在分配中的值已经被降级,则会返回None。换句话说,Weak指针不会使分配中的值保持活的,但是,它们会使分配(值的后备存储)保持活的。
Arc指针之间的循环永远不会被dealocated。基于这个原因,Weak被用来打破循环。例如,一棵树可以有从父节点到子节点的强Arc指针,而从子节点回到父节点的弱指针。
4 - Rust标准库中的屏障(Barrier)
https://doc.rust-lang.org/std/sync/struct.Barrier.html
Barrier/屏障可以让多个线程同步开始某些计算。
use std::sync::{Arc, Barrier};
use std::thread;
let mut handles = Vec::with_capacity(10);
let barrier = Arc::new(Barrier::new(10));
for _ in 0..10 {
let c = barrier.clone();
// The same messages will be printed together.
// You will NOT see any interleaving.
handles.push(thread::spawn(move|| {
println!("before wait");
c.wait();
println!("after wait");
}));
}
// Wait for other threads to finish.
for handle in handles {
handle.join().unwrap();
}
new 方法
use std::sync::Barrier;
let barrier = Barrier::new(10);
创建一个新的屏障,可以阻止给定数量的线程。
屏障将阻止n-1个线程调用等待,然后当第n个线程调用等待时,立即唤醒所有线程。
wait 方法
屏蔽当前线程,直到所有线程在这里会合。
Barrier在所有线程会合后可以重复使用,并且可以连续使用。
单个(任意)线程在从这个函数返回时,会收到一个 is_leader返回 true 的 BarrierWaitResult,其他所有线程都会收到is_leader返回false的结果。
5 - Rust标准库中的条件变量(Condvar)
https://doc.rust-lang.org/std/sync/struct.Condvar.html
条件变量
条件变量代表阻止线程的能力,使其在等待事件发生时不消耗CPU时间。条件变量通常与布尔谓词(一个条件/condition)和mutex关联。在确定线程必须阻止之前,该谓词总是在mutex内部进行验证。
这个模块中的函数将阻止当前线程的执行,并尽可能地绑定到系统提供的条件变量。注意,这个模块对系统条件变量有一个额外的限制:每个condvar在运行时只能使用一个mutex。任何试图在同一个条件变量上使用多个mutexes的行为都会导致运行时的恐慌。如果不希望这样,那么sys中的不安全基元就没有这个限制,但可能会导致未定义的行为。
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();
// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to start up.
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
wait-while方法
阻止当前线程,直到这个条件变量收到通知,并且提供的条件为false。
这个函数将原子化地解锁指定的mutex(用 guard 表示),并阻塞当前线程。这意味着,任何在mutex解锁后逻辑上发生的notify_one或notify_all的调用都是唤醒这个线程的候选函数。当这个函数调用返回时,指定的锁将被重新获得。
// Wait for the thread to start up.
let (lock, cvar) = &*pair;
// As long as the value inside the `Mutex<bool>` is `true`, we wait.
let _guard = cvar.wait_while(lock.lock().unwrap(), |pending| { *pending }).unwrap();
6 - Rust标准库中的多生产者单消费者队列(mpsc)
https://doc.rust-lang.org/std/sync/mpsc/index.html
多生产者、单消费者FIFO队列通信原语。
mpsc = Multiple Producer Single Consumer
该模块提供在通道(channel)上基于消息的通信,具体定义为三种类型。
- Sender
- SyncSender
- Receiver
Sender 或 SyncSender 用于向 Receiver 发送数据。这两个 sender 都是可克隆的(多生产者),这样,许多线程可以同时向一个 Receiver 发送数据(单消费者)。
这些通道(channel)有两种类型。
-
异步的、无限缓冲的通道(channel)。channel 函数将返回一个(Sender, Receiver)元组,其中所有的发送都是异步的(它们从不阻塞)。该通道在概念上有一个无限缓冲。
-
同步的、有边界的通道(channel)。sync_channel 函数将返回一个(SyncSender, Receiver) tuple,在这个函数中,等待消息的存储是一个预先分配的固定大小的缓冲区。所有的发送都将通过阻塞来同步,直到有可用的缓冲区空间。请注意,允许边界为0,这将使该通道成为一个 “会合 “通道,每个发送方都会将消息原子化地交给接收方。
断连
通道上的发送和接收操作都会返回一个结果,表示操作是否成功。一个不成功的操作通常表示一个通道的另一半通道被丢弃在相应的线程中而 “挂断”。
一旦一个通道的一半被deocallated,大多数操作就不能再继续进行,所以会返回Err。许多应用程序会继续解包这个模块返回的结果,如果其中一个线程意外死亡,就会在线程之间传播失败。
7 - Rust标准库中的互斥(Mutex)
https://doc.rust-lang.org/std/sync/struct.Mutex.html
用于保护共享数据的互斥原语
这个mutex将阻止等待锁可用的线程。mutex也可以被静态初始化或通过 new 构造函数创建。每个mutex都有一个 type 参数,表示它所保护的数据。这些数据只能通过 lock 和 try_lock 返回的 RAII 守护来访问,这保证了只有当mutex被锁定时,数据才会被访问。
毒化/Poisoning
这个模块中的 mutex 实现了一种叫做 " poisoning “的策略,每当持有 mutex 的线程恐慌时,就会认为 mutex 中毒。一旦 mutex 被毒化,所有其他线程都无法默认访问该数据,因为它很可能被污染了(某些不变性没有被维护)。
对于mutex来说,这意味着 lock 和 try_lock 方法会返回一个Result值,表示mutex是否被毒化。大多数使用mutex的方法都会简单地将这些结果 unwrap(),在线程之间传播恐慌,以确保不会出现可能无效的变量。
然而,被毒化的 mutex 并不会阻止对底层数据的所有访问。PoisonError 类型有一个 into_inner 方法,它将返回在成功锁定时返回的守护。这样,尽管锁被毒化了,但仍然可以访问数据。
new方法
在解锁状态下创建一个新的mutex,可以随时使用。
pub fn new(t: T) -> Mutex<T>
use std::sync::Mutex;
let mutex = Mutex::new(0);
lock方法
获取一个mutex,阻塞当前线程,直到它能够这样做。
这个函数将阻塞本地线程,直到它可以获取mutex为止。返回后,该线程是唯一一个持有锁的线程。返回一个RAII守护,允许范围化解锁。当保护罩超出范围时,mutex将被解锁。
在已经持有锁的线程中锁定一个mutex的确切行为没有说明。但是,这个函数在第二次调用时不会返回(例如,它可能会出现恐慌或死锁)。
try_lock方法
尝试获取此锁。
如果此时无法获得该锁,则返回Err。否则,将返回一个RAII护卫。当护卫被丢弃时,该锁将被解锁。
此功能不阻塞。
8 - Rust标准库中的一次性(Once)
https://doc.rust-lang.org/std/sync/struct.Once.html
一个同步原语,可用于运行一次性全局初始化。用于FFI或相关功能的一次性初始化。这种类型只能用 Once:::new 构造函数构造。
call_once 方法
执行初始化例程一次,并且只执行一次。如果这是 call_once 第一次被调用,给定的闭包将被执行,否则不会调用该例程。
如果当前有另一个初始化例程正在运行,该方法将阻止调用的线程。
当这个函数返回时,保证一些初始化已经运行并完成(可能不是指定的闭包)。它还保证由执行的闭包所执行的任何内存写入都能被其他线程在这时可靠地观察到(闭包和返回后执行的代码之间存在着发生前的关系)。
如果给定的闭包在同一个 Once 实例上递归调用 call_once,则没有指定确切的行为,允许的结果是恐慌或死锁。
9 - Rust标准库中的读写锁(RwLock)
https://doc.rust-lang.org/std/sync/struct.RwLock.html
读写锁
这种类型的锁允许在任何时间点上有多个读或最多一个写。这种锁的写部分通常允许修改底层数据(独占访问),而读部分通常允许只读访问(共享访问)。
相比之下,Mutex不区分获取该锁的读写,因此会阻止任何等待该锁可用的线程。RwLock将允许任何数量的读获取该锁,只要一个写不持有该锁。
锁的优先级策略取决于底层操作系统的实现,这种类型并不保证会使用任何特定的策略。
类型参数T代表这个锁所保护的数据。它要求T满足Send满足跨线程共享和满足Sync以便通过reader并发访问。从锁方法中返回的RAII守护实现了Deref(和写方法的DerefMut),允许访问锁的内容。
毒化
像Mutex一样,RwLock会在恐慌时中毒。但是,请注意,只有在RwLock被完全锁定时(写模式)发生恐慌时,RwLock才会中毒。如果恐慌发生在任何读卡器中,那么该锁将不会中毒。