读写锁RWMutex是一个读写互斥锁.可以说是Mutex的一个改进版.在某些场景下可

以发挥更加灵活的控制力.读取数据频率远远大于写数据频率的场景.

1.实现读写锁解决问题:

1).写锁需要阻塞写锁:

一个协程拥有写锁时.其他协程写锁定需要阻塞.

2).写锁需要阻塞读锁:

一个协程拥有写锁时.其他协程读锁定需要阻塞.

3).读锁需要阻塞写锁:

一个协程拥有读锁时.其他协程写锁定需要阻塞.

4).读锁不能阻塞读锁:

一个协程拥有读锁时.其他协程也可以有读锁.

2.读写锁数据结构:

type RWMutex struct {
    w           Mutex        // held if there are pending writers
    writerSem   uint32       // semaphore for writers to wait for completing readers
    readerSem   uint32       // semaphore for readers to wait for completing writers
    readerCount atomic.Int32 // number of pending readers
    readerWait  atomic.Int32 // number of departing readers
}

w:用于控制多个写锁.获得写锁首先要获取该锁.

writerSem:写阻塞等待信号量.最后一个读者释放锁时会释放信号量.

readerSem:读阻塞的协程等待的信号量.持有写锁的协程释放锁后会释放信号量.

readerCount:记录读者个数.

readerWait:记录写阻塞时读者个数.

以上数据结构可见.读写锁内部仍有一个互斥锁..用于将多个写操作隔离开来.其余用

于隔离读操作和写操作.

3.接口定义:

RLock():读锁定(记忆为ReadLock).

RUnlock:解除读锁定(记忆为ReadUnlock).

Lock():写锁定.与Mutex完全一致.

Unlock():解除写锁定.与Mutex完全一致.

TryLock():以非阻塞方式尝试写锁定(Go1.18引入).

TryRLock():以非阻塞方式尝试读锁定(Go1.18引入).

4.Lock()实现逻辑:

获取互斥锁.

阻塞等待所有读操作结束.(如果有).

源码如下:

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	if race.Enabled {
		race.Read(unsafe.Pointer(&rw.w))
		race.Disable()
	}
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

5.Unlock()实现逻辑:

唤醒因读锁定而被阻塞的协程(如果有).

解除互斥锁.

源码如下:

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked [RWMutex] is not associated with a particular
// goroutine. One goroutine may [RWMutex.RLock] ([RWMutex.Lock]) a RWMutex and then
// arrange for another goroutine to [RWMutex.RUnlock] ([RWMutex.Unlock]) it.
func (rw *RWMutex) Unlock() {
    if race.Enabled {
       race.Read(unsafe.Pointer(&rw.w))
       race.Release(unsafe.Pointer(&rw.readerSem))
       race.Disable()
    }

    // Announce to readers there is no active writer.
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
       race.Enable()
       fatal("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    for i := 0; i < int(r); i++ {
       runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
    if race.Enabled {
       race.Enable()
    }
}

6.RLock实现的逻辑:

增加读操作计数.即readerCount++.

阻塞等待写操作结束.(如果有).

源码如下:

// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the [RWMutex] type.
func (rw *RWMutex) RLock() {
    if race.Enabled {
       race.Read(unsafe.Pointer(&rw.w))
       race.Disable()
    }
    if rw.readerCount.Add(1) < 0 {
       // A writer is pending, wait for it.
       runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
    if race.Enabled {
       race.Enable()
       race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

7.RUnlock()实现的逻辑:

减少读操作计数.即readerCount--.

唤醒等待写操作的协程(如果有).

// RUnlock undoes a single [RWMutex.RLock] call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
    if race.Enabled {
       race.Read(unsafe.Pointer(&rw.w))
       race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
       race.Disable()
    }
    if r := rw.readerCount.Add(-1); r < 0 {
       // Outlined slow-path to allow the fast-path to be inlined
       rw.rUnlockSlow(r)
    }
    if race.Enabled {
       race.Enable()
    }
}

注:

即便有协程阻塞等待写操作.也并不是所有的解除读锁定操作都会唤醒该协程.而是最

后一个解除读锁定的协程才会释放信号量将该协程唤醒.因为只有当所有读操作协程

释放锁后才可以唤醒协程.

8.场景分析:

1).写操作如何阻止写操作的:

读写锁包含一个互斥锁.写锁必定要先获取该互斥锁.如果互斥锁已被协程A获取(或者

协程A在阻塞等待读结束).则意味着协程A获取了互斥锁.协程B只能阻塞等待该互斥

锁.所以写操作依赖互斥锁阻止其他写操作.

2).写操作如何阻止读操作:

RWMutex.readerCount是一个整型值.用于表示读者数量.在不考虑写操作的情况

下.每次执行读锁定操作后该值+1.每次解除读锁定后将该值-1.所以readerCount的

取值为[0,N].N为读者个数.实际上支持最大可支持2的30次方个并发读者.

当执行写锁定的时候.会先将readerCount减去2的30次方.从而readerCount变成

了负值.此时再有读锁定到来时检测到readerCount为负值.便知道有写操作在执行.

只好阻塞等待.真实的操作个数并不会丢失.只需要将readerCount加上2的30次方.

所以.写操作是将readerCount变成负值来阻止读操作的.

3).读操作是如何阻止写操作的.

读锁定回显将RWMutex.readerCount加1.此时写操作到来时发现读者数量不为0.

会阻塞等待所有读操作结束,所以读操作是通过readerCount来阻止写操作的.

4)为什么写锁定不会被饿死:

写操作要等待读操作结束后才可以获得锁.写操作等待期间可能还有新的读操作持续

到来.如果写操作等待所有读操作结束.则很有可能被饿死.通过

RWMutex.readerWait可以完美的解决这个问题.

写操作到来时.会把RWMutex.readerCount值拷贝到RWMutex.readerWait中.

用于标记排在写操作前面的读者个数.

前面的读操作结束后.除了会递减RWMutex.readerCount.还会递减

RWMutex.readerWait值.当RWMutex.readerWait值变为0时唤醒写操作.

我喜欢风的透彻.它抚慰了心灵.

如果大家喜欢我的分享的话.可以关注我的微信公众号

念何架构之路

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐