月度归档:2020年11月

lock-qspinlock

qspinlock是为了解决ticketlock存在的多个cpu 同时等待同一个变量时cache 的改变对性能会产生很大的影响的问题,因为一个内存数据的改变会invalid 所有的等待的cpu上的cache。
下面分析机遇最初版本的qspinlock() a33fda35e3a7655fb7df756ed67822afb5ed5e8d

1. Structure

typedef struct qspinlock {
       atomic_t        val;
} arch_spinlock_t;
 /*
  * Bitfields in the atomic value:
  *
  * When NR_CPUS < 16K
  *  0- 7: locked byte
  *     8: pending
  *  9-15: not used
  * 16-17: tail index
  * 18-31: tail cpu (+1)
  *
  * When NR_CPUS >= 16K
  *  0- 7: locked byte
  *     8: pending
  *  9-10: tail index
  * 11-31: tail cpu (+1)
 */

struct mcs_spinlock {
       struct mcs_spinlock *next;
       int locked; /* 1 if lock acquired */
       int count;  /* nesting count, see qspinlock.c */
};

2. qspinlock的过程

(1). host 两个CPU1/2同时抢锁

  • CPU 1 set locked = 1 获得该锁
  • CPU 2 set pend = 1 然后等待锁的locked = 0
  • CPU1 set locked = 1 释放该锁
  • CPU2 atomic set locked = 1 && set pend = 0 获得该锁

(2). host 4个CPU同时抢夺锁

CPU1/2 获得锁的过程和上面第一图一致

  • CPU3 想要获得锁,发现 pend = 1,代表着已经有两个或者以上CPU在等待该锁,所以CPU3选择了queue,CPU3 首先将自己的mcs_node对应的id + cpu 写到到 cpu/idx(tail)中,然后等待locked =0 && pend = 0
  • CPU4想要获得锁,和CPU3一样会选择queue,CPU4 会得到当前cpu/idx 的指,通过atomic_xchg将自己对应的4/0写入,同时获得CPU3 对应的mcs_node,并将自己的mcs_node链接到CPU3对应的mcs_node->next,CPU4等待mcs_node->locked 变为1
  • CPU1 释放锁后,设置locked = 0
  • CPU2 设置 locked = 1 pend = 0 获得锁
  • CPU2 设置locked = 0 释放锁
  • CPU3 看到locked = 0 后,set locked = 1 获得该锁,并且通过自己的mcs_node 获得next 也就是CPU4对应的mcs_node,并且设置mcs_node->locked = 1,CPU4看到自己 mcs_node ->locked =1 后,改为等待spin_lock->locked = 0 && pend = 0
  • CPU3 set spin_lock->locked = 0,释放锁
  • CPU4 看到了spin_lock->locked == 0 && pend = 0,并且看到了spin_lock->cpu/idx 对应的值是自己,代表CPU4是最后一个获取锁的CPU,设置spin_lock->locked = 1 并且清楚其他bit

(3)guest 3 个 vcpu同时抢锁

针对虚拟化的场景,spinlock的处理不同点在与,当guest vcpu 等待lock的时候,是可以vmexit 到host中,host 调度可以根据需要运行其他的进程。同时当一个锁释放后,需要通过kick的动作将唤醒该vcpu对应的线程,并且vmentry 到guest中,继续guest 拿锁的动作

struct pv_node {
        struct mcs_spinlock     mcs;
        struct mcs_spinlock     __res[3];

        int                     cpu;
        u8                      state;
};

针对虚拟化场景,pv_node在原来mcs_lock上主要添加了两个变量,cpu代表是vcpu cpuid, state 代表的是当前vcpu运行的状态(halt/running)

  • vcpu1 set locked = 1拿锁
  • vcpu2 看到锁已经被其他vcpu获得后,(这里没有看到pv对应pending的处理,只要有vcpu等待都需要vmexit的,所以逻辑和host两个cpu同时抢锁有区别的,同时因为vmexit后的vcpu在unlock时需要被kick唤醒,所有只要有多于一个vcpu在等待该锁都需要附加的pv_node,这也是为什么没有用pending的原因(不知道最新版本的有没有优化))获得新的mcs并初始化cpu = 2 stat = run, 同时设置spin_lock->cpu/idx = 2/0,在等待spin_lock->locked 被设置前,会调用pv_wait_head通过该函数halt该vcpu,vmexit到host,使得host可以运行其他的认为,在该函数中,首先会修改pv_node->state = halt。然后修改spin_locked->locked = _Q_SLOW_VAL,并且wait spin_locked->locked != _Q_SLOW_VAL,等待的过程是调动halt()类似的函数,vcpu在调用halt后会vmexit到host中,host调度策略可以选取其他任务进行运行。
    这里几个点需要注意,vcpu2现在是第一个等待的vcpu,所以它对应的pv_node会和spin_lock值做一定的hash,这样在unlock的时候,通过spin_lock本身的地址就可以获得第一个等待pv_node。
  • vcpu3 想要获得该锁的时候前4个步骤都是一样的,到了第5步,vcpu3 检查pv_node->mcs.locked是否为0(为0 代表需要等待,为1代表需要等待),如果为0,vcpu3会调用pv_wait等待vcpu3的状态被修改为running。和vcpu2一样vcpu3也会通过halt vmexit到host上。
  • vcpu1 释放锁,修改spin_locked->locked = 0,并且通过spin_lock hash得到头部的pv_node,现在的pv_node为vcpu2的pv_node,拿到pv_node后获取到pv_node->cpu,
    通过pv_kick唤醒该vcpu。(pv_kick的原理后续写虚拟化会在讲,基本就是拿到vcpu对应的pcpu 然后发一个reschedule ipi)。
  • host 调度选择调度vcpu2 并且vcpu2通过vmentery 进入guest mode后,会检查spin_locked->locked 是否为0(我们看到unlock的时候是会先修改locked = 0 然后在kick, 但是呢vcpu有可能因为其他的一些中断被唤醒,例如timer 中断),为0,说明锁被释放了,修改spin_locked->locked = 1获得该锁,然后发现后面还有vcpu在等待(vcpu3),所以获得vcpu3 对应的pv_node,设置pv_node->mcs->locked = 1(vcpu3 当前是先看mcs->locked 是否为0才做等待state != halted的等待动作的),修改pv_node->state = running,然后pv_kick_vcpu3。这里设置状态并且kickvcpu3的原因是,让vcpu3向下一步,vcpu3当前在等待的头部了,所以需要等待spin_lock->locked的值了。
  • vcpu3 被调度并且运行后,会看到pv_node->state == running,所以继续运行,有看到pv_node->mcs.locked == 1,继续运行,到了pv_wait_head基本和vcpu2 第一次等待的动作一样的,修改pv_node->state = halt,修改spin_lock->loked = _Q_SLOW_VAL,然后等待spin_lock->locked = 0,等待过程和vcpu2 trylock一样,也是halt->vmexit。
  • vcpu2 释放lock后修改spin_lock ->locked = 0,并且通过spin_lock hash获得了头部的pv_node(上一步vcpu3 pv_wait_head 把自己的pv_node加入到hash中)。拿到了pv_node->vcpu 也就是vcpu3,然后pv_kick。
  • vcpu3 获得运行后,看到spin_lock->locked == 0, 通过设置locked =1 并且清除cpu/idx, 获得该锁。

3. Code

lock_fastpath
/**
 * queued_spin_lock - acquire a queued spinlock
 * @lock: Pointer to queued spinlock structure
 */

static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
        u32 val;

        val = atomic_cmpxchg(&amp;lock-&gt;val, 0, _Q_LOCKED_VAL);
        if (likely(val == 0)) /* 当前锁没有被lock, 直接lock该锁,并返回 */
                return;
        /* 当前锁被其他人lock了,需要等待到慢速路径*/
        queued_spin_lock_slowpath(lock, val);
}
lock_slowpath
/* 慢速路径 */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
        struct mcs_spinlock *prev, *next, *node;
        u32 new, old, tail;
        int idx;

        BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

        if (pv_enabled())
                goto queue;

        if (virt_queued_spin_lock(lock))
                return;

        /*
         * wait for in-progress pending->locked hand-overs
         *
         * 0,1,0 -> 0,0,1
         */

        if (val == _Q_PENDING_VAL) {
        /*  出现这种情况是cpu1 拿到了锁,cpu2在等待锁,并且set 了pending,
         *  cpu1 release了锁,cpu2还没有拿到了锁,但是一定是cpu2先拿到锁
         */

                while ((val = atomic_read(&lock->val)) == _Q_PENDING_VAL)
                        cpu_relax();
        }

        /*
         * trylock || pending
         *
         * 0,0,0 -> 0,0,1 ; trylock
         * 0,0,1 -> 0,1,1 ; pending
         */

        for (;;) {
                /*
                 * If we observe any contention; queue.
                 */

                if (val & ~_Q_LOCKED_MASK)
                /* 这里类似图中CPU3 CPU4 try lock 的动作*/
                        goto queue;

                new = _Q_LOCKED_VAL;
                if (val == new)
                        /* 如果上次获得的val 代表的只有一个CPU拿着锁,
                         * 并且没有其他CPU等待锁,加上pending */

                        new |= _Q_PENDING_VAL;

                /* 这个通过判断old 和val的确定,在上一次该CPU获取val的时间点
                 * 到现在为止的一段时间内,有没有其他CPU已经在等待该锁或者
                 * 锁已经被释放了
                 */

                old = atomic_cmpxchg(&lock->val, val, new);
                if (old == val)
                        break;

                val = old;
        }

        /*
         * we won the trylock
         */

        if (new == _Q_LOCKED_VAL)
        /* 这里代表上面的逻辑中,val被改为了0, 也就是锁被释放了 */
                return;

        /*
         * we're pending, wait for the owner to go away.
         *
         * *,1,1 -> *,1,0
         *
         * this wait loop must be a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because not all clear_pending_set_locked()
         * implementations imply full barriers.
         */

        while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK)
        /* 等到该锁的被释放 对应图中CPU2 try lock*/
                cpu_relax();

        /*
         * take ownership and clear the pending bit.
         *
         * *,1,0 -> *,0,1
         */

        /* 获取该锁,对应图中CPU2 get lock */
        clear_pending_set_locked(lock);
        return;

        /*
         * End of pending bit optimistic spinning and beginning of MCS
         * queuing.
         */

queue:
        /* 获得该CPU上的空闲的mcs_lock 并初始化 */
        node = this_cpu_ptr(&mcs_nodes[0]);
        idx = node->count++;
        tail = encode_tail(smp_processor_id(), idx);

        node += idx;
        node->locked = 0;
        node->next = NULL;
        /*pv: 初始化node->cpu/state */
        pv_init_node(node);

        /*
         * We touched a (possibly) cold cacheline in the per-cpu queue node;
         * attempt the trylock once more in the hope someone let go while we
         * weren't watching.
         */

        /* 优化 */
        if (queued_spin_trylock(lock))
                goto release;

        /*
         * We have already touched the queueing cacheline; don't bother with
         * pending stuff.
         *
         * p,*,* -> n,*,*
         */

        /* 将tail 更新到lock tail(cpu/idx) 并且返回原来的tail
         * 表示最后一个想要获得锁的CPU
         * 图中CPU3 try lock的时候会写入,CPU4在try lock的时候
         * 也会写入会覆盖CPU3写入的,来表示CPU4是最后一个想要
         * 获得锁的。
         */

        old = xchg_tail(lock, tail);

        /*
         * if there was a previous node; link it and wait until reaching the
         * head of the waitqueue.
         */

        /* 判断当前有没有多于2个CPU在等待锁,如果有把mcs链接到
         * last mcs->next。
         * 针对图中CPU3来说,old = 0, 说明只有一个等待锁
         * 针对图中CPU4来说,old = CPU3/IDX, 需要把CPU4对应的mcs链接到CPU3/idx mcs->next
         * 现在等待锁的是CPU2 CPU3。
         */

        if (old & _Q_TAIL_MASK) {
                prev = decode_tail(old);
                WRITE_ONCE(prev->next, node);

                /* pv:修改 node->stat = halted
                 * 如果node->locked = 0 等待 node->stat变为running
                 * 该函数会造成vcpu vmexit
                 */

                 pv_wait_node(node);
                /* 等待 node->locked 被设置, locked
                 * 会被前一个拿锁的CPU设置
                 * 对应图中CPU4 try lock, CPU3在拿到
                 * 锁后会设置 对应图中CPU3 get lock
                 */

                arch_mcs_spin_lock_contended(&node->locked);
        }

        /*
         * we're at the head of the waitqueue, wait for the owner & pending to
         * go away.
         *
         * *,x,y -> *,0,0
         *
         * this wait loop must use a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because the set_locked() function below
         * does not imply a full barrier.
         *
         */

        /* pv: 设置locked = _Q_SLOW_VAL, 如果该锁还是没有释放,等待一些循环后,halt
         * 然后vmexit到host, 该等待的node 会被hash, 在unlock的时候被获取到。
         */
     
         pv_wait_head(lock, node);
        /* 等待locked && pend 都被clear */
        while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK)
                cpu_relax();

        /*
         * claim the lock:
         *
         * n,0,0 -> 0,0,1 : lock, uncontended
         * *,0,0 -> *,0,1 : lock, contended
         *
         * If the queue head is the only one in the queue (lock value == tail),
         * clear the tail code and grab the lock. Otherwise, we only need
         * to grab the lock.
         */

        for (;;) {
                /* 该CPU对应的mcs不是最后一个等待的lock的CPU, 直接set lock
                 * 注意这里并没有修改lock->tail(cpu/idx)以及pending
                 * 图中CPU3 get lock
                 */

                if (val != tail) {
                        set_locked(lock);
                        break;
                }

                /* 该CPU是最后一个等待锁的CPU,直接修改
                 * lock->val = LOCKED_VAL,相当于clear 了tail/pending,
                 * 并且set了locked
                 * 对应图中 CPU4 get lock
                 */

                old = atomic_cmpxchg(&lock->val, val, _Q_LOCKED_VAL);
                if (old == val)
                        goto release;   /* No contention */

                val = old;
        }

        /*
         * contended path; wait for next, release.
         */

        /* 现在获得锁的CPU不是最后一个等待的CPU, 需要获得next, 并且设置next->locked
         * 图中CPU3 get lock的过程,拿到CPU4 mcs, 并且修改CPU4 mcs->locked = 1
         */

        while (!(next = READ_ONCE(node->next)))
                cpu_relax();

        arch_mcs_spin_unlock_contended(&next->locked);
        /*pv: kick 等待在pv_wait_node的next->cpu, 因为pv_wait_node等待的是
         *node->state 变为running,所以这个函数修改next->state = running,然后在
         * kick next->vcpu
         */

        pv_kick_node(next);

release:
        /*
         * release the node
         */

        this_cpu_dec(mcs_nodes[0].count);
}
unlock(native)
static inline void native_queued_spin_unlock(struct qspinlock *lock)
{
        smp_store_release((u8 *)lock, 0);
}
unlock(pv)
__visible void __pv_queued_spin_unlock(struct qspinlock *lock)
{
        struct __qspinlock *l = (void *)lock;
        struct pv_node *node;

        /*
         * We must not unlock if SLOW, because in that case we must first
         * unhash. Otherwise it would be possible to have multiple @lock
         * entries, which would be BAD.
         */

        if (likely(cmpxchg(&amp;l-&gt;locked, _Q_LOCKED_VAL, 0) == _Q_LOCKED_VAL))
                return;

        /*
         * Since the above failed to release, this must be the SLOW path.
         * Therefore start by looking up the blocked node and unhashing it.
         */

        /*通过lock获得头部等待的vcpu 对应的pv_node */
        node = pv_unhash(lock);

        /*
         * Now that we have a reference to the (likely) blocked pv_node,
         * release the lock.
         */

        /* set spinlock-&gt;lock = 0 */
        smp_store_release(&amp;l-&gt;locked, 0);

        /*
         * At this point the memory pointed at by lock can be freed/reused,
         * however we can still use the pv_node to kick the CPU.
         */

        /* halt唤醒vcpu */
        if (READ_ONCE(node-&gt;state) == vcpu_halted)
                pv_kick(node-&gt;cpu);
}