lock-rwlock

rwlock是基于qspinlock的,同一时间多个reader可以获得该lock,但是同一时间只有一个writer可以获得该lock。(下面分析还是基于qspinlock 的base commit a33fda35e3a7655fb7df756ed67822afb5ed5e8d)

1.structure

typedef struct qrwlock {
        atomic_t                cnts;
        arch_spinlock_t         lock;
} arch_rwlock_t;

cnts: 用来标记是有多个在拿reader锁或者writer锁。
lock: qspin_lock

2. 例子

(1) CPU1/2 reader lock
  • CPU1 通过cnts+= _QR_BIAS拿到reader 锁。
  • CPU2 通过cnts+= _QR_BIAS拿到reader 锁。
  • CPU1 通过cnts -= _QR_BIAS释放reader 锁。
  • CPU2 通过cnts -= _QR_BIAS释放reader 锁。
(2) CPU1 reader lock – CPU2 writer lock – CPU3 reader lock
  • CPU1 通过cnts+= _QR_BIAS拿到reader 锁。
  • CPU2 想要拿写锁,坚持cnts是否为0, 如果不为0,代表已经有读锁或者写锁被其他人拿到(这里有CPU1的读锁拿到了锁),然后CPU2 spin_lock(lock),这里没有其他CPU获得该spin_lock,CPU2 获得了锁,然后利用cnts |= _QW_WAITING通知后续的想要读或者写的CPU走show_patch也就是首先获得spin_lock的逻辑,然后CPU2等待cnts 变为_QW_WAITING(CPU1 已经写入了_QR_BIAS,这里是等CPU1清楚了_QR_BIAS)。
  • CPU3 想要获得读锁,但是发现cents & _QW_MASK != 0,所以需要走slow patch,首先尝试获得spin_lock但是该lock被CPU2拿着所以pending。
  • CPU1 cnts-=_QR_BIAS 释放reader 锁。
  • CPU2 看到了 cents = _QW_WATING,也就是前面的读锁已经释放了,set cnts = _QW_LOCKED 获得该写锁,CPU2 unlock spin_lock,CPU3 因为在等待该spin_lock,所以获得该spin_lock,CPU3 等待 cnts & _QW_WMASK == 0, 也就是所有的写锁被释放。
  • CPU2 通过 cnts -= _QW_LOCKED 是否写锁。
  • CPU3 看到了cnts & _QW_WMASK == 0,通过cnts += _QR_BIAS获得该读锁,并且释放spin_lock。

3. code

/**
 * queue_read_lock_slowpath - acquire read lock of a queue rwlock
 * @lock: Pointer to queue rwlock structure
 */

void queue_read_lock_slowpath(struct qrwlock *lock)
{
        u32 cnts;

        /*
         * Readers come here when they cannot get the lock without waiting
         */

        if (unlikely(in_interrupt())) {
                /*
                 * Readers in interrupt context will spin until the lock is
                 * available without waiting in the queue.
                 */

                cnts = smp_load_acquire((u32 *)&lock->cnts);
                rspin_until_writer_unlock(lock, cnts);
                return;
        }
        /* 这里减去了_QR_BIAS 因为该cpu并没有获得reader锁 */
        atomic_sub(_QR_BIAS, &lock->cnts);

        /*
         * Put the reader into the wait queue
         */

        arch_spin_lock(&lock->lock);

        /*
         * At the head of the wait queue now, wait until the writer state
         * goes to 0 and then try to increment the reader count and get
         * the lock. It is possible that an incoming writer may steal the
         * lock in the interim, so it is necessary to check the writer byte
         * to make sure that the write lock isn't taken.
         */

        while (atomic_read(&lock->cnts) & _QW_WMASK)
                cpu_relax_lowlatency();

        /* 这里可以确定的是全面又一个writer lock已经释放了。(因为这个writer
         * lock,reader lock需要走slowpath
         * 但是在下面这个函数之前,也就是 &lock->cnts + _QR_BIAS 之前,
         * lock->cnts是可能= 0的,这种情况下,在这个间隙之间,
         * 如果有其他的writer lock 来是可以直接拿到锁的.
         * 如果有其他的read 想拿到锁(在没有writer拿到锁的情况下)也是可以直接拿到锁了。
         */


        cnts = atomic_add_return(_QR_BIAS, &lock->cnts) - _QR_BIAS;

        /* 由于上面解释说在间隙间是有可能被另外的writer拿到锁的所以需要等待
         * 该writer lock 释放后在继续
         */

        rspin_until_writer_unlock(lock, cnts);

        /*
         * Signal the next one in queue to become queue head
         */

        arch_spin_unlock(&lock->lock);
}
EXPORT_SYMBOL(queue_read_lock_slowpath);
/**
 * queue_write_lock_slowpath - acquire write lock of a queue rwlock
 * @lock : Pointer to queue rwlock structure
 */

void queue_write_lock_slowpath(struct qrwlock *lock)
{
        u32 cnts;

        /* Put the writer into the wait queue */
        arch_spin_lock(&lock->lock);

        /* 到了这里 writer 现在是head */
        /* Try to acquire the lock directly if no reader is present */
        if (!atomic_read(&lock->cnts) &&
            (atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0))
                goto unlock;

        /* 因为read_lock首先会查看lock->cnts的值,也就是如果该值没有被设置
         * _QW_LOCKED 或则 _QW_WARITING, 即便是write 先拿到了锁,也可能另外的
         * read 会首先获得锁,所以需要等待。 因为read_lock fast patch是直接
         * 检查_QW_WMASK的值是否被设置。
         * 也是有可能另外的writer_lock先获得锁,这种情况发生在
         *      READER-0        WRITER-0        WRITER-1
         *      LOCK
         *                      try_get_lock
         *      UNLOCK
         *                                      LOCK
         *                      check other
         *                      writer get lock
         */

        /*
         * Set the waiting flag to notify readers that a writer is pending,
         * or wait for a previous writer to go away.
         */

        for (;;) {
                cnts = atomic_read(&lock->cnts);
                /* 下面等待所有其他的writer释放锁, 并且在设置了_QW_WAITING后
                 * 所有的在后面的read/writer都需要先lock->lock
                 */

                if (!(cnts & _QW_WMASK) &&
                    (atomic_cmpxchg(&lock->cnts, cnts,
                                    cnts | _QW_WAITING) == cnts))
                        break;

                cpu_relax_lowlatency();
        }

        /* When no more readers, set the locked flag */
        for (;;) {
                cnts = atomic_read(&lock->cnts);
                /* 下面的保证所有持有reader锁已经释放 */
                if ((cnts == _QW_WAITING) &&
                    (atomic_cmpxchg(&lock->cnts, _QW_WAITING,
                                    _QW_LOCKED) == _QW_WAITING))
                        break;

                cpu_relax_lowlatency();
        }
unlock:
        arch_spin_unlock(&lock->lock);
}
EXPORT_SYMBOL(queue_write_lock_slowpath);
4. Notice

rwlock 是中的spin_lock可以保证在有写锁的情况下,读锁的一个顺序性。已经更新cnts的一致性。读写锁在拿到spin_lock的时候如果没有更新cents,其实是可以被其他的读写锁先获得也就是steal的概念(上面代码解析里面有些标注)主要是因为lock fast patch是直接看cnts的指来确实是否lock成功。某个CPUA 写锁在获得 spin_lock后(由于有CPU B 拿了读锁),可能会发现其他的CPU C 已经获得了写锁,可能原因是CPUA check cnts是看到了CPU B拿了读锁,去spin_lock的时候,CPU B刚巧释放了读锁,CPU C 看到了没有人拿锁(cnts ==0) 先拿了锁,那么CPUB在 spin_lock返回后就会看到有CPU C拿到写锁的情况(同理CPUC拿读锁也是一样的情况),CPUB就需要等到CPUC 是否了写锁后在拿到写锁。原本的拿锁顺序是
CPUA读-CPUB写-CPUC写,现在变成了CPUA度-CPUC写-CPUB写。需要看下最新的代码有没有对这个case 处理,或者说这个case 不是问题。

发表评论