Crust of Rust - Atomics and Memory Ordering
原子操作涉及到内存序,非常底层,比较难以理解。
Rust 的官方文档 std::sync::atomic - Rust 在较高的抽象等级上描述的不错。
什么是原子类型
Rust 有很多 primitive 的原子类型,那么为什么需要他们?
首先最重要的是,原子类型变量可以跨线程边界共享;其次他们的类型需要跟基础的类型有一些区分,因为这些类型实际上对应了 CPU 的不同指令。
核心是如果你需要共享的访问内存中的变量,你需要一些类似于其他的信息,让 CPU 知道什么时候一些线程可以看到另外一些线程的行为;他们如何同步。
原子类型允许你在使用它们时设置一些规则,决定变量的值什么时候会可见。
内存模型
谈到“规则”时指的实际上是语言的“内存模型”,不过 Rust 现在还没有明确完整的内存模型…
不过 Rust 依赖 LLVM,所以 Rust 基本上是遵守 C 的内存模型。
内存序和 C++20 的是一样的,可以见 std::memory_order - cppreference.com
AtomicUsize
AtomicUsize in std::sync::atomic - Rust 这是 usize 的原子版本。你需要使用他的方法才能访问内部的值。
如果你跨线程共享变量,通常你应该选择使用 Arc 来实现共享所有权。
还有一个比较重要的区别是,**使用普通的引用也可以对其内部的值进行修改,例如 store 接受的就是普通引用。**这在普通变量上明显是不行的。
内部使用
UnsafeCell提供“内部可变性”。
我们看他内部包含的方法,首先是:
pub fn load(&self, order: Ordering) -> usize;
pub fn store(&self, val: usize, order: Ordering);
pub fn swap(&self, val: usize, order: Ordering) -> usize;
你会注意到参数中都包含一个 order: Ordering,它是一个枚举,定义:
#[non_exhaustive]
pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}
之后讲述的内容很大部分都是在讲内存序的,总之它的作用是告诉编译器你期望的访问内存时的顺序保证。
注意这里有个
non_exhaustive,有这个注解你就需要在 match 时永远增加一个_分支。
之后还有三个:
pub fn compare_and_swap(
&self,
current: usize,
new: usize,
order: Ordering,
) -> usize;
pub fn compare_exchange(
&self,
current: usize,
new: usize,
success: Ordering,
failure: Ordering,
) -> Result<usize, usize>;
pub fn compare_exchange_weak(
&self,
current: usize,
new: usize,
success: Ordering,
failure: Ordering,
) -> Result<usize, usize>;
之后会具体讲他们的区别。
总之他们的行为都是:有条件的、原子地读取值,然后修改值。
什么是 原子地 ?例如你先读取再存储,可能有其他线程在两步操作中间进行一些操作;而原子操作保证这两步操作之间没有其他线程介入。
此外还有一大堆 fetch_* 操作。
在一些平台上,比如 x86,任何访问 64 位变量的操作都是原子的,所以理论上你如果你写汇编,那你可以不借助原子类型就完成原子操作。
所有原子操作都是 lock-free,当然不代表他们肯定是 wait-free。
原子操作会改变 CPU 的行为,同时也对编译器的行为有影响。
实现一个 bad mutex
接下来我们自己实现一个简单的 spin lock
你永远不应该在实践中使用 spin lock,除非你知道你在干什么(当然大部分情况下你并不知道)
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
const LOCKED: bool = true;
const UNLOCKED: bool = false;
pub struct Mutex<T> {
v: UnsafeCell<T>,
locked: AtomicBool,
}
impl<T> Mutex<T> {
pub fn new(v: T) -> Self {
Self {
v: UnsafeCell::new(v),
locked: AtomicBool::new(UNLOCKED),
}
}
}
我们该如何上锁?
impl<T> Mutex<T> {
pub fn with_lock<R>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
while self.locked.load() != UNLOCKED {}
self.locked.store(LOCKED);
let ret = unsafe { f(&mut *self.v.get()) };
self.locked.store(UNLOCKED);
ret
}
}
最最基础的实现长这样,之后,我们需要指定内存序。那我们需要什么样的内存序呢?我们可以先放一个 relax 让其可以编译。
pub fn with_lock<R>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
while self.locked.load(Ordering::Relaxed) != UNLOCKED {}
self.locked.store(LOCKED, Ordering::Relaxed);
let ret = unsafe { f(&mut *self.v.get()) };
self.locked.store(UNLOCKED, Ordering::Relaxed);
ret
}
以上的代码实际上也有问题,因为我们的 load 和 store 分开了两条指令。只不过实际上发生的可能性比较低而已。
pub fn with_lock<R>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
while self.locked.load(Ordering::Relaxed) != UNLOCKED {}
// 可能有多个 thread 走到这里
self.locked.store(LOCKED, Ordering::Relaxed);
let ret = unsafe { f(&mut *self.v.get()) };
self.locked.store(UNLOCKED, Ordering::Relaxed);
ret
}
compare_exchange
首先 compare_and_swap 已经被废弃了,你应该使用 compare_exchange / compare_exchange_weak,因为这两个方法允许你分别指定操作成功/失败时的内存序。
compare_exchange 首先需要接受一个当前值的预期,之后如果预期的值和当前实际的值相同,那就会更新他的值为新的值。
之后,不管操作执行成功还是失败,都会携带当前的值返回。
我们可以修改一下代码,内存序仍然使用 relaxed 来占位:
pub fn with_lock<R>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
while self
.locked
.compare_exchange(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
// MESI Protocol
while self.locked.load(Ordering::Relaxed) == LOCKED {}
}
let ret = unsafe { f(&mut *self.v.get()) };
self.locked.store(UNLOCKED, Ordering::Relaxed);
ret
}
CAUTION
再次强调一下实践中别写这种 spin 代码。
具体 CPU 多个核心是如何协调的,可以参考 MESI protocol
compare_exchange_weak 允许在当前实际的值和传入的预期值相同时失败(大多数情况下不会失败,但他允许这么做),根据你的平台可能代码效率更高。
例如 x86 的 CPU 实现了 CAS,一条指令就能搞定;而 ARM 则是有 LDREX, STREX,exclusive 的读取/存储(类似读写锁的感觉),所以 ARM 的 compare_exchange 是通过 LDREX STREX 的循环实现的,所以上面这种代码就有嵌套的循环了,虽然可能不是什么大问题,但标准库还是提供了 compare_exhange_weak(x86 架构上和 compare_exchange 一样)。
总之,如果你要在循环里调用,那你应该使用 compare_exchange_weak;否则你就直接使用 compare_exchange 就可以了。
内存序
#[non_exhaustive]
pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}
这些内存序会影响到多个线程操作同一个内存位置时,观测到的行为。
Relaxed
relaxed 只保证原子性,没有其他顺序保证。
fn too_relaxed() {
// Box::leak() 返回的是 static 引用,不能被 move 到闭包里,要显式标注生命周期
let x: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
let y: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
let t1 = std::thread::spawn(move || {
let r1 = y.load(Ordering::Relaxed);
x.store(r1, Ordering::Relaxed);
r1
});
let t2 = std::thread::spawn(move || {
let r2 = x.load(Ordering::Relaxed);
y.store(42, Ordering::Relaxed);
r2
});
let r1 = t1.join().unwrap();
let r2 = t2.join().unwrap();
// r1 == r2 == 42
}
例如这段代码,最后的结果可能出现 r1 == r2 == 42,r2 是有可能为 42 的,即使 store 操作发生在 r2 被读取之后。
Relaxed 序中,CPU 和编译器是可以对指令随意进行重排的,因为 Relaxed 序对这些没有要求,所以最后可能会有出乎意料的结果(看起来像时空穿越了一样)
Acquire/Release
这两个顺序看起来跟使用锁获得资源时差不多,实际上他们就是针对这种场景设计的。
When coupled with a store, all previous operations become ordered before any load of this value with
Acquire(or stronger) ordering. In particular, all previous writes become visible to all threads that perform anAcquire(or stronger) load of this value.
标准库的文档是这样形容的,看起来比较晦涩。再来看看 cppref 的描述
no reads or writes in the current thread can be reordered after this store.
再翻译一下就是:任何使用 Release 序的 store 操作,都要求对同一个变量的 load 操作必须能够看到 store 之前的所有操作。
pub fn with_lock<R>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
while self
.locked
.compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
// MESI Protocol
while self.locked.load(Ordering::Relaxed) == LOCKED {}
}
let ret = unsafe { f(&mut *self.v.get()) };
// 如果这里是 Relaxed,那么就不能保证之前我们对变量的修改被其他线程所见
self.locked.store(UNLOCKED, Ordering::Release);
ret
}
Acquire 和 Release 成对出现,他们建立了一种 happens-before 关系,使用 Acquire 的线程需要保证能够看到 Release 的线程之前进行的所有操作。
happens-before 指的是所有在 store 及之前的操作也都发生在 load 及之后的操作之前。
cppref 的描述如下:
A load operation with this memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load.
也就是说,上面代码的第九行不能被重排到第二行的 while load 之前。
此外,还有一个 AcqRel 序,你可以给一个同时会进行读写的操作传这个内存序,就比如 compare_exchange,先读取再写入。
他的意思是读取时使用 acquire 语义,store 时使用 release 语义。
对于我们这里的 compare_exchange,问题就是,我们是否在意这个 store 操作是不是 Release,这里实际上无所谓。
AcqRel 通常会用在没有临界区,一步进行修改操作的地方(你之后不会做 store 操作),比如 fetch_add 这种。
那么 compare_exchange 的最后那个参数的内存序该怎么填呢?这个内存序指的是如果 load 的结果表明你不应该 store,这种情况下的内存序。具体在我们的例子中就是,获取不到锁的内存序该是什么。就是说即使获取不到锁,你也需要建立 happens-before 这种关系。这种情况实在比较少见,但确实存在。
在我们这个例子里 Relaxed 就可以,因为这会你也不需要跟其他线程进行同步。
x86 是强内存序的,对于任何操作都保证 acquire-release 语义,所以上面有问题的代码在 x86 上是很难看出问题的;在 ARM 上就可能出问题了。
在实现计数器这种场景,你就可以用 Relaxed,因为根本无所谓线程看到的是几,无所谓他更新的早一点或者晚一点,只需要原子性。
fetch operations
comapre_exchange 就比较严格,比如做一个计数器,你需要先跟当前的值对比,符合预期才能进行下一步操作;而用 fetch_* 则是你告诉 CPU 怎么计算下一个值,CPU 只是单纯的对这个值进行你要的原子操作,并不在乎他是多少,所以这个操作并不会失败。
当然,fetch_* 这种操作会返回你操作前的值,你当然可以选择把他扔掉,但如果你在乎的话就要指定一个内存序来同步。
有一个 fetch_update ,fetch_add 这种并不是靠 fetch_update 传个闭包实现的,因为 CPU 通常会支持 fetch_add/fetch_sub 和位运算之类的操作。实际上 fetch_update 基本上是给你实现了个 compare_exchange 循环。
注意之前说过,保证所有的原子操作都是 lock-free,但不保证是 wait-free,意思是如果在不支持的平台上,可能 fetch_add 这种操作就是通过 CAS 循环实现的。
Sequencially Consistent
考虑以下的代码,z 最后的值会是多少?
fn main() {
let x: &'static _ = Box::leak(Box::new(AtomicBool::new(false)));
let y: &'static _ = Box::leak(Box::new(AtomicBool::new(false)));
let z: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
let tx = std::thread::spawn(move || {
x.store(true, Ordering::Release);
});
let ty = std::thread::spawn(move || {
y.store(true, Ordering::Release);
});
let t1 = std::thread::spawn(move || {
while !x.load(Ordering::Acquire) {}
if y.load(Ordering::Acquire) {
z.fetch_add(1, Ordering::Relaxed);
}
});
let t2 = std::thread::spawn(move || {
while !y.load(Ordering::Acquire) {}
if x.load(Ordering::Acquire) {
z.fetch_add(1, Ordering::Relaxed);
}
});
tx.join().unwrap();
ty.join().unwrap();
t1.join().unwrap();
t2.join().unwrap();
println!("{}", z.load(Ordering::SeqCst));
}
显然,2 是可以的。按照 tx, ty, t1, t2 的顺序执行就行了。
1 也可以,以 tx, t1, ty, t2 这个顺序执行。
0 是可能的吗?
已知,如果 t1 想运行,那么他必须等到 tx 运行后;同样,t2 也要等待 ty。
那什么情况下 z 可以是 0?
假设 t1 先执行,对于 x.load(Ordering::Acquire),我们知道 Acquire 要求能看到对应 Release 之前的所有操作,不过在 tx 中,Release 前没有任何操作,对 y 的操作在 ty 这个线程。所以在这里,我们实际上能够看到任何可能的 y 值。 跟 ty 是不是 run 了没关系。如果 t2 先执行也同理。
这个跟指令重排没有关系,单纯是可见性的问题。
Like
Acquire/Release/AcqRel(for load, store, and load-with-store operations, respectively) with the additional guarantee that all threads see all sequentially consistent operations in the same order.
如果我们现在把所有的 x 和 y 的 store、load 操作都改为 SeqCst,那么现在绝对不会出现 0.
因为它保证了所有线程的可见性,没有线程可以在 y 被改为 ture 后看到 y 是 false,对于 x 也是同理。
**注意,SeqCst 的顺序只与其他对应操作的 SeqCst 顺序有关联。**对于 Acquire 和 Release 来说,SeqCst 比他们更严格,所以假设 store 的顺序是 Release,那么 load 时用 SeqCst 也能保证 happens-before。
总之,SeqCst 是 Acquire + Release 再额外保证所有的线程看到的操作顺序必须一致。
诊断工具
ThreadSanitizer
很多编译器集成了 ThreadSanitizer 可以帮你检测内存并发操作的 bug。
Rust 也可以用。
Loom
loom - crates.io: Rust Package Registry 实现了一个本来用来检查 C/C++ 原子操作的算法,不过 Rust 也可以完美的复刻过来。
其他
fence
Rust 提供了 compiler_fence in std::sync::atomic - Rust,
CPU 可以乱序执行指令,但 compiler 不会把 fence 下面的代码重排到上面。
此外还有 fence in std::sync::atomic - Rust,也是用来同步 load store 操作的。
volatile
在 std::ptr 下有 read/write_volatile,但实际上他跟原子操作没关系。
它通常用在内存映射的设备,比如你的网卡,它的 packet queue 映射到了内存的某个区域,比如他可能是个 ring buffer,然后你在两个线程里分别操作 head 和 tail,读取和写入通常还会有一些副作用。
这种时候你才需要 volatile,它就是告诉编译器,它必须从内存进行操作,此外这个操作还不能被重排。
atomic ptr
基本上是 atomic usize 的一种特化,用来操作指针。
总结
实践中,没事别乱写 lock-free 的结构。大部分情况用锁就好。
除非你真的真的真的能保证你的代码没问题。