lock-mutex

The biggest difference between a mutex and other kernel locks is that the mutex owner is allowed to sleep while holding it, and a task trying to acquire it may sleep as well. (The analysis below is based on base commit a33fda35e3a7655fb7df756ed67822afb5ed5e8d.)
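
As a reminder of what "can sleep" means in practice, here is a minimal usage sketch (my_lock, my_data and update_my_data are hypothetical names used only for illustration): both mutex_lock() itself and code running while the mutex is held are allowed to block, which is also why a mutex must not be taken in atomic context.

#include <linux/mutex.h>
#include <linux/slab.h>

static DEFINE_MUTEX(my_lock);            /* hypothetical lock protecting my_data */
static void *my_data;

static int update_my_data(size_t len)
{
        void *buf;

        mutex_lock(&my_lock);            /* may sleep while waiting for the owner */
        buf = kmalloc(len, GFP_KERNEL);  /* may sleep while holding the mutex */
        if (!buf) {
                mutex_unlock(&my_lock);
                return -ENOMEM;
        }
        kfree(my_data);
        my_data = buf;
        mutex_unlock(&my_lock);
        return 0;
}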

1. structure
struct mutex {
        /* 1: unlocked, 0: locked, negative: locked, possible waiters */
        atomic_t                count;
        spinlock_t              wait_lock;
        struct list_head        wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct      *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};
2. Five tasks contending for the same mutex (a simplified sketch of the count protocol follows this list)
  • CPU1 (task1) wants the mutex: the fastpath decrements count from 1 to 0 and owner is set to task1.
  • CPU2 (task2) tries to acquire the mutex: it decrements count, sees count < 0 and enters the slow path. It first takes the osq lock (assume task1->on_cpu is set here, i.e. task1 is running and is likely to release the mutex soon). The osq lock is an MCS-style queued lock similar to the one used by qspinlock; the main difference is that an osq spinner which notices it needs to reschedule can dequeue itself and go off-CPU. CPU2 sets osq->tail = CPU2 and then spins waiting for count to become 1.
  • CPU3 wants the mutex: it also decrements count, sees count < 0, enters the slow path and tries the osq lock, which is already held by CPU2 (task2). task3 takes the per-CPU node on CPU3, writes itself into osq->tail, links its node after CPU2 (task2)'s node, and spins until its own node->locked becomes 1 or need_resched() is set.
  • CPU4 wants the mutex: it decrements count, sees count < 0 and enters the slow path. Assume task1->on_cpu is not set at this point, so CPU4 (task4) does not try the osq lock; instead it adds itself to mutex->wait_list, sets its state to TASK_UNINTERRUPTIBLE and schedules out.
  • CPU5 (task5) wants the mutex: it decrements count and enters the slow path. Assume task1->on_cpu = 1 again by now, i.e. task1 is running on CPU1, so CPU5 tries the osq lock, which is still held by CPU2 (task2). task5 takes the per-CPU node on CPU5, writes itself into osq->tail, links its node after CPU3 (task3, the previous tail), and spins until its own node->locked becomes 1 or need_resched() is set.
  • need_resched() becomes set on CPU3: CPU3 removes its node from the osq list and re-links CPU2 and CPU5. (This is the tricky part: node3->prev/next must be read as stable values, because CPU2/CPU5 may themselves be unqueueing at the same time; a corner case is analyzed in detail in the code below. Roughly, atomic xchg/cmpxchg is used to fetch prev/next and set them to NULL in one step.) task3 then adds itself to the wait_list and sleeps.
  • CPU1 (task1) unlocks: since count is negative the unlock slowpath runs, sets count = 1 and wakes up the task at the head of wait_list (wait_lock serializes access to wait_list). Most likely task2, which is still spinning, gets the lock first, but task4 may also win the race.
  • CPU2 sees count = 1, xchg's it to 0 and sets owner = task2, then unlocks the osq. Seeing CPU5 (task5) queued behind it, it sets task5's node->locked to 1; CPU5 (task5) now owns the osq lock and spins waiting for count == 1.
  • CPU2 releases the mutex: it clears owner, sets count = 1 and wakes the task at the head of wait_list (task4).
  • CPU5, still spinning, wins the race for the mutex: it xchg's count from 1 to 0, sets owner = task5 and unlocks the osq lock.
  • CPU5 (task5) releases the mutex: it clears owner, sets count = 1 and wakes the next task on wait_list.
  • CPU4, once woken, takes wait_lock, xchg's count to -1 (it stays -1 because task3 is still on the wait_list), sets its state back to TASK_RUNNING, removes task4 from the wait_list, sets owner = task4 and releases wait_lock.
  • CPU4 (task4) releases the mutex: it clears owner, sets count = 1 and wakes task3 from the wait_list.
  • CPU3, once woken, takes wait_lock, xchg's count to -1, sets its state back to TASK_RUNNING and removes task3 from the wait_list; since the wait_list is now empty it sets count back to 0, sets owner = task3 and releases wait_lock.
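
To make the count transitions in the walkthrough explicit, here is a minimal pseudo-C sketch of the protocol as this kernel version uses it. The function names are invented for illustration; this is a simplification, not the kernel code itself.

/*
 * Simplified sketch of the count protocol (illustration only).
 * count == 1: unlocked, count == 0: locked, count < 0: locked and
 * there may be waiters.
 */
static void mutex_lock_sketch(struct mutex *lock)
{
        /* fastpath: 1 -> 0 means we took the lock uncontended */
        if (atomic_dec_return(&lock->count) < 0) {
                /*
                 * slowpath: either optimistic-spin until count becomes 1
                 * and xchg() it 1 -> 0, or sleep on wait_list; a woken
                 * waiter xchg()s count to -1 so that the next unlock also
                 * takes the slowpath and wakes the remaining waiters.
                 */
        }
}

static void mutex_unlock_sketch(struct mutex *lock)
{
        /* fastpath: 0 -> 1 means nobody was waiting */
        if (atomic_inc_return(&lock->count) <= 0) {
                /* slowpath: store 1 and wake the first waiter on wait_list */
                atomic_set(&lock->count, 1);
        }
}
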
3. code
#define __mutex_fastpath_lock(v, fail_fn)                       \
do {                                                            \
        unsigned long dummy;                                    \
                                                                \
        typecheck(atomic_t *, v);                               \
        typecheck_fn(void (*)(atomic_t *), fail_fn);            \
                                                                \
        /* lock->count--; if the result is negative, call fail_fn: */ \
        asm volatile(LOCK_PREFIX "   decl (%%rdi)\n"            \
                     "   jns 1f         \n"                     \
                     "   call " #fail_fn "\n"                   \
                     "1:"                                       \
                     : "=D" (dummy)                             \
                     : "D" (v)                                  \
                     : "rax", "rsi", "rdx", "rcx",              \
                       "r8", "r9", "r10", "r11", "memory");     \
} while (0)
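
For comparison, the matching x86-64 unlock fastpath (from arch/x86/include/asm/mutex_64.h in the same tree; quoted from memory, so treat it as a sketch rather than a verbatim copy) increments count and falls into the slowpath whenever the result is not positive, i.e. whenever there may be waiters:

#define __mutex_fastpath_unlock(v, fail_fn)                     \
do {                                                            \
        unsigned long dummy;                                    \
                                                                \
        typecheck(atomic_t *, v);                               \
        typecheck_fn(void (*)(atomic_t *), fail_fn);            \
                                                                \
        /* lock->count++; if the result is <= 0, call fail_fn: */ \
        asm volatile(LOCK_PREFIX "   incl (%%rdi)\n"            \
                     "   jg 1f\n"                               \
                     "   call " #fail_fn "\n"                   \
                     "1:"                                       \
                     : "=D" (dummy)                             \
                     : "D" (v)                                  \
                     : "rax", "rsi", "rdx", "rcx",              \
                       "r8", "r9", "r10", "r11", "memory");     \
} while (0)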


__visible void __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
}
/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;
        struct mutex_waiter waiter;
        unsigned long flags;
        int ret;

        preempt_disable();
        mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

        /* 1. Try to grab the mutex with a short spin, via the MCS-style osq lock */
        if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
                /* got the lock, yay! */
                preempt_enable();
                return 0;
        }

        /* 2. wait_lock serializes the CPUs' access to the lock state and protects wait_list */
        spin_lock_mutex(&lock->wait_lock, flags);

        /*
         * Once more, try to acquire the lock. Only try-lock the mutex if
         * it is unlocked to reduce unnecessary xchg() operations.
         */

        if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
                goto skip_wait;

        debug_mutex_lock_common(lock, &waiter);
        debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));

        /* add waiting tasks to the end of the waitqueue (FIFO): */
        /* add ourselves to the waiter list */
        list_add_tail(&waiter.list, &lock->wait_list);
        waiter.task = task;

        lock_contended(&lock->dep_map, ip);

        for (;;) {
                /*
                 * Lets try to take the lock again - this is needed even if
                 * we get here for the first time (shortly after failing to
                 * acquire the lock), to make sure that we get a wakeup once
                 * it's unlocked. Later on, if we sleep, this is the
                 * operation that gives us the lock. We xchg it to -1, so
                 * that when we release the lock, we properly wake up the
                 * other waiters. We only attempt the xchg if the count is
                 * non-negative in order to avoid unnecessary xchg operations:
                 */

                if (atomic_read(&lock->count) >= 0 &&
                    (atomic_xchg(&lock->count, -1) == 1))
                        break;

                /*
                 * got a signal? (This code gets eliminated in the
                 * TASK_UNINTERRUPTIBLE case.)
                 */

                if (unlikely(signal_pending_state(state, task))) {
                        ret = -EINTR;
                        goto err;
                }

                if (use_ww_ctx && ww_ctx->acquired > 0) {
                        ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
                        if (ret)
                                goto err;
                }

                __set_task_state(task, state);

                /* didn't get the lock, go to sleep: */
                spin_unlock_mutex(&lock->wait_lock, flags);
                /* schedule out voluntarily */
                schedule_preempt_disabled();
                spin_lock_mutex(&lock->wait_lock, flags);
        }
        /* reaching here means we now hold the lock */
        __set_task_state(task, TASK_RUNNING);

        mutex_remove_waiter(lock, &waiter, current_thread_info());
        /* set it to 0 if there are no waiters left: */
        if (likely(list_empty(&lock->wait_list)))
                /*
                 * The loop above wrote -1 into lock->count; if nobody else
                 * is left on the wait_list, set it back to 0.
                 */
                atomic_set(&lock->count, 0);
        debug_mutex_free_waiter(&waiter);

skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&lock->dep_map, ip);
        mutex_set_owner(lock);

        if (use_ww_ctx) {
                struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
                ww_mutex_set_context_slowpath(ww, ww_ctx);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
        preempt_enable();
        return 0;

err:
        mutex_remove_waiter(lock, &waiter, task_thread_info(task));
        spin_unlock_mutex(&lock->wait_lock, flags);
        debug_mutex_free_waiter(&waiter);
        mutex_release(&lock->dep_map, 1, ip);
        preempt_enable();
        return ret;
}
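
The unlock side that the walkthrough keeps referring to is not quoted above. A lightly trimmed version of __mutex_unlock_common_slowpath() from kernel/locking/mutex.c in the same tree looks roughly like this (quoted from memory, so treat it as a sketch): it stores 1 into count first, then wakes the first waiter under wait_lock.

static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
        unsigned long flags;

        /*
         * Release the lock before doing the wakeup work, so that another
         * task (e.g. an optimistic spinner) can take it immediately.
         */
        if (__mutex_slowpath_needs_to_unlock())
                atomic_set(&lock->count, 1);

        spin_lock_mutex(&lock->wait_lock, flags);
        mutex_release(&lock->dep_map, nested, _RET_IP_);
        debug_mutex_unlock(lock);

        if (!list_empty(&lock->wait_list)) {
                /* get the first entry from the wait-list: */
                struct mutex_waiter *waiter =
                                list_entry(lock->wait_list.next,
                                           struct mutex_waiter, list);

                debug_mutex_wake_waiter(lock, waiter);

                wake_up_process(waiter->task);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
}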

mutex_optimistic_spin() tries to take the mutex quickly by spinning for a short while, instead of going straight onto the wait_list and sleeping.

/*
 * Optimistic spinning.
 *
 * We try to spin for acquisition when we find that the lock owner
 * is currently running on a (different) CPU and while we don't
 * need to reschedule. The rationale is that if the lock owner is
 * running, it is likely to release the lock soon.
 *
 * Since this needs the lock owner, and this mutex implementation
 * doesn't track the owner atomically in the lock field, we need to
 * track it non-atomically.
 *
 * We can't do this for DEBUG_MUTEXES because that relies on wait_lock
 * to serialize everything.
 *
 * The mutex spinners are queued up using MCS lock so that only one
 * spinner can compete for the mutex. However, if mutex spinning isn't
 * going to happen, there is no point in going through the lock/unlock
 * overhead.
 *
 * Returns true when the lock was taken, otherwise false, indicating
 * that we need to jump to the slowpath and sleep.
 */

static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;

        /*
         * Check whether the lock-owning task is on_cpu, i.e. currently
         * running; on_cpu is updated at context-switch time.
         */
        if (!mutex_can_spin_on_owner(lock))
                goto done;

        /*
         * In order to avoid a stampede of mutex spinners trying to
         * acquire the mutex all at once, the spinners need to take a
         * MCS (queued) lock first before spinning on the owner field.
         */

        if (!osq_lock(&lock->osq))
                goto done;

        while (true) {
                struct task_struct *owner;

                if (use_ww_ctx && ww_ctx->acquired > 0) {
                        struct ww_mutex *ww;
                        ww = container_of(lock, struct ww_mutex, base);
                        /*
                         * If ww->ctx is set the contents are undefined, only
                         * by acquiring wait_lock there is a guarantee that
                         * they are not invalid when reading.
                         *
                         * As such, when deadlock detection needs to be
                         * performed the optimistic spinning cannot be done.
                         */

                        if (READ_ONCE(ww->ctx))
                                break;
                }

                /*
                 * If there's an owner, wait for it to either
                 * release the lock or go to sleep.
                 */

                owner = READ_ONCE(lock->owner);
                if (owner && !mutex_spin_on_owner(lock, owner))
                        break;

                /* Try to acquire the mutex if it is unlocked. */
                if (mutex_try_to_acquire(lock)) {
                        lock_acquired(&lock->dep_map, ip);

                        if (use_ww_ctx) {
                                struct ww_mutex *ww;
                                ww = container_of(lock, struct ww_mutex, base);

                                ww_mutex_set_context_fastpath(ww, ww_ctx);
                        }

                        mutex_set_owner(lock);
                        osq_unlock(&lock->osq);
                        return true;
                }

                /*
                 * When there's no owner, we might have preempted between the
                 * owner acquiring the lock and setting the owner field. If
                 * we're an RT task that will live-lock because we won't let
                 * the owner complete.
                 */

                if (!owner && (need_resched() || rt_task(task)))
                        break;

                /*
                 * The cpu_relax() call is a compiler barrier which forces
                 * everything in this loop to be re-loaded. We don't need
                 * memory barriers as we'll eventually observe the right
                 * values at the cost of a few extra spins.
                 */

                cpu_relax_lowlatency();
        }

        osq_unlock(&lock->osq);
done:
        /*
         * If we fell out of the spin path because of need_resched(),
         * reschedule now, before we try-lock the mutex. This avoids getting
         * scheduled out right after we obtained the mutex.
         */

        if (need_resched()) {
                /*
                 * We _should_ have TASK_RUNNING here, but just in case
                 * we do not, make it so, otherwise we might get stuck.
                 */

                __set_current_state(TASK_RUNNING);
                schedule_preempt_disabled();
        }

        return false;
}
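
mutex_can_spin_on_owner() and mutex_spin_on_owner() are used above but not quoted. Simplified versions, based on kernel/locking/mutex.c in this tree (quoted from memory, so minor details may differ), are shown below; both stop spinning as soon as the owner goes off-CPU or the spinner itself needs to reschedule.

/*
 * "owner" is a speculative pointer; only dereference it under
 * rcu_read_lock() and only after re-checking lock->owner.
 */
static noinline
bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
        bool ret = true;

        rcu_read_lock();
        while (lock->owner == owner) {
                /* re-check lock->owner before looking at owner->on_cpu */
                barrier();

                if (!owner->on_cpu || need_resched()) {
                        ret = false;
                        break;
                }

                cpu_relax_lowlatency();
        }
        rcu_read_unlock();

        return ret;
}

/*
 * Initial check for entering the mutex spinning loop.
 */
static inline int mutex_can_spin_on_owner(struct mutex *lock)
{
        struct task_struct *owner;
        int retval = 1;

        if (need_resched())
                return 0;

        rcu_read_lock();
        owner = READ_ONCE(lock->owner);
        if (owner)
                retval = owner->on_cpu;
        rcu_read_unlock();

        /*
         * If lock->owner is not set, the owner may have just acquired the
         * mutex and not yet set the owner field, or the mutex has been
         * released.
         */
        return retval;
}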

The osq_lock()/osq_unlock() logic is shown below. The interesting part is how osq_wait_next() and the unqueue path stabilize the next/prev pointers; the comment in the code walks through a concrete example.
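
For reference, here are the per-CPU node and queue structures plus the CPU-number encoding used by the osq code below (taken, lightly trimmed and from memory, from include/linux/osq_lock.h and kernel/locking/osq_lock.c in this tree):

/*
 * An MCS-like lock: each spinner spins on its own per-cpu node instead of a
 * shared cacheline; lock->tail stores the encoded CPU number of the last
 * node in the queue (OSQ_UNLOCKED_VAL, i.e. 0, means the queue is empty).
 */
struct optimistic_spin_node {
        struct optimistic_spin_node *next, *prev;
        int locked;    /* 1 if lock acquired */
        int cpu;       /* encoded CPU # + 1 value */
};

struct optimistic_spin_queue {
        atomic_t tail;
};

#define OSQ_UNLOCKED_VAL (0)

static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);

/*
 * 0 means "no CPU", so the encoded value is the CPU number plus one.
 */
static inline int encode_cpu(int cpu_nr)
{
        return cpu_nr + 1;
}

static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)
{
        int cpu_nr = encoded_cpu_val - 1;

        return per_cpu_ptr(&osq_node, cpu_nr);
}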

bool osq_lock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
        struct optimistic_spin_node *prev, *next;
        int curr = encode_cpu(smp_processor_id());
        int old;

        node->locked = 0;
        node->next = NULL;
        node->cpu = curr;

        /* store curr into tail: tail always holds the CPU that queued last for the lock */
        old = atomic_xchg(&lock->tail, curr);
        if (old == OSQ_UNLOCKED_VAL)
                return true;

        prev = decode_cpu(old);
        node->prev = prev;
        WRITE_ONCE(prev->next, node);

        /*
         * Normally @prev is untouchable after the above store; because at that
         * moment unlock can proceed and wipe the node element from stack.
         *
         * However, since our nodes are static per-cpu storage, we're
         * guaranteed their existence -- this allows us to apply
         * cmpxchg in an attempt to undo our queueing.
         */


        /* wait until our own locked flag is set */
        while (!READ_ONCE(node->locked)) {
                /*
                 * If we need to reschedule bail... so we can block.
                 */

                /* This is the difference from qspinlock: here we may give up and schedule out */
                if (need_resched())
                        goto unqueue;

                cpu_relax_lowlatency();
        }
        return true;

unqueue:
        /*
         * Step - A  -- stabilize @prev
         *
         * Undo our @prev->next assignment; this will make @prev's
         * unlock()/unqueue() wait for a next pointer since @lock points to us
         * (or later).
         */


        /*
         * This is where @prev is pinned down (stabilized). The method is a
         * cmpxchg that compares prev->next with node: if prev->next == node
         * it is set to NULL, and because the compare and the store happen
         * atomically, prev->next == NULL afterwards marks prev->next as
         * unstable. If @prev itself then has to unqueue because of
         * need_resched(), it will see prev->next == NULL in its own
         * stabilize-next step and keep waiting until it becomes non-NULL
         * again.
         *
         * Example: assume the current osq (MCS) chain is:
         * Begin:
         *      CPU1 ---->      CPU2 ---->      CPU3 ---->              CPU4                            CPU5
         *      hold_mutex      ......          ......                  ......                          ......
         *                      osq_get_lock    ......                  ......                          ......
         *                                      osq_try_lock            ......                          ......
         *                                      ......                  ......                          ......
         *                                      ......                  osq_try_lock                    ......
         *                                      wait(node3->locked)     ......                          ......
         *                                      ......                  ......                          osq_try_lock
         *                                      ......                  wait(node4->locked)             ......
         *                                      ......                  ......                          ......
         *                                      ......                  ......                          wait(node5->locked)
         *                                      need_resched()          ......                          ......
         *                                      unqueue
         *                                      ......                  ......                          ......
         *                                      ......                  need_resched()                  ......
         *                                      ......                  unqueue                         ......
         *                                      ......                  ......                          ......
         *                                      ......                  ......                          ......
         *                                      get_stable_prev(CPU2)   ......                          ......
         *                                      prev = CPU2
         *                                      CPU2->next = NULL
         *                                      ......                  ......                          ......
         *                                      ......                  get_stable_prev(CPU3)           ......
         *                                                              prev = CPU3
         *                                                              CPU3->next = NULL
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      try_get_stable_next     ......
         *                                      while (CPU3->next == NULL)
         *                                              loop
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  get_stable_next
         *                                                              next = CPU4->next (CPU5)
         *                                                              CPU4->next = NULL
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      ......                  ......
         *                                                              prev(CPU3)->next = next(CPU5)
         *                                                              next(CPU5)->prev = prev(CPU3)
         *                                      ......                  ......
         *                                      ......                  ......
         *                                      loop end                ......
         *                                      CPU3->next = CPU5
         *                                      next is stable
         *                                      ......                  ......
         *                                      get_stable_next         ......
         *                                      next = CPU5
         *                                      CPU3->next = NULL
         *                                      ......                  ......
         *
         *                                      prev(CPU2)->next = next(CPU5)
         *                                      next(CPU5)->prev = prev(CPU2)
         *
         * End:
         *      CPU1 ---->      CPU2 ---->      CPU5
         */


        for (;;) {
                if (prev->next == node &&
                   cmpxchg(&prev->next, node, NULL) == node)
                        break;

                /*
                 * We can only fail the cmpxchg() racing against an unlock(),
                 * in which case we should observe @node->locked becoming
                 * true.
                 */

                if (smp_load_acquire(&node->locked))
                        return true;

                cpu_relax_lowlatency();

                /*
                 * Or we race against a concurrent unqueue()'s step-B, in which
                 * case its step-C will write us a new @node-&gt;prev pointer.
                 */

                prev = READ_ONCE(node->prev);
        }

        /*
         * Step - B -- stabilize @next
         *
         * Similar to unlock(), wait for @node->next or move @lock from @node
         * back to @prev.
         */


        next = osq_wait_next(lock, node, prev);
        if (!next)
                return false;

        /*
         * Step - C -- unlink
         *
         * @prev is stable because it's still waiting for a new @prev->next
         * pointer, @next is stable because our @node->next pointer is NULL and
         * it will wait in Step-A.
         */


        WRITE_ONCE(next->prev, prev);
        WRITE_ONCE(prev->next, next);

        return false;
}


void osq_unlock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node, *next;
        int curr = encode_cpu(smp_processor_id());

        /*
         * Fast path for the uncontended case.
         */

        if (likely(atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
                return;

        /*
         * Second most likely case.
         */

        node = this_cpu_ptr(&osq_node);
        next = xchg(&node->next, NULL);
        if (next) {
                WRITE_ONCE(next->locked, 1);
                return;
        }

        /* wait until next becomes stable */
        next = osq_wait_next(lock, node, NULL);
        if (next)
                /* hand the osq lock over by setting the next waiter's locked flag to 1 */
                WRITE_ONCE(next->locked, 1);
}

/*
 * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
 * Can return NULL in case we were the last queued and we updated @lock instead.
 */

static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
              struct optimistic_spin_node *node,
              struct optimistic_spin_node *prev)
{
        struct optimistic_spin_node *next = NULL;
        int curr = encode_cpu(smp_processor_id());
        int old;

        /*
         * If there is a prev node in queue, then the 'old' value will be
         * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
         * we're currently last in queue, then the queue will then become empty.
         */

        old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;

        for (;;) {
                if (atomic_read(&lock->tail) == curr &&
                    atomic_cmpxchg(&lock->tail, curr, old) == curr) {
                        /*
                         * We were the last queued, we moved @lock back. @prev
                         * will now observe @lock and will complete its
                         * unlock()/unqueue().
                         */

                        break;
                }

                /*
                 * We must xchg() the @node->next value, because if we were to
                 * leave it in, a concurrent unlock()/unqueue() from
                 * @node->next might complete Step-A and think its @prev is
                 * still valid.
                 *
                 * If the concurrent unlock()/unqueue() wins the race, we'll
                 * wait for either @lock to point to us, through its Step-B, or
                 * wait for a new @node->next from its Step-C.
                 */

                if (node->next) {
                        next = xchg(&node->next, NULL);
                        if (next)
                                break;
                }

                cpu_relax_lowlatency();
        }

        return next;
}
