The rwlock is built on top of the qspinlock: multiple readers can hold the lock at the same time, but only one writer can hold it at a time. (The analysis below is still based on the qspinlock base commit a33fda35e3a7655fb7df756ed67822afb5ed5e8d.)
1. Structure
typedef struct qrwlock {
	atomic_t		cnts;
	arch_spinlock_t		lock;
} arch_rwlock_t;
cnts: records how many readers currently hold the lock, together with the writer state (whether a writer holds or is waiting for the lock).
lock: the underlying qspinlock, used to queue up waiters.
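For reference, the bit layout of cnts is defined by constants in include/asm-generic/qrwlock.h. At roughly this kernel version they look like the following (a sketch from memory, so the exact comments may differ): the low byte holds the writer state, and the remaining bits form the reader count.
#define	_QW_WAITING	1		/* A writer is waiting		*/
#define	_QW_LOCKED	0xff		/* A writer holds the lock	*/
#define	_QW_WMASK	0xff		/* Writer mask			*/
#define	_QR_SHIFT	8		/* Reader count shift		*/
#define	_QR_BIAS	(1U << _QR_SHIFT)	/* One reader		*/
So cnts += _QR_BIAS adds one reader, and any non-zero value in cnts & _QW_WMASK means a writer is waiting for or holding the lock.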
2. Examples
(1) CPU1/CPU2 reader lock (the reader fastpath they go through is sketched after this list)
- CPU1 takes the reader lock via cnts += _QR_BIAS.
- CPU2 takes the reader lock via cnts += _QR_BIAS.
- CPU1 releases the reader lock via cnts -= _QR_BIAS.
- CPU2 releases the reader lock via cnts -= _QR_BIAS.
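The reader fastpath used above looks roughly like this (a paraphrased sketch of include/asm-generic/qrwlock.h from around this version, not an exact copy):
static inline void queue_read_lock(struct qrwlock *lock)
{
	u32 cnts;

	/* Optimistically add one reader. */
	cnts = atomic_add_return(_QR_BIAS, &lock->cnts);
	if (likely(!(cnts & _QW_WMASK)))
		return;			/* no writer: read lock acquired */

	/*
	 * A writer holds or is waiting for the lock; fall back to the
	 * slowpath, which first undoes the _QR_BIAS added above.
	 */
	queue_read_lock_slowpath(lock);
}

static inline void queue_read_unlock(struct qrwlock *lock)
{
	/* Drop this reader's count to release the read lock. */
	smp_mb__before_atomic();
	atomic_sub(_QR_BIAS, &lock->cnts);
}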
(2) CPU1 reader lock - CPU2 writer lock - CPU3 reader lock (the writer fastpath and unlock are sketched after this list)
- CPU1 takes the reader lock via cnts += _QR_BIAS.
- CPU2 wants the writer lock and checks whether cnts is 0. It is not 0, which means some reader or writer already holds the lock (here CPU1 holds the reader lock), so CPU2 does spin_lock(lock). Nobody else holds that spinlock, so CPU2 acquires it, then sets cnts |= _QW_WAITING to force any later reader or writer onto the slow path, i.e. they must acquire the spinlock first. CPU2 then waits for cnts to become _QW_WAITING (CPU1 has already added _QR_BIAS, so this is waiting for CPU1 to clear _QR_BIAS).
- CPU3 wants the reader lock but sees cnts & _QW_WMASK != 0, so it has to take the slow path: it first tries to acquire the spinlock, which is held by CPU2, so CPU3 spins waiting for it.
- CPU1 releases the reader lock via cnts -= _QR_BIAS.
- CPU2 sees cnts == _QW_WAITING, meaning the earlier reader lock has been released, sets cnts = _QW_LOCKED and thereby takes the writer lock, then unlocks the spinlock. CPU3, which was waiting for that spinlock, now acquires it and waits for cnts & _QW_WMASK == 0, i.e. for the writer lock to be released.
- CPU2 releases the writer lock via cnts -= _QW_LOCKED.
- CPU3 sees cnts & _QW_WMASK == 0, takes the reader lock via cnts += _QR_BIAS, and releases the spinlock.
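The writer fastpath and unlock that CPU2 goes through are roughly the following (again a paraphrased sketch of include/asm-generic/qrwlock.h from around this version). Note that the "cnts -= _QW_LOCKED" step in the example corresponds to this release store clearing the writer byte.
static inline void queue_write_lock(struct qrwlock *lock)
{
	/* Optimize for the unlocked case: 0 -> _QW_LOCKED in one cmpxchg. */
	if (atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0)
		return;

	/* Readers or another writer are present: queue up in the slowpath. */
	queue_write_lock_slowpath(lock);
}

static inline void queue_write_unlock(struct qrwlock *lock)
{
	/*
	 * Only the writer byte needs to be cleared; a release store of 0
	 * to that byte publishes all writes made under the lock.
	 */
	smp_store_release((u8 *)&lock->cnts, 0);
}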
3. Code
/**
 * queue_read_lock_slowpath - acquire read lock of a queue rwlock
 * @lock: Pointer to queue rwlock structure
 */
void queue_read_lock_slowpath(struct qrwlock *lock)
{
	u32 cnts;

	/*
	 * Readers come here when they cannot get the lock without waiting
	 */
	if (unlikely(in_interrupt())) {
		/*
		 * Readers in interrupt context will spin until the lock is
		 * available without waiting in the queue.
		 */
		cnts = smp_load_acquire((u32 *)&lock->cnts);
		rspin_until_writer_unlock(lock, cnts);
		return;
	}
	/* Undo the _QR_BIAS added in the fastpath, because this CPU did not
	 * actually get the reader lock. */
	atomic_sub(_QR_BIAS, &lock->cnts);

	/*
	 * Put the reader into the wait queue
	 */
	arch_spin_lock(&lock->lock);

	/*
	 * At the head of the wait queue now, wait until the writer state
	 * goes to 0 and then try to increment the reader count and get
	 * the lock. It is possible that an incoming writer may steal the
	 * lock in the interim, so it is necessary to check the writer byte
	 * to make sure that the write lock isn't taken.
	 */
	while (atomic_read(&lock->cnts) & _QW_WMASK)
		cpu_relax_lowlatency();

	/* At this point the earlier writer lock (the one that forced this
	 * reader into the slowpath) is known to have been released. However,
	 * before the statement below, i.e. before lock->cnts += _QR_BIAS,
	 * lock->cnts may be 0. In that window another writer can take the
	 * lock directly, and another reader (when no writer holds the lock)
	 * can also take the lock directly.
	 */
	cnts = atomic_add_return(_QR_BIAS, &lock->cnts) - _QR_BIAS;
	/* Because of the window described above, a writer may have taken the
	 * lock in the meantime, so wait for that writer to release the lock
	 * before continuing.
	 */
	rspin_until_writer_unlock(lock, cnts);

	/*
	 * Signal the next one in queue to become queue head
	 */
	arch_spin_unlock(&lock->lock);
}
EXPORT_SYMBOL(queue_read_lock_slowpath);
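The helper rspin_until_writer_unlock() used above only spins while a writer actually holds the lock (as opposed to merely waiting for it). Its definition in kernel/locking/qrwlock.c of this era is roughly the following sketch:
/*
 * Spin, reloading cnts with an acquire load, as long as the writer byte
 * shows a writer actually holding the lock (_QW_LOCKED). A writer that is
 * only waiting (_QW_WAITING) does not stop a reader that reaches here.
 */
static __always_inline void
rspin_until_writer_unlock(struct qrwlock *lock, u32 cnts)
{
	while ((cnts & _QW_WMASK) == _QW_LOCKED) {
		cpu_relax_lowlatency();
		cnts = smp_load_acquire((u32 *)&lock->cnts);
	}
}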
/**
 * queue_write_lock_slowpath - acquire write lock of a queue rwlock
 * @lock : Pointer to queue rwlock structure
 */
void queue_write_lock_slowpath(struct qrwlock *lock)
{
	u32 cnts;

	/* Put the writer into the wait queue */
	arch_spin_lock(&lock->lock);
	/* From here on, this writer is the head of the wait queue. */

	/* Try to acquire the lock directly if no reader is present */
	if (!atomic_read(&lock->cnts) &&
	    (atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0))
		goto unlock;

	/* The read-lock fastpath only looks at lock->cnts: as long as neither
	 * _QW_LOCKED nor _QW_WAITING is set there, another reader can still
	 * get the lock first even though this writer already holds the
	 * spinlock, so the writer has to wait. This is because the read-lock
	 * fastpath simply checks whether any bit in _QW_WMASK is set.
	 * Another writer may also get the lock first; that happens in cases
	 * like:
	 *   READER-0          WRITER-0              WRITER-1
	 *   LOCK
	 *                     try_get_lock
	 *   UNLOCK
	 *                                           LOCK
	 *                     checks: the other
	 *                     writer got the lock
	 */
	/*
	 * Set the waiting flag to notify readers that a writer is pending,
	 * or wait for a previous writer to go away.
	 */
	for (;;) {
		cnts = atomic_read(&lock->cnts);
		/* Wait here for any other writer to release the lock; once
		 * _QW_WAITING is set, every later reader/writer must first
		 * acquire lock->lock.
		 */
		if (!(cnts & _QW_WMASK) &&
		    (atomic_cmpxchg(&lock->cnts, cnts,
				    cnts | _QW_WAITING) == cnts))
			break;

		cpu_relax_lowlatency();
	}

	/* When no more readers, set the locked flag */
	for (;;) {
		cnts = atomic_read(&lock->cnts);
		/* This check guarantees that all reader locks have been
		 * released before the writer lock is taken.
		 */
		if ((cnts == _QW_WAITING) &&
		    (atomic_cmpxchg(&lock->cnts, _QW_WAITING,
				    _QW_LOCKED) == _QW_WAITING))
			break;

		cpu_relax_lowlatency();
	}
unlock:
	arch_spin_unlock(&lock->lock);
}
EXPORT_SYMBOL(queue_write_lock_slowpath);
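To see where these paths are entered from, here is a minimal usage sketch through the generic rwlock API (demo_lock, shared_value, demo_read and demo_write are hypothetical names used only for illustration); on an architecture that selects qrwlock, read_lock()/write_lock() end up in the fastpaths and slowpaths shown above:
#include <linux/spinlock.h>

static DEFINE_RWLOCK(demo_lock);	/* hypothetical example lock */
static int shared_value;

static int demo_read(void)
{
	int v;

	read_lock(&demo_lock);		/* reader fastpath: cnts += _QR_BIAS */
	v = shared_value;
	read_unlock(&demo_lock);	/* cnts -= _QR_BIAS */
	return v;
}

static void demo_write(int v)
{
	write_lock(&demo_lock);		/* cmpxchg(0 -> _QW_LOCKED) or slowpath */
	shared_value = v;
	write_unlock(&demo_lock);	/* clear the writer byte */
}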
4. Notice
The spin_lock inside the rwlock guarantees an ordering among waiters when a writer is involved, as well as consistency when updating cnts. If a reader or writer has acquired the spin_lock but has not yet updated cnts, the rwlock can still be taken first by another reader or writer; this is the notion of stealing (annotated in the code analysis above), mainly because the lock fastpath decides whether the lock was acquired purely by looking at the value of cnts. For example, after CPU B acquires the spin_lock for a write lock (because CPU A holds a read lock), it may find that another CPU C has already taken the write lock. The likely sequence is: CPU B checked cnts and saw CPU A holding the read lock; while CPU B was acquiring the spin_lock, CPU A happened to release the read lock; CPU C then saw that nobody held the lock (cnts == 0) and took it first. When CPU B returns from the spin_lock it therefore sees that CPU C has taken the write lock (the same thing can happen with CPU C taking a read lock), and CPU B has to wait for CPU C to release the write lock before it can take the write lock itself. The original locking order was CPU A read -> CPU B write -> CPU C write, but it turns into CPU A read -> CPU C write -> CPU B write. It remains to be checked whether the latest code handles this case, or whether this case is simply not a problem.