作者归档:joker

经济相关记录

From 见证失衡 余永定

  1. 基础货币来源
    贸易顺差->美元->央行->人民币:这部分人民币实际没有对应的实体在国内,属于超发。央行会通过央票对冲回收一定的流动性。
    境外投资投机:
  2. 其他对冲方法:将不在经济体内的物品纳入到经济体内,例如政府供地,土地相关的房产属于流动行差的资产可以吸收一部分的流动性,但是这里问题是:供地->房企买地后抵押加杠杆在买地->人民买房房贷->信贷繁荣->通货膨胀
  3. 债务国->年轻债权国的转换:
    人口老龄化->对外投资收益逐渐大于贸易顺差
  4. 美元贬值
    以美元记价的债务(美债)不变
    以本地货币记价的美国对外投资,收益遍高
  5. 投资收益
    外国投资->投资收益逆差
    对外投资->投资收益顺差
    投资收益顺差->债权国
    逆差->债务国
  6. 存储-投资 > 0 但是国外投资>0
    国内投资效率低?存储余额不能有效转化为投资。地方政府倾向于吸引国外投资,政治领导承担风险少(不需要贷款),收益大。

rcu

记录看代码的一些思路:
当时的疑问:
1. rcu call_back没有带有加入的时候对应的gp_num, 怎么才能确保是对的?
两个方面保证的:
1) rnp->completed 是在start_gp时候设置的,和rsp->completed相同的以及rnp->gp_num.
2) rcu_process_gp_end 是per cpu调用的, 会用到rdp per cpu的数据.
在每次调用rcu_process_gp_end的情况下, 如果当前的rdp->completed != rnp->completed
代表的是rdp从上次结束到当前截至加入的callback 对应的graceperiod已经过去了,可以调用
callback了,这里

RCU 对于callback的理解:

178         /*
179          * If nxtlist is not NULL, it is partitioned as follows.                                                                
180          * Any of the partitions might be empty, in which case the                                                              
181          * pointer to that partition will be equal to the pointer for                                                            
182          * the following partition.  When the list is empty, all of
183          * the nxttail elements point to the ->nxtlist pointer itself,
184          * which in that case is NULL.
185          *
186          * [nxtlist, *nxttail[RCU_DONE_TAIL]):                                                                                  
187          *      Entries that batch # <= ->completed
188          *      The grace period for these entries has completed, and
189          *      the other grace-period-completed entries may be moved
190          *      here temporarily in rcu_process_callbacks().                                                                    
191          * [*nxttail[RCU_DONE_TAIL], *nxttail[RCU_WAIT_TAIL]):                                                                  
192          *      Entries that batch # <= ->completed - 1: waiting for current GP                                                  
193          * [*nxttail[RCU_WAIT_TAIL], *nxttail[RCU_NEXT_READY_TAIL]):                                                            
194          *      Entries known to have arrived before current GP ended                                                            
195          * [*nxttail[RCU_NEXT_READY_TAIL], *nxttail[RCU_NEXT_TAIL]):
196          *      Entries that might have arrived after current GP ended                                                          
197          *      Note that the value of *nxttail[RCU_NEXT_TAIL] will                                                              
198          *      always be NULL, as this is the end of the list.                                                                  
199          */
  • [nxtlist, *nxttail[RCU_DONE_TAIL])]: 这些callback对应的gp已经结束了, 需要处理.
  • [*nxttail[RCU_DONE_TAIL], *nxttail[RCU_WAIT_TAIL])]: 这些callback对应的在等待当前的当前gp结束:
    (1)当发现当前gp结束后可以通过 nxttail[RCU_DONE_TAIL]) = nxttail[RCU_WAIT_TAIL]) 这个动作将对应的这些callback放入到已经接受callback里面.
    (2) 对应WAIT_TAIL的更新有两个地方, 一个是当前CPU开始了一个新的gp: 很显然 在这个动作之前发生加入的callback都可以认为是等待当前的这个新开始的gp完成, 也就是nxttail[RCU_WAIT_TAIL] = nxttail[RCU_NEXT_TAIL]. 第二个更新点是发生在: 当前的gp已经完成, 那么能够确认是在该gp end前加入的callback就可以明确的被加入到等待下一个gp完成队列中, 也就是nxttail[RCU_WAIT_TAIL] = nxttail[RCU_NEXT_READY_TAIL](这里相当于在等待下一个gp结束).
  • [*nxttail[RCU_WAIT_TAIL], *nxttail[RCU_NEXT_READY_TAIL])]:代表的是在该gp结束前确认被加入的callback:
    (1) RCU_NEXT_READY_TAIL 更新有两个地方: 第一个地方在该cpu report qs的时候, 因为当前的gp end依赖于当前cpu report qs, 所以可以认为当前已经加入的callback 可以被认为是该gp end之前加入的, 也就是 nxttail[RCU_NEXT_READY_TAIL] = nxttail[RCU_NEXT_TAIL]. 第二个地方是在该gp已经完成, 并且这个CPU看到了这个完成的动作, 那么可以认为所有新加入的RCU_NEXT_TAIL之前的callback可以认为是新的gp结束前加入的, nxttail[RCU_NEXT_READY_TAIL] =nxttail[RCU_NEXT_TAIL]. 这个我们无法假设这些加入的callback是等待新gp结束的, 这个原因在于, 如果其他的CPUB开始了新的gp, CPUA 需要一段时间才能知道这个新的gp, 在这段时间内加入的callback, 可能需要看到其他的CPU发生了一次调度, 但是无法确保的是任意时刻加入的callback都能保证在新gp结束后都能看到一个调度, 举个例子. CPUB开始了一个新的gp, CPUC 完成了一次调度并且 report 了一个qs, CPUA 可能同时加入了一个callback这个callback需要等待一个其他所有的CPUs都发生一个调度, 但是对于新gp来说的话, 它只需要等待除了CPUC之外的所有cpu发生了一次调度, 这和CPUA新加入的callback是需要的条件是不一样的. 所以如果只是看拿到了当前的gp 完成, 是无法假设中间加入的callback是等待新的gp完成的. 只能假设这些callback是在等待新gp结束前加入的.
  • [*nxttail[RCU_NEXT_READY_TAIL], *nxttail[RCU_NEXT_TAIL])]:代表的是这些callback可能是在当前gp结束后加入的.

start_new_gp
rsp->gp_num ++;
rnp->gp_num = rsp->gp_num;
rnp->gp_completed = rsp->gp_completed;
(here rsp->gp_completed + 1 = rsp->gp_num)

for CPUA
sched_..
rdp->passed_quiesc_completed = rdp->gp_num –;
notice_new_period:
rdp->gp_num = rnp->gp_num;

RCU的整体描述(假设只有3个CPU):
(1). CPU1 调用了synchronize_sched, 该函数会把对应的callback加入到CPU1对应的rdp->nxttail[RCU_NEXT_AVAIL] 中, 并CPU1运行的进程会wait_completitionCPU1对应的进行睡眠. CPU1现在需要等待的是CPU2/3 发生一次调度. 假设当前没有启动任何的gp, 那么会调用rcu_start_gp开始新的gp, rsp->gpnum++, 并且更新CPU1/2/3对应的rnp的内容(如下面函数), 这里可以看到rsp->gpnum会被同步到rnp->gpnum, rsp->completed 会被同步到rnp->completed, 这样需要注意更新时是拿着rnp->lock的.

 766         rcu_for_each_node_breadth_first(rsp, rnp) {                                                                            
 767                 raw_spin_lock(&amp;rnp-&gt;lock);      /* irqs already disabled. */                                                    
 768                 rcu_preempt_check_blocked_tasks(rnp);                                                                          
 769                 rnp-&gt;qsmask = rnp-&gt;qsmaskinit;                                                                                  
 770                 rnp-&gt;gpnum = rsp-&gt;gpnum;                                                                                        
 771                 rnp-&gt;completed = rsp-&gt;completed;                                                                                
 772                 if (rnp == rdp-&gt;mynode)                                                                                        
 773                         rcu_start_gp_per_cpu(rsp, rnp, rdp);                                                                    
 774                 raw_spin_unlock(&amp;rnp-&gt;lock);    /* irqs remain disabled. */                                                    
 775         }

(2). CPU2 这个情况下发生了一次调度, 在发生schedule中会调用rcu_sched_qs. 并且

 102 void rcu_sched_qs(int cpu)
 103 {              
 104         struct rcu_data *rdp;
 105                        
 106         rdp = &amp;per_cpu(rcu_sched_data, cpu);
 107         rdp-&gt;passed_quiesc_completed = rdp-&gt;gpnum - 1;
 108         barrier();
 109         rdp-&gt;passed_quiesc = 1;
 110         rcu_preempt_note_context_switch(cpu);
 111 }

然后在每次update_process_times (hrtime的调用函数) 中会调用rcu_check_callbacks, 在上面的函数中会调用raise_softirq(RCU_SOFTIRQ); 最终会调用RCU_SOFTIRQ对应的处理函数rcu_process_callbacks.

1308 __rcu_process_callbacks(struct rcu_state *rsp, struct rcu_data *rdp)
1309 {
1310         unsigned long flags;
1311
1312         WARN_ON_ONCE(rdp-&gt;beenonline == 0);
1313
1314         /*
1315          * If an RCU GP has gone long enough, go check for dyntick
1316          * idle CPUs and, if needed, send resched IPIs.
1317          */

1318         if (ULONG_CMP_LT(ACCESS_ONCE(rsp-&gt;jiffies_force_qs), jiffies))
1319                 force_quiescent_state(rsp, 1);
1320        
1321         /*
1322          * Advance callbacks in response to end of earlier grace
1323          * period that some other CPU ended.
1324          */

1325         rcu_process_gp_end(rsp, rdp);
1326
1327         /* Update RCU state based on any recent quiescent states. */
1328         rcu_check_quiescent_state(rsp, rdp);
1329
1330         /* Does this CPU require a not-yet-started grace period? */
1331         if (cpu_needs_another_gp(rsp, rdp)) {
1332                 raw_spin_lock_irqsave(&amp;rcu_get_root(rsp)-&gt;lock, flags);
1333                 rcu_start_gp(rsp, flags);  /* releases above lock */
1334         }
1335
1336         /* If there are callbacks ready, invoke them. */
1337         rcu_do_batch(rsp, rdp);
1338 }

这里简单假设CPU2上没有callback, 那么最重要的是rcu_check_quiescent_state, 这个函数会依次向上面report rdp->passed_quiesc_completed. 以为当前有3个CPU. 那么实际rsp下面最后一层的rnp 是有3个rnp. 当前假设只有CPU1/CPU2完成了一次调度.也就是当前3个CPU都完成了一次调度, 有当所有的3个rnp都report 给rsp自己已经完成了一个调度后, 并且对应的passed_quiesc_completed都等于rnp->completed,( 这个很重要, 如果在一个CPUA report之间, 发生了其他CPUB开始了新的gp, 简单情况下每个rnp->completed都会被更新到新值, 如果report期间看到了rnp->completed 和rdp->passed_quises_completed不一样的话, 说明确实出现了上面的情况, 那么不需要向上面report)(前面提到了start_gp的时候会把rnp->completed = rsp->completed. 而我们这里可以认为rnp->completed 代表的是下一次需要completed的值).

791 static void rcu_report_qs_rsp(struct rcu_state *rsp, unsigned long flags)
 792         __releases(rcu_get_root(rsp)-&gt;lock)
 793 {
 794         WARN_ON_ONCE(!rcu_gp_in_progress(rsp));
 795         rsp-&gt;completed = rsp-&gt;gpnum;
 796         rsp-&gt;signaled = RCU_GP_IDLE;
 797         rcu_start_gp(rsp, flags);  /* releases root node's rnp-&gt;lock. */
 798 }

(3) CPU3 完成了调度, 所有的CPU都report了自己完成了一次调度, 那么在rcu_report_qs_rsp会完成rsp->completed = rsp->gpnum. 这里应该注意出现rsp->completed 只在这里更新. 然后会调用rcu_start_gp 看是否需要在次启动一个另外一个gp.

lock-mutex

mutex 和其他lock最大的区别是,拿着mutex是可以睡眠的,并且如果在尝试获得锁的情况下也是可以睡眠的。(分析基于base commit a33fda35e3a7655fb7df756ed67822afb5ed5e8d)。

1. structure
struct mutex {
          /* 1: unlocked, 0: locked, negative: locked, possible waiters */
          atomic_t                count;
          spinlock_t              wait_lock;
          struct list_head        wait_list;
  #if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
          struct task_struct      *owner;
  #endif
  #ifdef CONFIG_MUTEX_SPIN_ON_OWNER
          struct optimistic_spin_queue osq; /* Spinner MCS lock */
  #endif
  #ifdef CONFIG_DEBUG_MUTEXES
         void                    *magic;
  #endif
  #ifdef CONFIG_DEBUG_LOCK_ALLOC
         struct lockdep_map      dep_map;
  #endif
 };
2. 5个task 同时抢mutex的过程
  • CPU1 (task1) 想要获得该mutex,设置count = 0,设置owner = task1
  • CPU2 (task2) 尝试获得该mutex,首先count –,发现count < 0,走到slow path,首先获取osq lock(这里假设当前task1->on_cpu是设置的,也就是task1可能会很快会释放锁)(这是一种类似qlock的mcs lock,主要区别在于osq lock,在spin 期间是如果发现自己可以发生调度的话,是可以dequeue自己,然后发生调度的),并且设置osq->tail = CPU2,然后CPU2 会等待count 变为 1
  • CPU3 想要获取mutex,同样会对count — ,发现count < 0,走slowpath,首先会获取osq锁,该锁已经被CPU2(task2)获得,task3 获取CPU3上percpu的node,将自己写入osq->tail,然后将node链接到CPU2(task2)对应的node上,然后等待自己的node->lock 变为1或者need_schedule被设置。
  • CPU4想要获取mutex,对count — ,发现count < 0,走slowpath,这里假设task1->on_cpu没有被设置,那么CPU4(task4)不在尝试获得osq 锁,而是将自己加入到mutex->wait_list上,设置为TASK_UNINTERRUPTIBLE,发生调度。
  • CPU5(task5)想要获得该锁,count –,走slowpath,假设这个时候task1->on_cpu = 1被设置了也即是task1在CPU1上运行了,CPU5会尝试获取osq锁,该锁已经被CPU2(task2)获得,task5 获取CPU5上percpu的node,将自己写入osq->tail,然后将node链接到CPU3(task3)(prev_tail)对应的node上,然后等待自己的node3->lock 变为1或者need_schedule被设置。
  • CPU3 need_schedule被设置了,CPU3会将自己的在osq的node list上面去掉,并且重新连接CPU2和CPU5(这个过程比较复杂,因为设计到node3->prev/next 必须是一个确定的值,因为CPU2/CPU5 本省也是可能会把自己在nodelist中剥离出去,后面的code有详细分析一个corner case,大体的实现通过atomic_xchg/cmpxchg的动作获得prev/next的同时设置为NULL),然后task3 会把自己加入到wait_list里面。
  • CPU1(task1) unlock会将count 设置为0,并且会选择wait_list头部的任务进行wakeup(wait_lock保证wait_list的互斥)。这里大概率会是task2先获取到锁,但是也有可能task4获取到锁。
  • CPU2看到了count = 1 后设置count = 0,设置owner = task2,unlock了osq,看到了CPU5(task5)在等待该osq lock,将该锁osq lock->locked设置为1,CPU5(task5)拿到了osq lock,等待count == 1。
  • CPU2 释放锁,清owner,设置 count = 1,选择wait_list的头部任务task4 然后 wakeup。
  • CPU5拿到了mutex,首先是设置count = 0, 然后设置owner =task5,最后unlock osq锁。
  • CPU5(task5)释放锁,首先clear owner,然后设置 count =1, 然后选择wait_list中的任务wakeup。
  • CPU4 被wakeup 后,拿wait_lock,修改任务状态为TASK_RUNNING,设置count = -1 (因为wait_list里面还有task3),然后设置owner = task4,将task4从wait_list去掉,最后释放wait_lock。
  • CPU4(task4)释放锁,首先clear owner,然后set count =1,然后选择wait_list中的任务task3 wakeup。
  • CPU3 被wakeup 后,拿wait_lock,设置count = -1,然后设置owner = task4,将task3从wait_list去掉,因为wait_list中已经没有等待的任务了,设置count = 0,最后释放wait_lock。
3. code
#define __mutex_fastpath_lock(v, fail_fn)                       \
do {                                                            \
        unsigned long dummy;                                    \
                                                                \
        typecheck(atomic_t *, v);                               \
        typecheck_fn(void (*)(atomic_t *), fail_fn);            \
                                                                \
        /* lock->count -- */

        /* if lock->count < 0, call fail_fn */
        asm volatile(LOCK_PREFIX "   decl (%%rdi)\n"            \
                     "   jns 1f         \n"                     \
                     "   call " #fail_fn "\n"                   \
                     "1:"                                       \
                     : "=D" (dummy)                             \
                     : "D" (v)                                  \
                     : "rax", "rsi", "rdx", "rcx",              \
                       "r8", "r9", "r10", "r11", "memory");     \
} while (0)


__visible void __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
}
*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;
        struct mutex_waiter waiter;
        unsigned long flags;
        int ret;

        preempt_disable();
        mutex_acquire_nest(&amp;lock-&gt;dep_map, subclass, 0, nest_lock, ip);

        /* 1. 通过mcs_lock带来的短暂的spin动作获得该mutex */
        if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
                /* got the lock, yay! */
                preempt_enable();
                return 0;
        }

        /* 2. wait_lock 是为了让所有cpu 线性的去访问锁并且保护wait_list */
        spin_lock_mutex(&amp;lock-&gt;wait_lock, flags);

        /*
         * Once more, try to acquire the lock. Only try-lock the mutex if
         * it is unlocked to reduce unnecessary xchg() operations.
         */

        if (!mutex_is_locked(lock) &amp;&amp; (atomic_xchg(&amp;lock-&gt;count, 0) == 1))
                goto skip_wait;

        debug_mutex_lock_common(lock, &amp;waiter);
        debug_mutex_add_waiter(lock, &amp;waiter, task_thread_info(task));

        /* add waiting tasks to the end of the waitqueue (FIFO): */
        /* 加入到waiter list中 */
        list_add_tail(&amp;waiter.list, &amp;lock-&gt;wait_list);
        waiter.task = task;

        lock_contended(&amp;lock-&gt;dep_map, ip);

        for (;;) {
                /*
                 * Lets try to take the lock again - this is needed even if
                 * we get here for the first time (shortly after failing to
                 * acquire the lock), to make sure that we get a wakeup once
                 * it's unlocked. Later on, if we sleep, this is the
                 * operation that gives us the lock. We xchg it to -1, so
                 * that when we release the lock, we properly wake up the
                 * other waiters. We only attempt the xchg if the count is
                 * non-negative in order to avoid unnecessary xchg operations:
                 */

                if (atomic_read(&amp;lock-&gt;count) &gt;= 0 &amp;&amp;
                    (atomic_xchg(&amp;lock-&gt;count, -1) == 1))
                        break;

                /*
                 * got a signal? (This code gets eliminated in the
                 * TASK_UNINTERRUPTIBLE case.)
                 */

                if (unlikely(signal_pending_state(state, task))) {
                        ret = -EINTR;
                        goto err;
                }

                if (use_ww_ctx &amp;&amp; ww_ctx-&gt;acquired &gt; 0) {
                        ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
                        if (ret)
                                goto err;
                }

                __set_task_state(task, state);

                /* didn't get the lock, go to sleep: */
                spin_unlock_mutex(&amp;lock-&gt;wait_lock, flags);
                /* 主动调度 */
                schedule_preempt_disabled();
                spin_lock_mutex(&amp;lock-&gt;wait_lock, flags);
        }
        /* 到这里说明获得了该锁 */
        __set_task_state(task, TASK_RUNNING);

        mutex_remove_waiter(lock, &amp;waiter, current_thread_info());
        /* set it to 0 if there are no waiters left: */
        if (likely(list_empty(&amp;lock-&gt;wait_list)))
                /* 这里因为前面的会把lock-&gt;count 直接写入为 -1, 所以如果没有其他在wait_list的人直接设置为 0 */
                atomic_set(&amp;lock-&gt;count, 0);
        debug_mutex_free_waiter(&amp;waiter);

skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&amp;lock-&gt;dep_map, ip);
        mutex_set_owner(lock);

        if (use_ww_ctx) {
                struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
                ww_mutex_set_context_slowpath(ww, ww_ctx);
        }

        spin_unlock_mutex(&amp;lock-&gt;wait_lock, flags);
        preempt_enable();
        return 0;

err:
        mutex_remove_waiter(lock, &amp;waiter, task_thread_info(task));
        spin_unlock_mutex(&amp;lock-&gt;wait_lock, flags);
        debug_mutex_free_waiter(&amp;waiter);
        mutex_release(&amp;lock-&gt;dep_map, 1, ip);
        preempt_enable();
        return ret;
}

mutex_optimistic_spin主要是为了更快速的拿mutex,而不是先wait然後睡眠.

/*
 * Optimistic spinning.
 *
 * We try to spin for acquisition when we find that the lock owner
 * is currently running on a (different) CPU and while we don't
 * need to reschedule. The rationale is that if the lock owner is
 * running, it is likely to release the lock soon.
 *
 * Since this needs the lock owner, and this mutex implementation
 * doesn't track the owner atomically in the lock field, we need to
 * track it non-atomically.
 *
 * We can't do this for DEBUG_MUTEXES because that relies on wait_lock
 * to serialize everything.
 *
 * The mutex spinners are queued up using MCS lock so that only one
 * spinner can compete for the mutex. However, if mutex spinning isn't
 * going to happen, there is no point in going through the lock/unlock
 * overhead.
 *
 * Returns true when the lock was taken, otherwise false, indicating
 * that we need to jump to the slowpath and sleep.
 */

static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;

        /* 检查获得锁的task是否on_cpu也就是在运行中, on_cpu会在context_switch时候设置 */
        if (!mutex_can_spin_on_owner(lock))
                goto done;

        /*
         * In order to avoid a stampede of mutex spinners trying to
         * acquire the mutex all at once, the spinners need to take a
         * MCS (queued) lock first before spinning on the owner field.
         */

        if (!osq_lock(&lock->osq))
                goto done;

        while (true) {
                struct task_struct *owner;

                if (use_ww_ctx && ww_ctx->acquired > 0) {
                        struct ww_mutex *ww;
                        ww = container_of(lock, struct ww_mutex, base);
                        /*
                         * If ww->ctx is set the contents are undefined, only
                         * by acquiring wait_lock there is a guarantee that
                         * they are not invalid when reading.
                         *
                         * As such, when deadlock detection needs to be
                         * performed the optimistic spinning cannot be done.
                         */

                        if (READ_ONCE(ww->ctx))
                                break;
                }

                /*
                 * If there's an owner, wait for it to either
                 * release the lock or go to sleep.
                 */

                owner = READ_ONCE(lock->owner);
                if (owner && !mutex_spin_on_owner(lock, owner))
                        break;

                /* Try to acquire the mutex if it is unlocked. */
                if (mutex_try_to_acquire(lock)) {
                        lock_acquired(&lock->dep_map, ip);

                        if (use_ww_ctx) {
                                struct ww_mutex *ww;
                                ww = container_of(lock, struct ww_mutex, base);

                                ww_mutex_set_context_fastpath(ww, ww_ctx);
                        }

                        mutex_set_owner(lock);
                        osq_unlock(&lock->osq);
                        return true;
                }

                /*
                 * When there's no owner, we might have preempted between the
                 * owner acquiring the lock and setting the owner field. If
                 * we're an RT task that will live-lock because we won't let
                 * the owner complete.
                 */

                if (!owner && (need_resched() || rt_task(task)))
                        break;

                /*
                 * The cpu_relax() call is a compiler barrier which forces
                 * everything in this loop to be re-loaded. We don't need
                 * memory barriers as we'll eventually observe the right
                 * values at the cost of a few extra spins.
                 */

                cpu_relax_lowlatency();
        }

        osq_unlock(&lock->osq);
done:
        /*
         * If we fell out of the spin path because of need_resched(),
         * reschedule now, before we try-lock the mutex. This avoids getting
         * scheduled out right after we obtained the mutex.
         */

        if (need_resched()) {
                /*
                 * We _should_ have TASK_RUNNING here, but just in case
                 * we do not, make it so, otherwise we might get stuck.
                 */

                __set_current_state(TASK_RUNNING);
                schedule_preempt_disabled();
        }

        return false;
}

osq_lock/unlock逻辑如下,尤其代码中的osq_wait_next stablize next/prev的动作,代码中有个例子比较有趣.

bool osq_lock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node = this_cpu_ptr(&amp;osq_node);
        struct optimistic_spin_node *prev, *next;
        int curr = encode_cpu(smp_processor_id());
        int old;

        node-&gt;locked = 0;
        node-&gt;next = NULL;
        node-&gt;cpu = curr;

        /* 设置 curr 到tail 代表的是当前最后等待lock的cpu */
        old = atomic_xchg(&amp;lock-&gt;tail, curr);
        if (old == OSQ_UNLOCKED_VAL)
                return true;

        prev = decode_cpu(old);
        node-&gt;prev = prev;
        WRITE_ONCE(prev-&gt;next, node);

        /*
         * Normally @prev is untouchable after the above store; because at that
         * moment unlock can proceed and wipe the node element from stack.
         *
         * However, since our nodes are static per-cpu storage, we're
         * guaranteed their existence -- this allows us to apply
         * cmpxchg in an attempt to undo our queueing.
         */


        /* 等待自己的locked 被设置 */
        while (!READ_ONCE(node-&gt;locked)) {
                /*
                 * If we need to reschedule bail... so we can block.
                 */

                /* 这里是和qspinlock的区别,这里可能会调度出去 */
                if (need_resched())
                        goto unqueue;

                cpu_relax_lowlatency();
        }
        return true;

unqueue:
        /*
         * Step - A  -- stabilize @prev
         *
         * Undo our @prev-&gt;next assignment; this will make @prev's
         * unlock()/unqueue() wait for a next pointer since @lock points to us
         * (or later).
         */


        /*
         * 这里是确定prev 到底的值,确定的方法是cmpxchg通过比较prev-&gt;next和
         * node的值,如果prev-&gt;next==node, 设置prev-&gt;next = NULL, 以为上面的两个
         * 动作是atomic的,prev-&gt;next = NULL后,代表的是prev-&gt;next是unstabled,
         * 也就是说如果这种情况下 prev 有可能因为need_schedule的原因也要发生unqueue
         * 动作,那么prev 在后面的stable next的时候会发现prev-&gt;next ==NULL, 所以
         * 会一直等待.
         * 例子假设当前的mcs_lock的链是:
         * Begin:
         *      CPU1 ----&gt;      CPU2 ----&gt;      CPU3 ----&gt;              CPU4                            CPU5
         *      hold_mutex      ......          ......                  ......                          ......
         *                      osq_get_lock    ......                  ......                          ......
         *                                      osq_try_lock            ......                          ......
         *                                      ......                  ......                          ......
         *                                      ......                  osq_try_lock                    ......
         *                                      wait(node3-&gt;locked)     ......                          ......
         *                                      ......                  ......                          osq_try_lock
         *                                      ......                  wait(node4-&gt;locked)             ......  
         *                                      ......                  ......                          ......
         *                                      ......                  ......                          wait(node5-&gt;locked)
         *                                      need_sched()            ......                          ......
         *                                      unqueue
         *                                      ......                  ......                          ......
         *                                      ......                  need_sched()                    ......
         *                                      ......                  unqueue                         ......
         *                                      ......                  ......                          ......
         *                                      ......                  ......                          ......
         *                                      get_stable_prev(CPU2)   ......                          ......
         *                                      prev = CPU2
         *                                      CPU2-&gt;next = NULL                      
         *                                      ......                  ......                          ......
         *                                      ......                  get_statble_prev(CPU3)          ......
         *                                                              prev = CPU3
         *                                                              CPU3-&gt;next = NULL
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      try_get_stable_next     ......
         *                                      while(CPU3-&gt;next==NULL)
         *                                              loop
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  get_stable_next
         *                                                              next = CPU4-&gt;next(CPU5)
         *                                                              CPU4-&gt;next = NULL
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  ......
         *                                                              prev(CPU3)-&gt;next = next(CPU5)
         *                                                              prev(CPU5)-&gt;prev = prev(CPU3)
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      loop end                ......
         *                                      CPU3-&gt;next= CPU5
         *                                      next is stable
         *                                      ......                  ......
         *                                      get_stable_next         ......
         *                                      next = CPU5
         *                                      CPU3-&gt;next = NULL
         *                                      ......                  ......
         *
         *                                      prev(CPU2)-&gt;next = next(CPU5)
         *                                      prev(CPU5)-&gt;prev = prev(CPU2)
         *
         * End:
         *      CPU1 ----&gt;      CPU2 ----&gt;      CPU5
         */


        for (;;) {
                if (prev-&gt;next == node &amp;&amp;
                   cmpxchg(&amp;prev-&gt;next, node, NULL) == node)
                        break;

                /*
                 * We can only fail the cmpxchg() racing against an unlock(),
                 * in which case we should observe @node-&gt;locked becomming
                 * true.
                 */

                if (smp_load_acquire(&amp;node-&gt;locked))
                        return true;

                cpu_relax_lowlatency();

                /*
                 * Or we race against a concurrent unqueue()'s step-B, in which
                 * case its step-C will write us a new @node-&gt;prev pointer.
                 */

                prev = READ_ONCE(node-&gt;prev);
        }

        /*
         * Step - B -- stabilize @next
         *
         * Similar to unlock(), wait for @node-&gt;next or move @lock from @node
         * back to @prev.
         */


        next = osq_wait_next(lock, node, prev);
        if (!next)
                return false;

        /*
         * Step - C -- unlink
         *
         * @prev is stable because its still waiting for a new @prev-&gt;next
         * pointer, @next is stable because our @node-&gt;next pointer is NULL and
         * it will wait in Step-A.
         */


        WRITE_ONCE(next-&gt;prev, prev);
        WRITE_ONCE(prev-&gt;next, next);

        return false;
}


void osq_unlock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node, *next;
        int curr = encode_cpu(smp_processor_id());

        /*
         * Fast path for the uncontended case.
         */

        if (likely(atomic_cmpxchg(&amp;lock-&gt;tail, curr, OSQ_UNLOCKED_VAL) == curr))
                return;

        /*
         * Second most likely case.
         */

        node = this_cpu_ptr(&amp;osq_node);
        next = xchg(&amp;node-&gt;next, NULL);
        if (next) {
                WRITE_ONCE(next-&gt;locked, 1);
                return;
        }

        /* 等待next 变成stable */
        next = osq_wait_next(lock, node, NULL);
        if (next)
                /* 通知下一个等待的为 1 */
                WRITE_ONCE(next-&gt;locked, 1);
}

/*
 * Get a stable @node-&gt;next pointer, either for unlock() or unqueue() purposes.
 * Can return NULL in case we were the last queued and we updated @lock instead.
 */

static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
              struct optimistic_spin_node *node,
              struct optimistic_spin_node *prev)
{
        struct optimistic_spin_node *next = NULL;
        int curr = encode_cpu(smp_processor_id());
        int old;

        /*
         * If there is a prev node in queue, then the 'old' value will be
         * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
         * we're currently last in queue, then the queue will then become empty.
         */

        old = prev ? prev-&gt;cpu : OSQ_UNLOCKED_VAL;

        for (;;) {
                if (atomic_read(&amp;lock-&gt;tail) == curr &amp;&amp;
                    atomic_cmpxchg(&amp;lock-&gt;tail, curr, old) == curr) {
                        /*
                         * We were the last queued, we moved @lock back. @prev
                         * will now observe @lock and will complete its
                         * unlock()/unqueue().
                         */

                        break;
                }

                /*
                 * We must xchg() the @node-&gt;next value, because if we were to
                 * leave it in, a concurrent unlock()/unqueue() from
                 * @node-&gt;next might complete Step-A and think its @prev is
                 * still valid.
                 *
                 * If the concurrent unlock()/unqueue() wins the race, we'll
                 * wait for either @lock to point to us, through its Step-B, or
                 * wait for a new @node-&gt;next from its Step-C.
                 */

                if (node-&gt;next) {
                        next = xchg(&amp;node-&gt;next, NULL);
                        if (next)
                                break;
                }

                cpu_relax_lowlatency();
        }

        return next;
}

lock-rwlock

rwlock是基于qspinlock的,同一时间多个reader可以获得该lock,但是同一时间只有一个writer可以获得该lock。(下面分析还是基于qspinlock 的base commit a33fda35e3a7655fb7df756ed67822afb5ed5e8d)

1.structure

typedef struct qrwlock {
        atomic_t                cnts;
        arch_spinlock_t         lock;
} arch_rwlock_t;

cnts: 用来标记是有多个在拿reader锁或者writer锁。
lock: qspin_lock

2. 例子

(1) CPU1/2 reader lock
  • CPU1 通过cnts+= _QR_BIAS拿到reader 锁。
  • CPU2 通过cnts+= _QR_BIAS拿到reader 锁。
  • CPU1 通过cnts -= _QR_BIAS释放reader 锁。
  • CPU2 通过cnts -= _QR_BIAS释放reader 锁。
(2) CPU1 reader lock – CPU2 writer lock – CPU3 reader lock
  • CPU1 通过cnts+= _QR_BIAS拿到reader 锁。
  • CPU2 想要拿写锁,坚持cnts是否为0, 如果不为0,代表已经有读锁或者写锁被其他人拿到(这里有CPU1的读锁拿到了锁),然后CPU2 spin_lock(lock),这里没有其他CPU获得该spin_lock,CPU2 获得了锁,然后利用cnts |= _QW_WAITING通知后续的想要读或者写的CPU走show_patch也就是首先获得spin_lock的逻辑,然后CPU2等待cnts 变为_QW_WAITING(CPU1 已经写入了_QR_BIAS,这里是等CPU1清楚了_QR_BIAS)。
  • CPU3 想要获得读锁,但是发现cents & _QW_MASK != 0,所以需要走slow patch,首先尝试获得spin_lock但是该lock被CPU2拿着所以pending。
  • CPU1 cnts-=_QR_BIAS 释放reader 锁。
  • CPU2 看到了 cents = _QW_WATING,也就是前面的读锁已经释放了,set cnts = _QW_LOCKED 获得该写锁,CPU2 unlock spin_lock,CPU3 因为在等待该spin_lock,所以获得该spin_lock,CPU3 等待 cnts & _QW_WMASK == 0, 也就是所有的写锁被释放。
  • CPU2 通过 cnts -= _QW_LOCKED 是否写锁。
  • CPU3 看到了cnts & _QW_WMASK == 0,通过cnts += _QR_BIAS获得该读锁,并且释放spin_lock。

3. code

/**
 * queue_read_lock_slowpath - acquire read lock of a queue rwlock
 * @lock: Pointer to queue rwlock structure
 */

void queue_read_lock_slowpath(struct qrwlock *lock)
{
        u32 cnts;

        /*
         * Readers come here when they cannot get the lock without waiting
         */

        if (unlikely(in_interrupt())) {
                /*
                 * Readers in interrupt context will spin until the lock is
                 * available without waiting in the queue.
                 */

                cnts = smp_load_acquire((u32 *)&amp;lock-&gt;cnts);
                rspin_until_writer_unlock(lock, cnts);
                return;
        }
        /* 这里减去了_QR_BIAS 因为该cpu并没有获得reader锁 */
        atomic_sub(_QR_BIAS, &amp;lock-&gt;cnts);

        /*
         * Put the reader into the wait queue
         */

        arch_spin_lock(&amp;lock-&gt;lock);

        /*
         * At the head of the wait queue now, wait until the writer state
         * goes to 0 and then try to increment the reader count and get
         * the lock. It is possible that an incoming writer may steal the
         * lock in the interim, so it is necessary to check the writer byte
         * to make sure that the write lock isn't taken.
         */

        while (atomic_read(&amp;lock-&gt;cnts) &amp; _QW_WMASK)
                cpu_relax_lowlatency();

        /* 这里可以确定的是全面又一个writer lock已经释放了。(因为这个writer
         * lock,reader lock需要走slowpath
         * 但是在下面这个函数之前,也就是 &amp;lock-&gt;cnts + _QR_BIAS 之前,
         * lock-&gt;cnts是可能= 0的,这种情况下,在这个间隙之间,
         * 如果有其他的writer lock 来是可以直接拿到锁的.
         * 如果有其他的read 想拿到锁(在没有writer拿到锁的情况下)也是可以直接拿到锁了。
         */


        cnts = atomic_add_return(_QR_BIAS, &amp;lock-&gt;cnts) - _QR_BIAS;

        /* 由于上面解释说在间隙间是有可能被另外的writer拿到锁的所以需要等待
         * 该writer lock 释放后在继续
         */

        rspin_until_writer_unlock(lock, cnts);

        /*
         * Signal the next one in queue to become queue head
         */

        arch_spin_unlock(&amp;lock-&gt;lock);
}
EXPORT_SYMBOL(queue_read_lock_slowpath);
/**
 * queue_write_lock_slowpath - acquire write lock of a queue rwlock
 * @lock : Pointer to queue rwlock structure
 */

void queue_write_lock_slowpath(struct qrwlock *lock)
{
        u32 cnts;

        /* Put the writer into the wait queue */
        arch_spin_lock(&amp;lock-&gt;lock);

        /* 到了这里 writer 现在是head */
        /* Try to acquire the lock directly if no reader is present */
        if (!atomic_read(&amp;lock-&gt;cnts) &amp;&amp;
            (atomic_cmpxchg(&amp;lock-&gt;cnts, 0, _QW_LOCKED) == 0))
                goto unlock;

        /* 因为read_lock首先会查看lock-&gt;cnts的值,也就是如果该值没有被设置
         * _QW_LOCKED 或则 _QW_WARITING, 即便是write 先拿到了锁,也可能另外的
         * read 会首先获得锁,所以需要等待。 因为read_lock fast patch是直接
         * 检查_QW_WMASK的值是否被设置。
         * 也是有可能另外的writer_lock先获得锁,这种情况发生在
         *      READER-0        WRITER-0        WRITER-1
         *      LOCK
         *                      try_get_lock
         *      UNLOCK
         *                                      LOCK
         *                      check other
         *                      writer get lock
         */

        /*
         * Set the waiting flag to notify readers that a writer is pending,
         * or wait for a previous writer to go away.
         */

        for (;;) {
                cnts = atomic_read(&amp;lock-&gt;cnts);
                /* 下面等待所有其他的writer释放锁, 并且在设置了_QW_WAITING后
                 * 所有的在后面的read/writer都需要先lock-&gt;lock
                 */

                if (!(cnts &amp; _QW_WMASK) &amp;&amp;
                    (atomic_cmpxchg(&amp;lock-&gt;cnts, cnts,
                                    cnts | _QW_WAITING) == cnts))
                        break;

                cpu_relax_lowlatency();
        }

        /* When no more readers, set the locked flag */
        for (;;) {
                cnts = atomic_read(&amp;lock-&gt;cnts);
                /* 下面的保证所有持有reader锁已经释放 */
                if ((cnts == _QW_WAITING) &amp;&amp;
                    (atomic_cmpxchg(&amp;lock-&gt;cnts, _QW_WAITING,
                                    _QW_LOCKED) == _QW_WAITING))
                        break;

                cpu_relax_lowlatency();
        }
unlock:
        arch_spin_unlock(&amp;lock-&gt;lock);
}
EXPORT_SYMBOL(queue_write_lock_slowpath);
4. Notice

rwlock 是中的spin_lock可以保证在有写锁的情况下,读锁的一个顺序性。已经更新cnts的一致性。读写锁在拿到spin_lock的时候如果没有更新cents,其实是可以被其他的读写锁先获得也就是steal的概念(上面代码解析里面有些标注)主要是因为lock fast patch是直接看cnts的指来确实是否lock成功。某个CPUA 写锁在获得 spin_lock后(由于有CPU B 拿了读锁),可能会发现其他的CPU C 已经获得了写锁,可能原因是CPUA check cnts是看到了CPU B拿了读锁,去spin_lock的时候,CPU B刚巧释放了读锁,CPU C 看到了没有人拿锁(cnts ==0) 先拿了锁,那么CPUB在 spin_lock返回后就会看到有CPU C拿到写锁的情况(同理CPUC拿读锁也是一样的情况),CPUB就需要等到CPUC 是否了写锁后在拿到写锁。原本的拿锁顺序是
CPUA读-CPUB写-CPUC写,现在变成了CPUA度-CPUC写-CPUB写。需要看下最新的代码有没有对这个case 处理,或者说这个case 不是问题。

weekly 12/8

一些思考:

这周新能源在爆发期,群里面的讨论也很热闹,最大感觉还是投资是个逆人性过程,排除人性,等待过激,主要是看自己打算赚那部分钱,是业务反转,还是业务向好,还是概念炒作。

另外长线中线短线,要看整个行业的天花板,选股的话,需要看当前的产品是否有涨价的可能,行业的发展是否给公司带来了增量。

当确定是整体行业的增量的时候,一定要选择产能最大的,确定的,尤其对于有些投资来说产能的扩张需要几年的时间,当产能受限后,就可能会出现涨价趋势。

关于选龙头还是龙二的问题,龙头更容易获得益价格,这个一方面龙头在商业上是规模最大的,也是成本最低的,另外一方面龙头容易被机构抱团。

这周看到了一个推算是否涨价的方法,那锂电池薄膜为例,通过估算明年后年电动车的产量,推算出锂电池的量,最后推算出薄膜的用量。继而推算出是否有涨价的可能。

lock-qspinlock

qspinlock是为了解决ticketlock存在的多个cpu 同时等待同一个变量时cache 的改变对性能会产生很大的影响的问题,因为一个内存数据的改变会invalid 所有的等待的cpu上的cache。
下面分析机遇最初版本的qspinlock() a33fda35e3a7655fb7df756ed67822afb5ed5e8d

1. Structure

typedef struct qspinlock {
       atomic_t        val;
} arch_spinlock_t;
 /*
  * Bitfields in the atomic value:
  *
  * When NR_CPUS < 16K
  *  0- 7: locked byte
  *     8: pending
  *  9-15: not used
  * 16-17: tail index
  * 18-31: tail cpu (+1)
  *
  * When NR_CPUS >= 16K
  *  0- 7: locked byte
  *     8: pending
  *  9-10: tail index
  * 11-31: tail cpu (+1)
 */

struct mcs_spinlock {
       struct mcs_spinlock *next;
       int locked; /* 1 if lock acquired */
       int count;  /* nesting count, see qspinlock.c */
};

2. qspinlock的过程

(1). host 两个CPU1/2同时抢锁

  • CPU 1 set locked = 1 获得该锁
  • CPU 2 set pend = 1 然后等待锁的locked = 0
  • CPU1 set locked = 1 释放该锁
  • CPU2 atomic set locked = 1 && set pend = 0 获得该锁

(2). host 4个CPU同时抢夺锁

CPU1/2 获得锁的过程和上面第一图一致

  • CPU3 想要获得锁,发现 pend = 1,代表着已经有两个或者以上CPU在等待该锁,所以CPU3选择了queue,CPU3 首先将自己的mcs_node对应的id + cpu 写到到 cpu/idx(tail)中,然后等待locked =0 && pend = 0
  • CPU4想要获得锁,和CPU3一样会选择queue,CPU4 会得到当前cpu/idx 的指,通过atomic_xchg将自己对应的4/0写入,同时获得CPU3 对应的mcs_node,并将自己的mcs_node链接到CPU3对应的mcs_node->next,CPU4等待mcs_node->locked 变为1
  • CPU1 释放锁后,设置locked = 0
  • CPU2 设置 locked = 1 pend = 0 获得锁
  • CPU2 设置locked = 0 释放锁
  • CPU3 看到locked = 0 后,set locked = 1 获得该锁,并且通过自己的mcs_node 获得next 也就是CPU4对应的mcs_node,并且设置mcs_node->locked = 1,CPU4看到自己 mcs_node ->locked =1 后,改为等待spin_lock->locked = 0 && pend = 0
  • CPU3 set spin_lock->locked = 0,释放锁
  • CPU4 看到了spin_lock->locked == 0 && pend = 0,并且看到了spin_lock->cpu/idx 对应的值是自己,代表CPU4是最后一个获取锁的CPU,设置spin_lock->locked = 1 并且清楚其他bit

(3)guest 3 个 vcpu同时抢锁

针对虚拟化的场景,spinlock的处理不同点在与,当guest vcpu 等待lock的时候,是可以vmexit 到host中,host 调度可以根据需要运行其他的进程。同时当一个锁释放后,需要通过kick的动作将唤醒该vcpu对应的线程,并且vmentry 到guest中,继续guest 拿锁的动作

struct pv_node {
        struct mcs_spinlock     mcs;
        struct mcs_spinlock     __res[3];

        int                     cpu;
        u8                      state;
};

针对虚拟化场景,pv_node在原来mcs_lock上主要添加了两个变量,cpu代表是vcpu cpuid, state 代表的是当前vcpu运行的状态(halt/running)

  • vcpu1 set locked = 1拿锁
  • vcpu2 看到锁已经被其他vcpu获得后,(这里没有看到pv对应pending的处理,只要有vcpu等待都需要vmexit的,所以逻辑和host两个cpu同时抢锁有区别的,同时因为vmexit后的vcpu在unlock时需要被kick唤醒,所有只要有多于一个vcpu在等待该锁都需要附加的pv_node,这也是为什么没有用pending的原因(不知道最新版本的有没有优化))获得新的mcs并初始化cpu = 2 stat = run, 同时设置spin_lock->cpu/idx = 2/0,在等待spin_lock->locked 被设置前,会调用pv_wait_head通过该函数halt该vcpu,vmexit到host,使得host可以运行其他的认为,在该函数中,首先会修改pv_node->state = halt。然后修改spin_locked->locked = _Q_SLOW_VAL,并且wait spin_locked->locked != _Q_SLOW_VAL,等待的过程是调动halt()类似的函数,vcpu在调用halt后会vmexit到host中,host调度策略可以选取其他任务进行运行。
    这里几个点需要注意,vcpu2现在是第一个等待的vcpu,所以它对应的pv_node会和spin_lock值做一定的hash,这样在unlock的时候,通过spin_lock本身的地址就可以获得第一个等待pv_node。
  • vcpu3 想要获得该锁的时候前4个步骤都是一样的,到了第5步,vcpu3 检查pv_node->mcs.locked是否为0(为0 代表需要等待,为1代表需要等待),如果为0,vcpu3会调用pv_wait等待vcpu3的状态被修改为running。和vcpu2一样vcpu3也会通过halt vmexit到host上。
  • vcpu1 释放锁,修改spin_locked->locked = 0,并且通过spin_lock hash得到头部的pv_node,现在的pv_node为vcpu2的pv_node,拿到pv_node后获取到pv_node->cpu,
    通过pv_kick唤醒该vcpu。(pv_kick的原理后续写虚拟化会在讲,基本就是拿到vcpu对应的pcpu 然后发一个reschedule ipi)。
  • host 调度选择调度vcpu2 并且vcpu2通过vmentery 进入guest mode后,会检查spin_locked->locked 是否为0(我们看到unlock的时候是会先修改locked = 0 然后在kick, 但是呢vcpu有可能因为其他的一些中断被唤醒,例如timer 中断),为0,说明锁被释放了,修改spin_locked->locked = 1获得该锁,然后发现后面还有vcpu在等待(vcpu3),所以获得vcpu3 对应的pv_node,设置pv_node->mcs->locked = 1(vcpu3 当前是先看mcs->locked 是否为0才做等待state != halted的等待动作的),修改pv_node->state = running,然后pv_kick_vcpu3。这里设置状态并且kickvcpu3的原因是,让vcpu3向下一步,vcpu3当前在等待的头部了,所以需要等待spin_lock->locked的值了。
  • vcpu3 被调度并且运行后,会看到pv_node->state == running,所以继续运行,有看到pv_node->mcs.locked == 1,继续运行,到了pv_wait_head基本和vcpu2 第一次等待的动作一样的,修改pv_node->state = halt,修改spin_lock->loked = _Q_SLOW_VAL,然后等待spin_lock->locked = 0,等待过程和vcpu2 trylock一样,也是halt->vmexit。
  • vcpu2 释放lock后修改spin_lock ->locked = 0,并且通过spin_lock hash获得了头部的pv_node(上一步vcpu3 pv_wait_head 把自己的pv_node加入到hash中)。拿到了pv_node->vcpu 也就是vcpu3,然后pv_kick。
  • vcpu3 获得运行后,看到spin_lock->locked == 0, 通过设置locked =1 并且清除cpu/idx, 获得该锁。

3. Code

lock_fastpath
/**
 * queued_spin_lock - acquire a queued spinlock
 * @lock: Pointer to queued spinlock structure
 */

static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
        u32 val;

        val = atomic_cmpxchg(&amp;lock-&gt;val, 0, _Q_LOCKED_VAL);
        if (likely(val == 0)) /* 当前锁没有被lock, 直接lock该锁,并返回 */
                return;
        /* 当前锁被其他人lock了,需要等待到慢速路径*/
        queued_spin_lock_slowpath(lock, val);
}
lock_slowpath
/* 慢速路径 */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
        struct mcs_spinlock *prev, *next, *node;
        u32 new, old, tail;
        int idx;

        BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

        if (pv_enabled())
                goto queue;

        if (virt_queued_spin_lock(lock))
                return;

        /*
         * wait for in-progress pending->locked hand-overs
         *
         * 0,1,0 -> 0,0,1
         */

        if (val == _Q_PENDING_VAL) {
        /*  出现这种情况是cpu1 拿到了锁,cpu2在等待锁,并且set 了pending,
         *  cpu1 release了锁,cpu2还没有拿到了锁,但是一定是cpu2先拿到锁
         */

                while ((val = atomic_read(&lock->val)) == _Q_PENDING_VAL)
                        cpu_relax();
        }

        /*
         * trylock || pending
         *
         * 0,0,0 -> 0,0,1 ; trylock
         * 0,0,1 -> 0,1,1 ; pending
         */

        for (;;) {
                /*
                 * If we observe any contention; queue.
                 */

                if (val & ~_Q_LOCKED_MASK)
                /* 这里类似图中CPU3 CPU4 try lock 的动作*/
                        goto queue;

                new = _Q_LOCKED_VAL;
                if (val == new)
                        /* 如果上次获得的val 代表的只有一个CPU拿着锁,
                         * 并且没有其他CPU等待锁,加上pending */

                        new |= _Q_PENDING_VAL;

                /* 这个通过判断old 和val的确定,在上一次该CPU获取val的时间点
                 * 到现在为止的一段时间内,有没有其他CPU已经在等待该锁或者
                 * 锁已经被释放了
                 */

                old = atomic_cmpxchg(&lock->val, val, new);
                if (old == val)
                        break;

                val = old;
        }

        /*
         * we won the trylock
         */

        if (new == _Q_LOCKED_VAL)
        /* 这里代表上面的逻辑中,val被改为了0, 也就是锁被释放了 */
                return;

        /*
         * we're pending, wait for the owner to go away.
         *
         * *,1,1 -> *,1,0
         *
         * this wait loop must be a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because not all clear_pending_set_locked()
         * implementations imply full barriers.
         */

        while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_MASK)
        /* 等到该锁的被释放 对应图中CPU2 try lock*/
                cpu_relax();

        /*
         * take ownership and clear the pending bit.
         *
         * *,1,0 -> *,0,1
         */

        /* 获取该锁,对应图中CPU2 get lock */
        clear_pending_set_locked(lock);
        return;

        /*
         * End of pending bit optimistic spinning and beginning of MCS
         * queuing.
         */

queue:
        /* 获得该CPU上的空闲的mcs_lock 并初始化 */
        node = this_cpu_ptr(&mcs_nodes[0]);
        idx = node->count++;
        tail = encode_tail(smp_processor_id(), idx);

        node += idx;
        node->locked = 0;
        node->next = NULL;
        /*pv: 初始化node->cpu/state */
        pv_init_node(node);

        /*
         * We touched a (possibly) cold cacheline in the per-cpu queue node;
         * attempt the trylock once more in the hope someone let go while we
         * weren't watching.
         */

        /* 优化 */
        if (queued_spin_trylock(lock))
                goto release;

        /*
         * We have already touched the queueing cacheline; don't bother with
         * pending stuff.
         *
         * p,*,* -> n,*,*
         */

        /* 将tail 更新到lock tail(cpu/idx) 并且返回原来的tail
         * 表示最后一个想要获得锁的CPU
         * 图中CPU3 try lock的时候会写入,CPU4在try lock的时候
         * 也会写入会覆盖CPU3写入的,来表示CPU4是最后一个想要
         * 获得锁的。
         */

        old = xchg_tail(lock, tail);

        /*
         * if there was a previous node; link it and wait until reaching the
         * head of the waitqueue.
         */

        /* 判断当前有没有多于2个CPU在等待锁,如果有把mcs链接到
         * last mcs->next。
         * 针对图中CPU3来说,old = 0, 说明只有一个等待锁
         * 针对图中CPU4来说,old = CPU3/IDX, 需要把CPU4对应的mcs链接到CPU3/idx mcs->next
         * 现在等待锁的是CPU2 CPU3。
         */

        if (old & _Q_TAIL_MASK) {
                prev = decode_tail(old);
                WRITE_ONCE(prev->next, node);

                /* pv:修改 node->stat = halted
                 * 如果node->locked = 0 等待 node->stat变为running
                 * 该函数会造成vcpu vmexit
                 */

                 pv_wait_node(node);
                /* 等待 node->locked 被设置, locked
                 * 会被前一个拿锁的CPU设置
                 * 对应图中CPU4 try lock, CPU3在拿到
                 * 锁后会设置 对应图中CPU3 get lock
                 */

                arch_mcs_spin_lock_contended(&node->locked);
        }

        /*
         * we're at the head of the waitqueue, wait for the owner & pending to
         * go away.
         *
         * *,x,y -> *,0,0
         *
         * this wait loop must use a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because the set_locked() function below
         * does not imply a full barrier.
         *
         */

        /* pv: 设置locked = _Q_SLOW_VAL, 如果该锁还是没有释放,等待一些循环后,halt
         * 然后vmexit到host, 该等待的node 会被hash, 在unlock的时候被获取到。
         */
     
         pv_wait_head(lock, node);
        /* 等待locked && pend 都被clear */
        while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK)
                cpu_relax();

        /*
         * claim the lock:
         *
         * n,0,0 -> 0,0,1 : lock, uncontended
         * *,0,0 -> *,0,1 : lock, contended
         *
         * If the queue head is the only one in the queue (lock value == tail),
         * clear the tail code and grab the lock. Otherwise, we only need
         * to grab the lock.
         */

        for (;;) {
                /* 该CPU对应的mcs不是最后一个等待的lock的CPU, 直接set lock
                 * 注意这里并没有修改lock->tail(cpu/idx)以及pending
                 * 图中CPU3 get lock
                 */

                if (val != tail) {
                        set_locked(lock);
                        break;
                }

                /* 该CPU是最后一个等待锁的CPU,直接修改
                 * lock->val = LOCKED_VAL,相当于clear 了tail/pending,
                 * 并且set了locked
                 * 对应图中 CPU4 get lock
                 */

                old = atomic_cmpxchg(&lock->val, val, _Q_LOCKED_VAL);
                if (old == val)
                        goto release;   /* No contention */

                val = old;
        }

        /*
         * contended path; wait for next, release.
         */

        /* 现在获得锁的CPU不是最后一个等待的CPU, 需要获得next, 并且设置next->locked
         * 图中CPU3 get lock的过程,拿到CPU4 mcs, 并且修改CPU4 mcs->locked = 1
         */

        while (!(next = READ_ONCE(node->next)))
                cpu_relax();

        arch_mcs_spin_unlock_contended(&next->locked);
        /*pv: kick 等待在pv_wait_node的next->cpu, 因为pv_wait_node等待的是
         *node->state 变为running,所以这个函数修改next->state = running,然后在
         * kick next->vcpu
         */

        pv_kick_node(next);

release:
        /*
         * release the node
         */

        this_cpu_dec(mcs_nodes[0].count);
}
unlock(native)
static inline void native_queued_spin_unlock(struct qspinlock *lock)
{
        smp_store_release((u8 *)lock, 0);
}
unlock(pv)
__visible void __pv_queued_spin_unlock(struct qspinlock *lock)
{
        struct __qspinlock *l = (void *)lock;
        struct pv_node *node;

        /*
         * We must not unlock if SLOW, because in that case we must first
         * unhash. Otherwise it would be possible to have multiple @lock
         * entries, which would be BAD.
         */

        if (likely(cmpxchg(&amp;l-&gt;locked, _Q_LOCKED_VAL, 0) == _Q_LOCKED_VAL))
                return;

        /*
         * Since the above failed to release, this must be the SLOW path.
         * Therefore start by looking up the blocked node and unhashing it.
         */

        /*通过lock获得头部等待的vcpu 对应的pv_node */
        node = pv_unhash(lock);

        /*
         * Now that we have a reference to the (likely) blocked pv_node,
         * release the lock.
         */

        /* set spinlock-&gt;lock = 0 */
        smp_store_release(&amp;l-&gt;locked, 0);

        /*
         * At this point the memory pointed at by lock can be freed/reused,
         * however we can still use the pv_node to kick the CPU.
         */

        /* halt唤醒vcpu */
        if (READ_ONCE(node-&gt;state) == vcpu_halted)
                pv_kick(node-&gt;cpu);
}