qspinlock was introduced to fix a problem with ticket spinlocks: when many CPUs spin on the same variable, every write to that memory location invalidates the cacheline on all waiting CPUs, which costs a lot of performance.
The analysis below is based on the initial version of qspinlock, commit a33fda35e3a7655fb7df756ed67822afb5ed5e8d.
1. Structure
typedef struct qspinlock {
        atomic_t val;
} arch_spinlock_t;
/*
* Bitfields in the atomic value:
*
* When NR_CPUS < 16K
* 0- 7: locked byte
* 8: pending
* 9-15: not used
* 16-17: tail index
* 18-31: tail cpu (+1)
*
* When NR_CPUS >= 16K
* 0- 7: locked byte
* 8: pending
* 9-10: tail index
* 11-31: tail cpu (+1)
*/
struct mcs_spinlock {
        struct mcs_spinlock *next;
        int locked; /* 1 if lock acquired */
        int count;  /* nesting count, see qspinlock.c */
};
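The fields above are carved out of the single 32-bit val by mask/offset macros. A sketch of those constants, paraphrased from the kernel's qspinlock_types.h (when NR_CPUS < 16K the pending field is widened to a whole byte, so that pending + locked can be updated with one 16-bit store):
#define _Q_SET_MASK(type)       (((1U << _Q_ ## type ## _BITS) - 1)\
                                      << _Q_ ## type ## _OFFSET)
#define _Q_LOCKED_OFFSET        0
#define _Q_LOCKED_BITS          8
#define _Q_LOCKED_MASK          _Q_SET_MASK(LOCKED)

#define _Q_PENDING_OFFSET       (_Q_LOCKED_OFFSET + _Q_LOCKED_BITS)
#if CONFIG_NR_CPUS < (1U << 14)
#define _Q_PENDING_BITS         8       /* whole byte: enables 16-bit stores */
#else
#define _Q_PENDING_BITS         1
#endif
#define _Q_PENDING_MASK         _Q_SET_MASK(PENDING)

#define _Q_TAIL_IDX_OFFSET      (_Q_PENDING_OFFSET + _Q_PENDING_BITS)
#define _Q_TAIL_IDX_BITS        2       /* mcs_nodes[] index: task/softirq/hardirq/nmi */
#define _Q_TAIL_IDX_MASK        _Q_SET_MASK(TAIL_IDX)

#define _Q_TAIL_CPU_OFFSET      (_Q_TAIL_IDX_OFFSET + _Q_TAIL_IDX_BITS)
#define _Q_TAIL_CPU_BITS        (32 - _Q_TAIL_CPU_OFFSET)
#define _Q_TAIL_CPU_MASK        _Q_SET_MASK(TAIL_CPU)

#define _Q_TAIL_OFFSET          _Q_TAIL_IDX_OFFSET
#define _Q_TAIL_MASK            (_Q_TAIL_IDX_MASK | _Q_TAIL_CPU_MASK)

#define _Q_LOCKED_VAL           (1U << _Q_LOCKED_OFFSET)
#define _Q_PENDING_VAL          (1U << _Q_PENDING_OFFSET)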
2. qspinlock flow
(1). Host: two CPUs (CPU1/CPU2) contend for the lock
- CPU1 sets locked = 1 and acquires the lock
- CPU2 sets pend = 1, then waits for locked to become 0
- CPU1 sets locked = 0, releasing the lock
- CPU2 atomically sets locked = 1 and pend = 0, acquiring the lock (see the value trace below)
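The same hand-over written as a trace of lock->val, in the (queue tail, pending, locked) notation used by the kernel's own comments:
(tail, pending, locked)
 0,0,0 -> 0,0,1    CPU1: trylock succeeds
 0,0,1 -> 0,1,1    CPU2: sets pending, spins on the locked byte
 0,1,1 -> 0,1,0    CPU1: unlock, locked = 0
 0,1,0 -> 0,0,1    CPU2: clear_pending_set_locked(), lock acquired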
(2). Host: four CPUs contend for the lock
CPU1/CPU2 acquire the lock the same way as in the first diagram above.
- CPU3 wants the lock but sees pend = 1, which means two or more CPUs are already competing, so CPU3 chooses to queue: it writes the cpu + idx identifying its own mcs_node into cpu/idx (the tail), then waits for locked == 0 && pend == 0
- CPU4 wants the lock and queues like CPU3: it reads the current cpu/idx value, writes its own 4/0 via atomic_xchg, uses the old tail to find CPU3's mcs_node, links its own mcs_node into CPU3's mcs_node->next, and then waits for its mcs_node->locked to become 1 (the tail encoding is sketched after this list)
- CPU1 releases the lock by setting locked = 0
- CPU2 sets locked = 1 and pend = 0, acquiring the lock
- CPU2 sets locked = 0, releasing the lock
- CPU3 sees locked == 0 and sets locked = 1 to acquire the lock; through its own mcs_node it finds next, i.e. CPU4's mcs_node, and sets that node's locked = 1. Once CPU4 sees its mcs_node->locked == 1, it switches to waiting for spin_lock->locked == 0 && pend == 0
- CPU3 sets spin_lock->locked = 0, releasing the lock
- CPU4 sees spin_lock->locked == 0 && pend == 0, and also sees that spin_lock->cpu/idx still refers to itself, meaning CPU4 is the last CPU trying to get the lock, so it sets spin_lock->locked = 1 and clears all the other bits
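For reference, this is how the cpu/idx tail word is packed and unpacked; a sketch of encode_tail()/decode_tail() from kernel/locking/qspinlock.c (note the cpu is stored +1, so that tail == 0 means "no queue"):
static inline u32 encode_tail(int cpu, int idx)
{
        u32 tail;

        tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;
        tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */

        return tail;
}

static inline struct mcs_spinlock *decode_tail(u32 tail)
{
        int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
        int idx = (tail &  _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;

        return per_cpu_ptr(&mcs_nodes[idx], cpu);
}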
(3). Guest: three vCPUs contend for the lock
In the virtualization scenario, spinlock handling differs in that a guest vCPU waiting for a lock can vmexit to the host, letting the host scheduler run other tasks as needed. Correspondingly, when the lock is released, the waiting vCPU's thread must be woken up with a kick and vmentry back into the guest, where it continues taking the lock.
struct pv_node {
        struct mcs_spinlock mcs;
        struct mcs_spinlock __res[3];

        int cpu;
        u8  state;
};
For the virtualization scenario, pv_node adds two fields on top of the plain mcs_spinlock: cpu is the vCPU's cpuid, and state is the vCPU's current run state (halted/running).
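For reference, state takes its values from an enum in kernel/locking/qspinlock_paravirt.h (as of this commit; quoted from memory, later versions add more states):
enum vcpu_state {
        vcpu_running = 0,
        vcpu_halted,
};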
- vcpu1 sets locked = 1 and takes the lock
- vcpu2 sees the lock is already held by another vCPU. (There is no pending handling visible in the pv path: any waiting vCPU has to vmexit, so the logic differs from the two-CPU host case. And because a vCPU that has vmexited must be kicked awake at unlock time, every waiter beyond the holder needs a pv_node attached; that is presumably why the pending bit is not used here, though I don't know whether newer versions optimize this.) vcpu2 grabs a fresh mcs node and initializes cpu = 2, state = running, sets spin_lock->cpu/idx = 2/0, and before spinning on spin_lock->locked it calls pv_wait_head, which halts the vCPU and vmexits to the host so the host can run other tasks. Inside that function, it first sets pv_node->state = halted, then sets spin_lock->locked = _Q_SLOW_VAL and waits for spin_lock->locked to stop being _Q_SLOW_VAL. The waiting uses a halt()-like call; after calling halt the vCPU vmexits to the host, whose scheduler may pick another task to run.
One point to note here: vcpu2 is now the first waiting vCPU, so its pv_node is hashed against the spin_lock; at unlock time the first waiting pv_node can then be looked up from the spin_lock's own address. A minimal sketch of such a hash follows.
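A minimal sketch of the lock-to-head-waiter hash, assuming a fixed-size open-addressed table with linear probing (hypothetical names and sizing; the real table in kernel/locking/qspinlock_paravirt.h is sized at boot and probed per cacheline-sized entry):
#define PV_HASH_BITS    8                       /* hypothetical: 256 entries */
#define PV_HASH_SIZE    (1U << PV_HASH_BITS)

struct pv_hash_entry {
        struct qspinlock *lock;
        struct pv_node   *node;
};
static struct pv_hash_entry pv_hash_tbl[PV_HASH_SIZE];

static void pv_hash(struct qspinlock *lock, struct pv_node *node)
{
        unsigned long h = hash_ptr(lock, PV_HASH_BITS);

        for (;; h = (h + 1) % PV_HASH_SIZE) {   /* linear probe */
                if (!cmpxchg(&pv_hash_tbl[h].lock, NULL, lock)) {
                        WRITE_ONCE(pv_hash_tbl[h].node, node);
                        return;
                }
        }
}

static struct pv_node *pv_unhash(struct qspinlock *lock)
{
        unsigned long h = hash_ptr(lock, PV_HASH_BITS);

        for (;; h = (h + 1) % PV_HASH_SIZE) {
                if (READ_ONCE(pv_hash_tbl[h].lock) == lock) {
                        struct pv_node *node = READ_ONCE(pv_hash_tbl[h].node);

                        WRITE_ONCE(pv_hash_tbl[h].lock, NULL); /* unhash */
                        return node;
                }
        }
}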
- When vcpu3 tries to take the lock, the first four steps are the same as vcpu2's. At step 5, vcpu3 checks whether pv_node->mcs.locked is 0 (0 means it has to wait, 1 means it does not). If it is 0, vcpu3 calls pv_wait and waits for its state to be set back to running; like vcpu2, vcpu3 also halts and vmexits to the host.
- vcpu1 releases the lock: it sets spin_lock->locked = 0, looks up the head pv_node via the spin_lock hash (currently vcpu2's pv_node), reads pv_node->cpu from it, and wakes that vCPU via pv_kick. (How pv_kick works will be covered in a later virtualization write-up; essentially it finds the pCPU the vCPU runs on and sends it a reschedule IPI.)
- The host scheduler picks vcpu2, and after vcpu2 vmentries back into guest mode it checks whether spin_lock->locked is 0. (Unlock sets locked = 0 before kicking, but a vCPU can also be woken by some other interrupt, e.g. a timer interrupt, hence the check.) Since it is 0, the lock has been released, so vcpu2 sets spin_lock->locked = 1 and acquires it. It then notices another vCPU (vcpu3) is still waiting, so it fetches vcpu3's pv_node, sets pv_node->mcs.locked = 1 (vcpu3 only starts waiting on state != halted after first seeing mcs.locked == 0), sets pv_node->state = running, and pv_kicks vcpu3. Setting the state and kicking pushes vcpu3 one step forward: vcpu3 is now at the head of the queue, so it must start waiting on spin_lock->locked itself.
- Once vcpu3 is scheduled and runs, it sees pv_node->state == running and continues; it then sees pv_node->mcs.locked == 1 and continues into pv_wait_head, which works just like vcpu2's first wait: set pv_node->state = halted, set spin_lock->locked = _Q_SLOW_VAL, then wait for spin_lock->locked == 0. The waiting is the same halt -> vmexit as in vcpu2's trylock.
- vcpu2 releases the lock by setting spin_lock->locked = 0 and looks up the head pv_node via the spin_lock hash (vcpu3's pv_wait_head added its pv_node to the hash in the previous step). It reads pv_node->cpu, i.e. vcpu3, and pv_kicks it.
- When vcpu3 runs again, it sees spin_lock->locked == 0 and acquires the lock by setting locked = 1 and clearing cpu/idx. (How pv_wait/pv_kick are wired up on x86 is sketched after this list.)
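On the x86 side, the same patch series routes pv_wait/pv_kick through the paravirt ops table, so KVM/Xen can plug in their own wait (halt + vmexit) and kick (wake the target vCPU) implementations. A sketch of that hook structure from arch/x86/include/asm/paravirt_types.h of that era (quoted from memory; the exact layout may differ by version):
struct pv_lock_ops {
        void (*queued_spin_lock_slowpath)(struct qspinlock *lock, u32 val);
        struct paravirt_callee_save queued_spin_unlock;

        void (*wait)(u8 *ptr, u8 val); /* block while *ptr == val */
        void (*kick)(int cpu);         /* wake the given (v)cpu */
};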
3. Code
lock_fastpath
/**
 * queued_spin_lock - acquire a queued spinlock
 * @lock: Pointer to queued spinlock structure
 */
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
        u32 val;

        val = atomic_cmpxchg(&lock->val, 0, _Q_LOCKED_VAL);
        if (likely(val == 0)) /* the lock was free: take it and return */
                return;
        /* someone else holds the lock: fall through to the slowpath */
        queued_spin_lock_slowpath(lock, val);
}
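For completeness, the trylock that the slowpath below retries with (queued_spin_trylock() from the same header; quoted from memory, so minor details may vary):
/**
 * queued_spin_trylock - try to acquire the queued spinlock
 * @lock : Pointer to queued spinlock structure
 * Return: 1 if lock acquired, 0 if failed
 */
static __always_inline int queued_spin_trylock(struct qspinlock *lock)
{
        if (!atomic_read(&lock->val) &&
           (atomic_cmpxchg(&lock->val, 0, _Q_LOCKED_VAL) == 0))
                return 1;
        return 0;
}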
lock_slowpath
/* the slowpath */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
        struct mcs_spinlock *prev, *next, *node;
        u32 new, old, tail;
        int idx;

        BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

        if (pv_enabled())
                goto queue;

        if (virt_queued_spin_lock(lock))
                return;

        /*
         * wait for in-progress pending->locked hand-overs
         *
         * 0,1,0 -> 0,0,1
         */
        if (val == _Q_PENDING_VAL) {
                /* This happens when cpu1 held the lock and cpu2 was waiting
                 * with pending set; cpu1 has released the lock but cpu2 has
                 * not taken it yet. cpu2 is guaranteed to get the lock first.
                 */
                while ((val = atomic_read(&lock->val)) == _Q_PENDING_VAL)
                        cpu_relax();
        }

        /*
         * trylock || pending
         *
         * 0,0,0 -> 0,0,1 ; trylock
         * 0,0,1 -> 0,1,1 ; pending
         */
        for (;;) {
                /*
                 * If we observe any contention; queue.
                 */
                if (val & ~_Q_LOCKED_MASK)
                        /* this matches the CPU3/CPU4 "try lock" steps in the diagram */
                        goto queue;

                new = _Q_LOCKED_VAL;
                if (val == new)
                        /* The val we read says exactly one CPU holds the lock
                         * and nobody else is waiting: add pending. */
                        new |= _Q_PENDING_VAL;

                /* Comparing old against val tells us whether, between our
                 * last read of val and now, another CPU started waiting for
                 * the lock or the lock got released.
                 */
                old = atomic_cmpxchg(&lock->val, val, new);
                if (old == val)
                        break;

                val = old;
        }

        /*
         * we won the trylock
         */
        if (new == _Q_LOCKED_VAL)
                /* In the loop above, val had become 0, i.e. the lock was
                 * released, and our cmpxchg took it. */
                return;

        /*
         * we're pending, wait for the owner to go away.
         *
         * *,1,1 -> *,1,0
         *
         * this wait loop must be a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because not all clear_pending_set_locked()
         * implementations imply full barriers.
         */
        while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK)
                /* wait for the lock to be released; CPU2 "try lock" in the diagram */
                cpu_relax();

        /*
         * take ownership and clear the pending bit.
         *
         * *,1,0 -> *,0,1
         */
        /* acquire the lock; CPU2 "get lock" in the diagram */
        clear_pending_set_locked(lock);
        return;

        /*
         * End of pending bit optimistic spinning and beginning of MCS
         * queuing.
         */
queue:
        /* grab a free per-cpu mcs node and initialize it */
        node = this_cpu_ptr(&mcs_nodes[0]);
        idx = node->count++;
        tail = encode_tail(smp_processor_id(), idx);

        node += idx;
        node->locked = 0;
        node->next = NULL;
        /* pv: initialize node->cpu/state */
        pv_init_node(node);

        /*
         * We touched a (possibly) cold cacheline in the per-cpu queue node;
         * attempt the trylock once more in the hope someone let go while we
         * weren't watching.
         */
        /* optimization */
        if (queued_spin_trylock(lock))
                goto release;

        /*
         * We have already touched the queueing cacheline; don't bother with
         * pending stuff.
         *
         * p,*,* -> n,*,*
         */
        /* Publish our tail into lock->tail (cpu/idx) and get the old tail
         * back; the tail marks the last CPU that wants the lock.
         * In the diagram CPU3 writes it during "try lock", and CPU4's
         * "try lock" later overwrites CPU3's value, marking CPU4 as the
         * last CPU that wants the lock.
         */
        old = xchg_tail(lock, tail);

        /*
         * if there was a previous node; link it and wait until reaching the
         * head of the waitqueue.
         */
        /* Check whether two or more CPUs are already waiting for the lock;
         * if so, link our mcs node onto the last mcs->next.
         * For CPU3 in the diagram, old == 0: it is the only queued waiter.
         * For CPU4, old == CPU3/IDX: CPU4's mcs node must be linked to
         * CPU3/idx mcs->next; the waiters are now CPU2 and CPU3.
         */
        if (old & _Q_TAIL_MASK) {
                prev = decode_tail(old);
                WRITE_ONCE(prev->next, node);

                /* pv: set node->state = halted; if node->locked is still 0,
                 * wait for node->state to become running again. This
                 * function makes the vcpu vmexit.
                 */
                pv_wait_node(node);
                /* Wait for node->locked to be set; it is set by the previous
                 * lock taker: CPU4 "try lock" in the diagram, set by CPU3
                 * once it has the lock ("CPU3 get lock").
                 */
                arch_mcs_spin_lock_contended(&node->locked);
        }

        /*
         * we're at the head of the waitqueue, wait for the owner & pending to
         * go away.
         *
         * *,x,y -> *,0,0
         *
         * this wait loop must use a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because the set_locked() function below
         * does not imply a full barrier.
         *
         */
        /* pv: set locked = _Q_SLOW_VAL; if the lock still is not released
         * after spinning a while, halt and vmexit to the host. The waiting
         * node is hashed so that unlock can find it.
         */
        pv_wait_head(lock, node);
        /* wait until locked and pend are both clear */
        while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK)
                cpu_relax();

        /*
         * claim the lock:
         *
         * n,0,0 -> 0,0,1 : lock, uncontended
         * *,0,0 -> *,0,1 : lock, contended
         *
         * If the queue head is the only one in the queue (lock value == tail),
         * clear the tail code and grab the lock. Otherwise, we only need
         * to grab the lock.
         */
        for (;;) {
                /* Our mcs node is not the last queued waiter: just set
                 * locked. Note this touches neither lock->tail (cpu/idx)
                 * nor pending. CPU3 "get lock" in the diagram.
                 */
                if (val != tail) {
                        set_locked(lock);
                        break;
                }
                /* We are the last waiting CPU: set lock->val = _Q_LOCKED_VAL
                 * directly, which clears tail/pending and sets locked in one
                 * go. CPU4 "get lock" in the diagram.
                 */
                old = atomic_cmpxchg(&lock->val, val, _Q_LOCKED_VAL);
                if (old == val)
                        goto release;   /* No contention */

                val = old;
        }

        /*
         * contended path; wait for next, release.
         */
        /* The CPU that just got the lock is not the last waiter: fetch next
         * and set next->locked. CPU3 "get lock" in the diagram: take CPU4's
         * mcs node and set its mcs->locked = 1.
         */
        while (!(next = READ_ONCE(node->next)))
                cpu_relax();

        arch_mcs_spin_unlock_contended(&next->locked);

        /* pv: kick next->cpu, which is waiting in pv_wait_node. Since
         * pv_wait_node waits for node->state to become running, this
         * function sets next->state = running and then kicks next->vcpu.
         */
        pv_kick_node(next);

release:
        /*
         * release the node
         */
        this_cpu_dec(mcs_nodes[0].count);
}
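The small helpers used above exploit the byte layout of the lock word through the struct __qspinlock overlay. Sketches of the _Q_PENDING_BITS == 8 variants, quoted from memory (the generic fallbacks use atomic RMW ops instead; little-endian layout shown):
struct __qspinlock {
        union {
                atomic_t val;
                struct {
                        u8  locked;
                        u8  pending;
                };
                struct {
                        u16 locked_pending;
                        u16 tail;
                };
        };
};

static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
{
        struct __qspinlock *l = (void *)lock;

        /* one 16-bit store clears pending and sets locked at once */
        WRITE_ONCE(l->locked_pending, _Q_LOCKED_VAL);
}

static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{
        struct __qspinlock *l = (void *)lock;

        /* swap only the 16-bit tail half, leaving locked/pending alone */
        return (u32)xchg(&l->tail, tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
}

static __always_inline void set_locked(struct qspinlock *lock)
{
        struct __qspinlock *l = (void *)lock;

        /* only the queue head may do this, so a plain byte store suffices */
        WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
}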
unlock(native)
static inline void native_queued_spin_unlock(struct qspinlock *lock)
{
        smp_store_release((u8 *)lock, 0);
}
unlock(pv)
__visible void __pv_queued_spin_unlock(struct qspinlock *lock)
{
        struct __qspinlock *l = (void *)lock;
        struct pv_node *node;

        /*
         * We must not unlock if SLOW, because in that case we must first
         * unhash. Otherwise it would be possible to have multiple @lock
         * entries, which would be BAD.
         */
        if (likely(cmpxchg(&l->locked, _Q_LOCKED_VAL, 0) == _Q_LOCKED_VAL))
                return;

        /*
         * Since the above failed to release, this must be the SLOW path.
         * Therefore start by looking up the blocked node and unhashing it.
         */
        /* look up the head waiter's pv_node via the lock's hash */
        node = pv_unhash(lock);

        /*
         * Now that we have a reference to the (likely) blocked pv_node,
         * release the lock.
         */
        /* set spin_lock->locked = 0 */
        smp_store_release(&l->locked, 0);

        /*
         * At this point the memory pointed at by lock can be freed/reused,
         * however we can still use the pv_node to kick the CPU.
         */
        /* wake the halted vcpu */
        if (READ_ONCE(node->state) == vcpu_halted)
                pv_kick(node->cpu);
}
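Finally, these queued_* entry points are what the generic spinlock API resolves to; include/asm-generic/qspinlock.h maps the arch hooks onto them:
#define arch_spin_is_locked(l)          queued_spin_is_locked(l)
#define arch_spin_is_contended(l)       queued_spin_is_contended(l)
#define arch_spin_lock(l)               queued_spin_lock(l)
#define arch_spin_trylock(l)            queued_spin_trylock(l)
#define arch_spin_unlock(l)             queued_spin_unlock(l)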