0%

为什么 std::shared_mutex 这么慢——从一条 LOCK XADD 说起

我在做一个跨语言的算子引擎,同一套语义分别用 Go、Java、C++ 实现。有一处用到了读写锁——一个请求局部的数据结构,多个算子并发读它的字段,偶尔有写。三个实现都用各自语言「标准」的读写锁:Go 的 sync.RWMutex、Java 的 ReentrantReadWriteLock、C++ 的 std::shared_mutex

某次做性能剖析时,我注意到一件有点意思的事:C++ 这边,读写锁的单次拿放在 profile 里是个能看见的开销;而 Go 和 Java 那边,同样形态的拿放几乎不出现在火焰图里。同样一把读写锁,凭什么 C++ 要贵这么多?

这篇文章记录我顺着这个疑问挖下去的整个过程:先用反汇编搞清楚 glibc 的 pthread_rwlock 到底慢在哪、Go 为什么便宜,然后把 Go 的算法移植到 C++(中间失败了一次),最后发现这个原理上完全正确的优化,在真实负载上根本测不出收益。这最后一点,可能是整件事里最值得说的。

一个简单的疑问:同样一把读写锁,凭什么差这么多

先把问题量化。我写了个最简单的 microbench:单线程,循环拿锁、放锁,测一对 lock_shared / unlock_shared 的平均耗时。没有竞争,纯粹测 uncontended fast path 的成本。

实现 单线程 uncontended ns/pair
Go sync.RWMutex RLock/RUnlock 13.75
glibc pthread_rwlock(即 Linux 上的 std::shared_mutex 17 ~ 41(重新编译后波动)

13.75ns 对 17ns 起步,看着差距不大。但 glibc 这一列有一点需要留意:重新编译后,该值会在 17 至 41 之间波动。这个波动后面会专门讲,暂且按下不表。单看下限,C++ 标准库的读写锁就是比 Go 慢了两三成。

为什么?光看 ns 数字回答不了这个问题,得看这两个 fast path 编译出来到底是什么。下面逐项来看。

glibc 的 pthread_rwlock 慢在哪

在 Linux 上,std::shared_mutex 的底层就是 pthread_rwlock。所以我直接 objdump -d libc.so.6,把 __pthread_rwlock_rdlock 的 fast path 拉出来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
endbr64                      ; CET 着陆垫
push %rbp ... push %rbx ; 完整栈帧 + 3 个被调者保存寄存器
mov 0x18(%rdi),%edx ; load __cur_writer
mov %fs:0x2d0,%eax ; ← TLS 读:拿当前线程的 TID
cmp %eax,%edx ; ← 自死锁检测:我是不是已经持有写锁
je <EDEADLK path>
cmpl $0x2,0x30(%rdi) ; ← __flags 策略分支:reader/writer preference?
je <writer-preference path>
mov $0x8,%eax
lock xadd %eax,(%rbx) ; ← 真正干活的一条:readers += 8
add $0x8,%eax
test %eax,%eax
js <overflow path>
test $0x1,%al ; WRPHASE 位检查
jne <writer-phase slow path>
...epilogue...
ret

单次 rdlock 的 fast path 大约 25 条指令。但其中真正在做「读者计数加一」这件事的,只有中间那一条 lock xadd。其余二十多条都是开销。具体贵在四个地方:

第一,这个函数不可能被 inlinestd::shared_mutex 调到 pthread_rwlock_*,是对 libc.so 里导出符号的调用。每次拿锁都要走一次 PLT 间接跳转,配一套完整的函数序言和尾声(你能在反汇编里看到 push %rbp 那一串)。

第二,每次拿锁都做自死锁检测。就是那两行 mov %fs:0x2d0cmp——读一次 TLS 拿到自己的线程 ID,跟锁里记的「当前写者」比一下,看是不是自己已经持有写锁却又来拿锁。POSIX 要求 pthread_rwlock 在这种情况下能返回 EDEADLK,所以这个检测省不掉。

第三,每次拿锁都判定一次 preference 策略pthread_rwlock 支持运行时配置成读者优先还是写者优先(那条 cmpl $0x2,0x30(%rdi)),所以这里是一个数据依赖的分支,每次都要判。

第四,unlock 是对称的,同样二十来条指令。所以一对完整的拿放,大约 50 条指令外加 2 次 PLT 调用。

这些开销没有一条是「写得烂」。它们全部是 POSIX 通用性的代价——glibc 的 pthread_rwlock 必须服务所有可能的用法:可配置的优先级策略、跨进程共享、死锁检测、属性定制。我这个请求局部的小锁一条都用不上,但只要用 std::shared_mutex,这些代价就一分不少地付了。

Go 为什么便宜:fetch_add 加符号位标记

那 Go 的 sync.RWMutex 是怎么做到 fast path 只有三条指令的?把它的源码翻出来(src/sync/rwmutex.go,去掉 race 检测的注解):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// 有写者在等或在持有 → 阻塞
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r) // 拆出去,保证 fast path 能 inline
}
}
func (rw *RWMutex) Lock() {
rw.w.Lock() // 先和其它写者互斥
// 标记:把 readerCount 减去一个极大值,使其变成深度负数
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0) // 等存量读者释放锁
}
}

这套算法的精巧之处,在三个设计点上:

第一,fetch_add 永不失败,符号位本身就是信号。 读者进来无条件执行 readerCount.Add(1),然后看返回值的符号。正数说明没有写者,直接进临界区;负数说明有写者,转去阻塞。写者要介入时,不另设一个标志位,而是把 readerCount 减去一个极大值(1 << 30),使它变成一个深度负数。这样读者的 +1 一旦撞上写者,结果都是负的,符号位就把「有没有写者」这个信息编码了进去。一个变量同时承载了读者计数和写者在场两件事,省掉了第二个标志位。

为什么这比 CAS 好?因为 fetch_add 在 x86 上就是一条 LOCK XADD,硬件保证它一定成功。多个读者并发进来,硬件把这些加法排成队,逐个加成功,没有任何重试。而 CAS(compare-and-swap)的语义是「我认为现在是 5,把它改成 6,如果不是 5 就失败重来」——并发高的时候,多个线程抢着改同一个值,失败重试会形成风暴。fetch_add 从根本上消除了这个问题。

第二,撞上写者的 +1 不回滚。 读者 +1 之后发现有写者,这个 +1 要不要撤销?不需要。Go 的做法是让它保留,由写者的 Unlock 来收尾——写者解锁时把 readerCount 加回那个极大值,得到的新值恰好等于「写者持有期间排队进来的读者数」,按这个数 release 对应次数的信号量,计数始终是平衡的。不回滚意味着不需要第二次原子操作,省下一半开销。

第三,阻塞用信号量,不空转。 被写者挡住的读者,是去 runtime_Semacquire 上阻塞(底层是 futex),把 CPU 让出来,而不是原地空转等待。

这三点加起来的结果:RLock 的 fast path 编译出来是 LOCK XADD + TEST + 一条不命中的 JS(条件跳转),总共三条指令,而且因为 rUnlockSlow 被拆了出去,编译器可以把整个 fast path inline 进调用方,没有函数调用、没有栈帧。

对比就很清楚了:glibc 那 25 条指令里,真正干活的也是同一条 LOCK XADD,区别全在它外面包了多少层通用性。Go 把这些层全部去掉了——代价是它不可重入、不可跨进程、preference 策略写死成写者优先。但对一个语言运行时内部自用的锁来说,这些「不支持」恰好都是不需要的。

第一次尝试,以及它为什么失败

搞清楚原理之后,思路就明确了:在 C++ 里自己实现一个读写锁,把 glibc 那些用不上的通用性砍掉。我第一版(叫它 v1)是这么写的——读者锁的逻辑是「先 load 看一眼有没有写者,没有就 CAS 把计数加一」,解锁也是 CAS 循环;写者在等的时候,读者就 std::this_thread::yield() 自旋让出时间片。

实测的数字并不理想:

实现 单线程 uncontended 16 readers
glibc pthread_rwlock 17 1294
v1(CAS 路线) 22.41 2029

单线程比 glibc 还慢,16 个读者并发时差不多是 glibc 的 1.6 倍——自己实现的「优化版」,比想替换掉的标准库还慢。问题出在三个地方,每一个都正好对应前面讲 Go 时提到的那三个设计点:

根因一,用 CAS 代替了 fetch_add 我每次拿放都是「load 一次 + CMPXCHG 一次」两次原子操作访问同一条 cacheline,而 Go 的 fetch_add 只要一次 LOCK XADD。单线程下这就解释了 22ns 对 10ns 的差距——纯粹是多了一倍的原子操作。

根因二,多读者并发时 CAS 互相干扰。 也就是前面提到的重试风暴:16 个读者同时想把计数加一,每一个成功的 +1 都会让其它 15 个读者手里那个「认定的旧值」失效,于是它们 CAS 失败、重读、再试。竞争越激烈,空转越多。2029ns 这个数字正是由此而来。

根因三,yield 空转白白消耗 CPU。 我的服务跑在一个限制了 CPU 配额的 cgroup 里(只有两个核)。写者在场时读者 yield 空转,看似是「礼貌地让出时间片」,但在核心数本就紧张的环境里,这些空转占用的 CPU,是从同机其它真正在干活的线程那里挤占来的。更严重的是,在一个混合读写的压测里,v1 直接使读者饿死——写者一来,读者全在空转,吞吐量跌到正常值的几十分之一。

这里还有一段插曲值得记一下。v1 之所以走 CAS 这条路,是因为我更早的一版本来用的是 fetch_add,但那一版有个状态损坏的 bug——计数器会变成 0xFFFFFFFF 这样一个无意义的值。当时我没有细查根因,直接想「那就全改成 CAS 吧,CAS 至少能保证读—改—写的原子性」,结果就踩中了上述三个问题。

后来复盘才发现,那个 fetch_add 版本的 bug 根本不在 fetch_add 上:问题出在写者从 pending 转到 holding 状态时用了一个普通的 store,把一个并发读者刚做的 +1 覆盖掉了,导致后续的减法下溢成那个无意义的值。该修的是写者那一处状态转换,我却连读者的 fetch_add 一起改掉了。这是一次典型的矫枉过正——为了绕开一个其实在别处的 bug,把原本正确的部分也推翻了,还顺带换来一个性能更差的实现。

把 Go 的算法移植到 C++

第二版(v2)的思路很直接:不再自行其是,精确复刻 Go 的算法。状态就是四个成员:

1
2
3
4
5
std::atomic<int32_t> reader_count_{0};   // 持锁读者数;写者通过减去 1<<30 将其标记为负
std::atomic<int32_t> reader_wait_{0}; // 写者需要等待的存量读者数
std::mutex writer_mu_; // 写者之间互斥
std::counting_semaphore<> reader_sem_{0};
std::counting_semaphore<1> writer_sem_{0};

读者锁就是一条 fetch_add 加一个符号检查,和 Go 一模一样:

1
2
3
4
5
6
7
void lock_shared() noexcept {
if (reader_count_.fetch_add(1, std::memory_order_acquire) < 0) {
// 有写者在场。本次 +1 已经计入,写者的 unlock 会按排队总数
// 为其 release 一次信号量。阻塞,不空转。
reader_sem_.acquire();
}
}

写者锁首先与其它写者互斥,而后使 reader_count_ 减去 kMaxReaders 作为标记,再等存量的读者释放锁:

1
2
3
4
5
6
7
void lock() noexcept {
writer_mu_.lock();
int32_t r = reader_count_.fetch_sub(kMaxReaders, std::memory_order_acq_rel);
if (r != 0 && reader_wait_.fetch_add(r, std::memory_order_acq_rel) + r != 0) {
writer_sem_.acquire();
}
}

把 Go 翻译成 C++ 时,有一个容易出错的细节:Go 的 atomic.Add 返回的是加完之后的新值,而 C++ 的 fetch_add / fetch_sub 返回的是加之前的旧值。 所以 Go 里 readerCount.Add(-max) + max 这个「先减、再加回来还原成新值」的写法,在 C++ 里直接取 fetch_sub(max) 的返回值就对了——因为它返回的正是减之前的旧值,省去了那次加回来。同理,Go 的 readerWait.Add(r) != 0 对应 C++ 的 reader_wait_.fetch_add(r) + r != 0。这种地方若逐字照抄 Go,符号和时序就全错了,必须理解它在计算什么才能翻译正确。

还有一处实现选择:Go 的阻塞用的是 runtime 自己的信号量(与调度器协作的那一种),C++ 这边我用 C++20 的 std::counting_semaphore,libstdc++ 底层同样是 futex,语义对得上,而且能让整个锁保持 header-only,不引入额外依赖。

唯一不能照搬的是 try_lock_shared。它不能无条件 fetch_add——万一此时 reader_count_ 是负的(有写者),这个 +1 会把自己登记进写者的排队计数里,为了不破坏计数平衡就只能转去阻塞,而这违背了 try 的语义。所以这个接口需要改用「仅当非负时才 CAS」的写法。

实现完成后,它对外暴露的是 lock / unlock / lock_shared / unlock_shared / try_lock / try_lock_shared 这套标准接口,因此可以直接配合 std::shared_lockstd::unique_lock 使用,调用方的代码无需改动。

收效,以及一个意外的发现

先看 microbench 的结果,这一部分相当理想:

场景 glibc pthread_rwlock v1 v2 Go 1.26
单线程 uncontended 17.06 22.41 10.14 13.75
8 readers 708 701 379 357
16 readers 1736 2029 825 707
mixed 8 读 1 写 写者饿死 读者饿死 读写平衡

单线程 10.14ns,不仅达到了 Go 的水准,还略微超过它(13.75ns)——因为 C++ 这边 inline 之后,连 Go 那点函数调用和调度器协作的开销都省掉了。多读者并发的各档位都贴近 Go(相差 5% 到 17%),是 glibc 的 1.7 到 2.2 倍。混合读写的场景对比更明显:glibc 在这个压测里会饿死写者,v1 会饿死读者,只有 v2 两边都不饿死——因为 fetch_add 不会排挤写者的标记,而信号量阻塞不消耗 CPU。正确性也通过了验证:TSan 在并发压测下报告零数据竞争,此前 v1 会活锁的那个 16 读 4 写的 stress,现在瞬间完成。

到这里,目标看起来已经达成。但接下来做的一件事,结果出乎我的预料——我把这把锁真正换进生产负载,做了一次端到端测试。

数字是这样的:

测量环境 glibc QPS v2 QPS 差异
正常压测脚本 227.5 211.5 -7.0%
perf 监控下 173.0 178.4 +3.1%
裸跑 229.3 234.1 +2.1%

三种环境,给出了 -7% 到 +3% 互相矛盾的结果。一把在 microbench 里快了一倍的锁,换进真实负载之后,已经无法判断它究竟是快了还是慢了。

这正好解释了开头留下的那个问题——为什么 glibc 的数字会在 17 到 41 之间波动:这些百分之几的起伏,全部是二进制布局噪声。 每次重新编译,函数在二进制里的地址、对齐方式都会有微小变化,进而影响指令缓存命中和分支预测——这些与你改的那行代码毫无关系,却足以造成 ±5% 到 ±7% 的 QPS 起伏。这是一个已知的现象(学术上称为 Stabilizer 问题)。前面那三组互相矛盾的数字,原因不在锁,而在于它们来自三次独立的编译。

为了看清真实情况,我做了一次 profiling 对照,同一时段、同一压力、同一负载,只替换锁实现。结果值得一看:

glibc build v2 build
pthread_rwlock_rdlock 1.14% 符号消失
pthread_rwlock_unlock 1.03% 符号消失
读路径函数本体 1.97% 3.68%

换成 v2 之后,那两个 pthread_rwlock 的符号确实从 profile 里消失了——省下约 2.2%。但与此同时,读路径那个函数本体的占比从 1.97% 升到 3.68%,增量几乎正好等于消失的那部分。v2 的锁成本并没有消失,只是转移了位置。 glibc 那一版,锁是两个独立的 PLT 符号,单独可见;v2 全部 inline 进了调用方,fetch_add 的那几个周期被计入读路径函数自身,不再单独显示,但开销依然存在。

我们把具体的数字算一遍:锁的 fast path 从 17ns 降到 10ns,确实是 -40%。但这把锁在整个负载里只占大约 2% 的 CPU。2% 的 40%,是 0.8 个百分点。而二进制布局噪声的起伏是 ±5%。也就是说,这个真实存在、且 microbench 可以证明的优化收益,被淹没在了比它大五六倍的测量噪声里。

所以最后的决定是:生产代码维持用 std::shared_mutex 它是标准件,零维护成本,而在这个负载上它和我手写的锁没有可测量的差异。我手写的那个 v2 锁,作为一个已经验证过正确性和性能的实现保留在代码库里,等哪天真的出现了锁占比超过 5% 的负载——更高的并发、更大的数据量、调用方没法把锁提到循环外面——一行 typedef 就能切过去。

小结

这件事可以拆成两个独立的收获,一个关于「为什么慢」,一个关于「要不要快」。

关于为什么慢,答案很简单:std::shared_mutex 的开销,绝大部分是通用性的代价。 glibc 的 pthread_rwlock 那 25 条指令里只有 1 条在干活,其余 24 条买的是可配置优先级、跨进程共享、死锁检测、属性定制——这些能力对一把进程内自用的锁全是浪费,但标准库无从知道你不需要,只能一并提供。Go 的 sync.RWMutex 快,不靠什么特殊技巧,而是因为它是语言运行时的内部组件,可以名正言顺地砍掉一切它不需要支持的功能。fetch_add 加符号位标记这套算法很精巧,但它能这样写的前提,是「不打算支持可重入、不打算跨进程」。性能很多时候不是优化出来的,而是放弃换来的——放弃多少通用性,就换回多少速度。

关于要不要快,这里有一个很微妙的点。我做的是一个完全正确的优化:原理分析对、实现对、microbench 证明单次成本降了一半、正确性有 TSan 背书。但它在真实负载上的收益(0.8 个百分点)小于测量噪声(±5%)——也就是说,如果只看端到端 QPS,我永远无法证明这个优化有用,甚至会被那三组互相矛盾的数字误导,以为它是负优化。这把锁在该负载里只占 2% 的 CPU,再怎么优化,上限也就是那 2 个百分点。真正的瓶颈并不在这里。

这背后是 microbench 与真实负载之间那道经典的鸿沟。microbench 把锁单独拎出来反复施压,自然能放大出一倍的差距;可一旦放回完整系统,它只是 2% 的成分,淹没在更大的开销和测量噪声里。一个优化「能做」和「该做」是两回事——前者看 microbench,后者要看它在整个系统里占多大比重、收益有没有超过噪声。这次我的运气在于,动手之前并没有指望它解决什么大问题,所以代码写完、验证完、留作备用,没有为了一个测不出来的收益硬塞进生产。

最后留一个实用的副产品:手写这把锁最大的价值,也许不在性能,而在于它逼我把 std::shared_mutex 慢在哪、Go 的 sync.RWMutex 凭什么快、fetch_add 和 CAS 在高并发下差在哪,这些原先一知半解的东西,彻底弄清楚了。有时候为了一个用不上的优化把原理啃透,比优化本身更有价值。

附:完整实现

下面是 v2 的完整实现,单个头文件,无外部依赖。它把 Go 的 sync.RWMutex 算法(Go 1.26 src/sync/rwmutex.go)移植到 C++,对外暴露 lock / unlock / lock_shared / unlock_shared / try_lock / try_lock_shared 这套标准接口,因此可以直接配合 std::shared_lockstd::unique_lock 使用。语义与 std::shared_mutex 一致到足以互换的程度:不可重入,写者优先(写者一旦标记,新读者排在它后面,写者不会饿死),对未加锁的锁执行 unlock 是未定义行为(debug 构建下由断言捕获)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <semaphore>

// 一把读写锁,将 Go 的 sync.RWMutex 算法移植到 C++。
//
// 存在的理由:glibc 的 pthread_rwlock(Linux 上 std::shared_mutex 的底层)
// 在 uncontended 快速路径上每次加锁约 25 条指令——完整的函数序言/尾声
// (经 PLT 调用,无法 inline)、一次用于自死锁检测的 TLS 读、一个
// reader/writer preference 策略分支,最后才是真正干活的那条 LOCK XADD。
// Go 的 RLock 快速路径 inline 之后是 LOCK XADD 加一个符号判断,约 3 条指令。
//
// 算法要点:
// reader_count_ int32 —— 当前持锁的读者数。写者通过减去 kMaxReaders
// 把它打成深度负数,作为「写者在场」的标记。读者
// 无条件 fetch_add(1),永不重试、永不回滚,靠结果
// 的符号判断前方是否有写者。
// reader_wait_ int32 —— 写者需要等待的、标记之前已在场的读者数。最后一个
// 退场的读者负责 post writer_sem_。
// writer_mu_ —— 写者之间互斥。
// reader_sem_ / writer_sem_ —— 用于阻塞的计数信号量(底层 futex);任何
// 路径都不空转。
//
// 生产流量只会命中快速路径(请求局部的数据结构,写者竞争很少):
// lock_shared: 一条 LOCK XADD + 一个不命中的分支
// unlock_shared: 一条 LOCK XADD + 一个不命中的分支
class SharedMutex {
public:
SharedMutex() noexcept = default;
SharedMutex(const SharedMutex&) = delete;
SharedMutex& operator=(const SharedMutex&) = delete;

// ---- 共享(读者)加锁 ----

void lock_shared() noexcept {
if (reader_count_.fetch_add(1, std::memory_order_acquire) < 0) {
// 有写者在等或在持有。本次 +1 已经计入,写者的 unlock 会按排队
// 总数为我们 post 恰好一次 reader_sem_。阻塞,不空转。
reader_sem_.acquire();
}
}

bool try_lock_shared() noexcept {
// 此处不能无条件 fetch_add:若 reader_count_ 为负(有写者),这个 +1
// 会把自己登记进写者的排队计数,为了维持平衡就只能阻塞。仅当没有
// 写者时才用 CAS。
int32_t s = reader_count_.load(std::memory_order_relaxed);
while (s >= 0) {
if (reader_count_.compare_exchange_weak(s, s + 1, std::memory_order_acquire,
std::memory_order_relaxed)) {
return true;
}
}
return false;
}

void unlock_shared() noexcept {
int32_t r = reader_count_.fetch_sub(1, std::memory_order_release);
if (r < 0) {
// 对应 Go rUnlockSlow 的致命检查:r == 0 表示 unlock 了一把未加锁的锁;
// r == -kMaxReaders 表示在只有写者持有时执行了 unlock_shared。
assert(r != 0 && r != -kMaxReaders && "unlock_shared of unlocked SharedMutex");
// 写者正在等标记之前在场的读者退场,最后一个退场者 post 写者信号量。
if (reader_wait_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
writer_sem_.release();
}
}
}

// ---- 独占(写者)加锁 ----

void lock() noexcept {
// 先解决与其它写者的竞争。
writer_mu_.lock();
// 向读者标记写者在场:把 reader_count_ 打成负数。fetch_sub 返回旧值,
// 即标记时刻仍在持锁的读者数。
int32_t r = reader_count_.fetch_sub(kMaxReaders, std::memory_order_acq_rel);
assert(r < kMaxReaders && "lock() while already write-locked");
// 等这些读者退场。它们可能在我们 fetch_sub 与此处之间已经退场——各自把
// reader_wait_ 减到了零以下,而我们 fetch_add(r) 恰好把它加回到零,
// 表示无需等待。
if (r != 0 && reader_wait_.fetch_add(r, std::memory_order_acq_rel) + r != 0) {
writer_sem_.acquire();
}
}

bool try_lock() noexcept {
if (!writer_mu_.try_lock()) {
return false;
}
int32_t expected = 0;
if (!reader_count_.compare_exchange_strong(expected, -kMaxReaders, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
writer_mu_.unlock();
return false;
}
return true;
}

void unlock() noexcept {
// 撤销标记:把 reader_count_ 加回非负。新值(旧值 + kMaxReaders)就是
// 我们持锁期间到达、此刻全部阻塞在 reader_sem_ 上的读者数。
int32_t r = reader_count_.fetch_add(kMaxReaders, std::memory_order_release) + kMaxReaders;
assert(r < kMaxReaders && "unlock of unlocked SharedMutex");
if (r > 0) {
reader_sem_.release(r);
}
// 放下一个写者进来。
writer_mu_.unlock();
}

private:
static constexpr int32_t kMaxReaders = 1 << 30;

std::atomic<int32_t> reader_count_{0};
std::atomic<int32_t> reader_wait_{0};
std::mutex writer_mu_;
std::counting_semaphore<> reader_sem_{0};
std::counting_semaphore<1> writer_sem_{0};
};
俗话说,投资效率是最好的投资。 如果您感觉我的文章质量不错,读后收获很大,预计能为您提高 10% 的工作效率,不妨小额捐助我一下,让我有动力继续写出更多好文章。