在现代C ++中正确地实现SpinLock

2021-06-26 08:44:31

我见过很多很糟糕的旋转锁,围绕着互联网。他们都犯了同样的错误:在读取修改写(RMW)操作上旋转。 IWILL解释为什么这是不好的,如何在C ++中正确地实现Spinlock。解释和实现将很容易地携带到其他编程范围内。在备注结束时,我提供了一个完整的Spinlock,以实现互斥锁名的命名。

最基本的自旋锁通过使用布尔(或单个位)来指示锁定是否保持锁定。要获取锁定原子交换操作,用于将布尔设置为true。如果第一个操作返回的先前值为false,则锁定锁定,否则我们需要在锁定成功获取锁定之前尝试:

struct tas_lock {std ::原子< BOOL> lock_ = {false}; void锁(){while(lock_. Exchange(True)); void解锁(){lock_。商店(假); }};

此实现通常称为测试和设置(TAS)锁定。其他原子化可用于实现旋锁。在我的x86_64Architecture上的基准测试中,我发现可以使用的结合原子操作之间的旋转锁性能的差异很小。此外,根据Agnerfog的指令表和UPOS.INFOALL锁定前缀指令具有非常相似的指令延迟1。

C ++原子的默认内存排序std :: memory_order_seq_cst(顺序 - 一致排序)is ord tearlyive,可以更改为std :: meminer_order_acquire uperations,它获取锁定和std :: memory_order_release for modowssthat释放锁定2.这将允许编译器发出Moreefficent代码。

struct tas_lock {std ::原子< BOOL> lock_ = {false}; void lock(){while(lock_.exchange(true,std :: memory_order_acquire); void解锁(){lock_。商店(false,std :: memory_order_release); }};

当Thelock上存在争用时,就会发生上述实施的问题。为了保留所有CPU高速缓存同步,使用缓存CONECHENCYPROTOCOCOCOC。有Aresemalal不同的缓存一致性协议(Mesi,Moesi,Mesif),但它们都有Incommon,只有单个CPU核心可以写入高速缓存行,但可以从缓存行同时读取多个CPUCITE。原子Exchange操作Quires写入存储存储锁的缓存行的访问。如果多个线程旋转尝试获取锁定,则需要大量高速缓存ConherencyTraffic来连续更改哪个核心具有独占写入访问。

由于一致性协议允许多个同时读取器,而是通过在原子负载操作(自旋等环)上旋转来旋转一致交通。而不是ContinuoS徒劳尝试获得锁定锁定我们等待锁定持有者首先释放锁定。这消除了旋转过程中的缓冲区流量:

struct ttas_lock {... void lock(){for(;;){if(!lock_.exchange(true,std :: memory_order_acquire){break; } whiled(lock_。load(std :: memory_order_relaxed)); }} ...};

此实现通常称为测试和测试和设置(TTA)锁定。注意,针对未接受的情况进行了优化。首先,它试图获取锁,如果这失败了它旋转等待锁定被释放。

今天使用的典型CPU支持同时多线程(SMT)(简称TOAS超线程(HT)BYINTEL)允许多个逻辑CPU核心作为算术逻辑单元(ALU),装载 - 存储单元(LSU)等。

我们上面的改进锁将几乎每个循环执行负载操作,而何先用户。这将使其他逻辑CPU核心共享与旋转核心相同的Load-OtheLit。为了缓解此问题,英特尔推出了暂停素制构建,该暂停素构造提供了一种暗示旋转等待循环正在运行并以某种建筑专横地限制CPU内核,以降低负载存储器上的电力使用和争用。在GCC和Clang上,我们可以使用_mm_pause()使用内置函数__builtin_ia32_pause()和msvc上的暂停指令。添加thisto我们的旋转等待循环我们得到:

struct ttas_lock {... void lock(){for(;;){if(!lock_.exchange(true,std :: memory_order_acquire){break; } whiled(lock_。load(std :: memory_order_relexed)){__builtin_ia32_pause(); }} ...};

请注意,如果您不使用超线程,并且不关心使用暂停指令的权力,将慢下锁定锁定锁定。

我创建了一个简单的基准测试,其中启动n个线程,每个线程获取和释放锁100m / n次,最后计算Alock() - 解锁()对的平均时间。

使用4个核心/ 8个超线程在我的电脑上运行我们可以看到GoodsPinlock实现比Bad Spinlock更好:

我们还可以使用perf命令查看每个实现的缓存未命中和雾化的数量。

sudo perf stat-e l1-dcache-loads,l1-dcache-load-misses,mem_inst_retive.lock_loads ./.out 100000000 8&#39的性能计数器统计数据; ./ a.out 100000000 8&#39 ;: 1,038,332,933 l1 -DCache-Loads 282,408,737 L1-DCache-Load-Misses#27.20%的所有L1-DCache HITS 161,632,506 MEM_INST_RETive.LOCK_LOADS 5.148648364秒时间39.249547000秒用户0.015969000秒SYS

sudo perf stat -e l1-dcache-loads,l1-dcache-load-messes,mem_inst_retive.lock_loads ./a.out 100000000 8&#39的性能计数器统计数据; ./ a.out 100000000 8&#39 ;: 1,067,168,715 l1 -dcache-loads 509,077,190 l1-dcache-load-misses#47.70%的所有L1-DCache匹配552,378,083 Mem_inst_retive.lock_loads 11.225589133秒时间经过82.805909000秒用户0.051584000秒SYS

以下是使用C ++ 11atomics的基本旋转锁的完整实现。它应该很容易到C11原子和其他语言。

更新2020-04-26:正如用户adops_dsponreddit的所建议在尝试toacquire之前首先检查锁是否有用。如果有人滚注循环尝试(),这将防止过度的一致性流量。

struct旋转锁{std ::原子< BOOL> lock_ = {0}; void lock()noexcept {for(;;){//乐观地假设锁定在第一次尝试时自由(!锁定_。Exchange(true,std :: memory_order_acquire){return;等待锁定才能释放而不生成缓存未命中(LOCK_。加载(STD :: Memory_Order_Relaxed)){//发出x86暂停或arm产量指令,以减少// hyper-threads __builtin_ia32_pause()之间的争用; BOOL TRY_LOCK()NOEXCEPT {//首先做一个放松的负载来检查锁是否是免费的,以防止//如果有人这样做(!try_lock())返回,不必要的缓存未命中。锁_ 。加载(std :: memory_order_relaxed)&& !!锁_ 。交换(真实,std :: memory_order_acquire); void解锁()noexcept {lock_。商店(false,std :: memory_order_release); }};

AGNER FOGG的指令延迟对于可用于创建Spinlocks的某些说明的表:

延迟:“这是指令在依赖链中生成的延迟。数字是最小值。缓存未命中,未对准和例外可能会增加时钟数量。在启用超线程的情况下,在另一个螺纹中使用相同的执行单元导致较差的性能。非正规数字,南和无穷大,不会增加延迟。使用的时间单位是核心时钟周期,而不是时间戳计数器给出的参考时钟周期。“ ↩︎

C ++委员会成员Tony Van Eerd与STD :: Memory_Order_Acquire和两个锁A和B的Redditthat评论拨打A.unlock(); B.Lock()和不同锁的锁()可以重新排序到B.Lock(); a.unlock();并引入潜在的僵局。我不这么认为。阅读部分6.9.2.1 C ++标准的数据播放(第9段)无负载ORSTORES可以床床搬入或从负载 - 获取和储存释放裤之间移动。 ↩︎