Go进阶并发控制之RWMutex
Go语言中的RWMutex读写锁通过分离读/写操作提高并发性能,适用于读多写少场景。其核心机制包括:1)互斥锁控制写操作独占;2)readerCount记录读操作数量,负值表示有写操作;3)readerWait确保写操作不被饿死。实现上,写锁会阻塞后续读写,读锁仅阻塞写操作而不互斥,并通过信号量协调读写操作。该设计有效平衡了并发性能与数据一致性。
读写锁RWMutex是一个读写互斥锁.可以说是Mutex的一个改进版.在某些场景下可
以发挥更加灵活的控制力.读取数据频率远远大于写数据频率的场景.
1.实现读写锁解决问题:
1).写锁需要阻塞写锁:
一个协程拥有写锁时.其他协程写锁定需要阻塞.
2).写锁需要阻塞读锁:
一个协程拥有写锁时.其他协程读锁定需要阻塞.
3).读锁需要阻塞写锁:
一个协程拥有读锁时.其他协程写锁定需要阻塞.
4).读锁不能阻塞读锁:
一个协程拥有读锁时.其他协程也可以有读锁.
2.读写锁数据结构:
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}
w:用于控制多个写锁.获得写锁首先要获取该锁.
writerSem:写阻塞等待信号量.最后一个读者释放锁时会释放信号量.
readerSem:读阻塞的协程等待的信号量.持有写锁的协程释放锁后会释放信号量.
readerCount:记录读者个数.
readerWait:记录写阻塞时读者个数.
以上数据结构可见.读写锁内部仍有一个互斥锁..用于将多个写操作隔离开来.其余用
于隔离读操作和写操作.
3.接口定义:
RLock():读锁定(记忆为ReadLock).
RUnlock:解除读锁定(记忆为ReadUnlock).
Lock():写锁定.与Mutex完全一致.
Unlock():解除写锁定.与Mutex完全一致.
TryLock():以非阻塞方式尝试写锁定(Go1.18引入).
TryRLock():以非阻塞方式尝试读锁定(Go1.18引入).
4.Lock()实现逻辑:
获取互斥锁.
阻塞等待所有读操作结束.(如果有).
源码如下:
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

5.Unlock()实现逻辑:
唤醒因读锁定而被阻塞的协程(如果有).
解除互斥锁.
源码如下:
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked [RWMutex] is not associated with a particular
// goroutine. One goroutine may [RWMutex.RLock] ([RWMutex.Lock]) a RWMutex and then
// arrange for another goroutine to [RWMutex.RUnlock] ([RWMutex.Unlock]) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}

6.RLock实现的逻辑:
增加读操作计数.即readerCount++.
阻塞等待写操作结束.(如果有).
源码如下:
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the [RWMutex] type.
func (rw *RWMutex) RLock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.Disable()
}
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

7.RUnlock()实现的逻辑:
减少读操作计数.即readerCount--.
唤醒等待写操作的协程(如果有).
// RUnlock undoes a single [RWMutex.RLock] call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
if race.Enabled {
race.Read(unsafe.Pointer(&rw.w))
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := rw.readerCount.Add(-1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
注:
即便有协程阻塞等待写操作.也并不是所有的解除读锁定操作都会唤醒该协程.而是最
后一个解除读锁定的协程才会释放信号量将该协程唤醒.因为只有当所有读操作协程
释放锁后才可以唤醒协程.

8.场景分析:
1).写操作如何阻止写操作的:
读写锁包含一个互斥锁.写锁必定要先获取该互斥锁.如果互斥锁已被协程A获取(或者
协程A在阻塞等待读结束).则意味着协程A获取了互斥锁.协程B只能阻塞等待该互斥
锁.所以写操作依赖互斥锁阻止其他写操作.
2).写操作如何阻止读操作:
RWMutex.readerCount是一个整型值.用于表示读者数量.在不考虑写操作的情况
下.每次执行读锁定操作后该值+1.每次解除读锁定后将该值-1.所以readerCount的
取值为[0,N].N为读者个数.实际上支持最大可支持2的30次方个并发读者.
当执行写锁定的时候.会先将readerCount减去2的30次方.从而readerCount变成
了负值.此时再有读锁定到来时检测到readerCount为负值.便知道有写操作在执行.
只好阻塞等待.真实的操作个数并不会丢失.只需要将readerCount加上2的30次方.
所以.写操作是将readerCount变成负值来阻止读操作的.
3).读操作是如何阻止写操作的.
读锁定回显将RWMutex.readerCount加1.此时写操作到来时发现读者数量不为0.
会阻塞等待所有读操作结束,所以读操作是通过readerCount来阻止写操作的.
4)为什么写锁定不会被饿死:
写操作要等待读操作结束后才可以获得锁.写操作等待期间可能还有新的读操作持续
到来.如果写操作等待所有读操作结束.则很有可能被饿死.通过
RWMutex.readerWait可以完美的解决这个问题.
写操作到来时.会把RWMutex.readerCount值拷贝到RWMutex.readerWait中.
用于标记排在写操作前面的读者个数.
前面的读操作结束后.除了会递减RWMutex.readerCount.还会递减
RWMutex.readerWait值.当RWMutex.readerWait值变为0时唤醒写操作.
我喜欢风的透彻.它抚慰了心灵.
如果大家喜欢我的分享的话.可以关注我的微信公众号
念何架构之路
更多推荐




所有评论(0)