thread pthread_mutex - ceragon/LinuxDoc GitHub Wiki

pthread mutex

结构体

typedef union
{
    struct __pthread_mutex_s __data;
    char __size[__SIZEOF_PTHREAD_MUTEX_T];
    long int __align;
} pthread_mutex_t;

__kind 标志位含义

  • 0b0_1000_0000: 表示锁非线程私有
  • 0b1_0000_0000: 表示支持锁重入
struct __pthread_mutex_s
{
    int __lock;
    unsigned int __count;
    int __owner;
    unsigned int __nusers;
    int __kind;
    short __spins;
    short __elision;
    __pthread_list_t __list;
};
typedef struct __pthread_internal_list
{
    struct __pthread_internal_list *__prev;
    struct __pthread_internal_list *__next;
} __pthread_list_t;
typedef union
{
    char __size[__SIZEOF_PTHREAD_MUTEXATTR_T];
    int __align;
} pthread_mutexattr_t;
struct pthread_mutexattr
{
    int mutexkind;
};

init

static const struct pthread_mutexattr default_mutexattr =
{
    .mutexkind = PTHREAD_MUTEX_NORMAL
};
int __pthread_mutex_init (pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr) {
    // mutex 初始化后通过 mutex 指针返回锁对象,mutexattr 用于配置 mutex 的属性
    const struct pthread_mutexattr *imutexattr;
    // 如果没有传入 mutexattr 参数,使用默认参数。默认是 PTHREAD_MUTEX_NORMAL 类型
    imutexattr = ((const struct pthread_mutexattr *) mutexattr ?: &default_mutexattr);
    // 把 mutex 的所有值就变为 0 值
    memset (mutex, '\0', __SIZEOF_PTHREAD_MUTEX_T);
    //  PTHREAD_MUTEXATTR_FLAG_BITS = 1111_0000_1111_1111_1111_0000_0000_0000
    // ~PTHREAD_MUTEXATTR_FLAG_BITS = 0000_1111_0000_0000_0000_1111_1111_1111
    // PTHREAD_MUTEX_NORMAL 对应的类型是 0
    int mutex_kind = imutexattr->mutexkind & ~PTHREAD_MUTEXATTR_FLAG_BITS;
    // PTHREAD_MUTEXATTR_FLAG_ROBUST = 0100_0000_0000_0000_0000_0000_0000_0000
    if ((imutexattr->mutexkind & PTHREAD_MUTEXATTR_FLAG_ROBUST) != 0) {
        // PTHREAD_MUTEX_ROBUST_NORMAL_NP = 1_0000
        mutex_kind |= PTHREAD_MUTEX_ROBUST_NORMAL_NP;
    }
    // 和上面很类似,都是在处理 mutex_kind 应该是什么值
    // 省略代码
    // 修改 mutex 对应的 __kind 值
    atomic_store_relaxed (&(mutex->__data.__kind), mutex_kind);
    // 宏对应的是空实现
//    LIBC_PROBE (mutex_init, 1, mutex);
}

lock

# define PTHREAD_MUTEX_LOCK ___pthread_mutex_lock
int PTHREAD_MUTEX_LOCK(pthread_mutex_t *mutex) {
    // atomic_load_relaxed 主要作用是告诉编译器从内存读取 __kind 值
//    unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);
    // 127 | PTHREAD_MUTEX_ELISION_NP = 127 | 256 = 1_0111_1111
    unsigned int type = atomic_load_relaxed(&(mutex->__data.__kind)) & (127 | PTHREAD_MUTEX_ELISION_NP));
//    LIBC_PROBE (mutex_entry, 1, mutex);
    // ~(PTHREAD_MUTEX_KIND_MASK_NP | PTHREAD_MUTEX_ELISION_FLAGS_NP) = ~(3 | (256 | 512)) = ~(11_0000_0011) = 1111_1100
    // type & 1111_1100 == 0
    if (__builtin_expect(type & ~(PTHREAD_MUTEX_KIND_MASK_NP
                                  | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
        return __pthread_mutex_lock_full(mutex);
    // PTHREAD_MUTEX_TIMED_NP == 0
    if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP)) {
        // 强制锁省略
        FORCE_ELISION (mutex, goto elision);
    simple:
        // 尝试将 mutex->lock 的最低位设为 1,但如果进入过 wait 状态 lock 的值是 2
        LLL_MUTEX_LOCK_OPTIMIZED (mutex);
    }
    // 是否支持 锁省略,通常 intel cpu 会支持
#if ENABLE_ELISION_SUPPORT
    // PTHREAD_MUTEX_TIMED_ELISION_NP = 0b1_0000_0000
    else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP)) {
        elision:
        return LLL_MUTEX_LOCK_ELISION (mutex);
    }
#endif
    // 是否支持锁重入
    else if (__builtin_expect(PTHREAD_MUTEX_TYPE (mutex)
                              == PTHREAD_MUTEX_RECURSIVE_NP, 1)) {
        // 获取线程的 tid
        pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
        if (mutex->__data.__owner == id) {
            // 表示锁重入成功
            // 估计是重入次数不能超过 int 最大值
            if (__glibc_unlikely (mutex->__data.__count + 1 == 0))
                return EAGAIN;
            // 重入次数 +1
            ++mutex->__data.__count;
            return 0;
        }
        // 锁重入失败,走正常的加锁
        LLL_MUTEX_LOCK_OPTIMIZED (mutex);
        mutex->__data.__count = 1;
    } else if (__builtin_expect(PTHREAD_MUTEX_TYPE (mutex)
                                == PTHREAD_MUTEX_ADAPTIVE_NP, 1)) {
        if (LLL_MUTEX_TRYLOCK (mutex) != 0) {
            // 自旋锁
            int cnt = 0;
            int max_cnt = MIN (max_adaptive_count(),
                               mutex->__data.__spins * 2 + 10);
            do {
                if (cnt++ >= max_cnt) {
                    LLL_MUTEX_LOCK (mutex);
                    break;
                }
                atomic_spin_nop ();
            } while (LLL_MUTEX_READ_LOCK (mutex) != 0
                     || LLL_MUTEX_TRYLOCK (mutex) != 0);

            mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
        }
        // 说明 try lock 成功
    } else {
        // 默认方式就是查看一下锁重入是否成功
        pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
        if (__glibc_unlikely (mutex->__data.__owner == id))
            return EDEADLK;
        goto simple;
    }
    // 获取线程的 tid 值
    pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
    // 既然加锁成功了,就把 __owner 设为 1
    mutex->__data.__owner = id;
#ifndef NO_INCR
    // TODO:
    ++mutex->__data.__nusers;
#endif
//    LIBC_PROBE (mutex_acquired, 1, mutex);
    return 0;
}

lll_mutex_lock_optimized

static inline void lll_mutex_lock_optimized(pthread_mutex_t *mutex) {
    // 单线程优化仅对私有互斥锁有效。
    // 对于进程共享的互斥锁,互斥锁可能位于共享映射中,因此即使没有任何线程也需要与另一个进程同步。
    // 如果锁已被标记为已获取,POSIX 要求 pthread_mutex_lock 为普通互斥锁死锁,因此在这种情况下也跳过优化。
//    int private = PTHREAD_MUTEX_PSHARED (mutex);
    // 128 = 1000_0000
    int private = atomic_load_relaxed(&(mutex)->__data.__kind) & 128;
    // LLL_PRIVATE = 0,也就是右起第8位 1 表示不私有
    // SINGLE_THREAD_P 可能表示是不是单线程
    // __lock == 0 表示当前没上锁
    if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
        // 既然锁是线程私有的,直接设为1 就是加锁了
        mutex->__data.__lock = 1;
    else {
        lll_lock (mutex->__data.__lock, private);
    }
}

lll_lock

#define __lll_lock(futex, private)                                      \
  ((void)                                                               \
   ({                                                                   \
     int *__futex = &(mutex->__data.__lock);
     // 期望 futex 的值是否是 0,然后置为 1
     if (!atomic_compare_and_exchange_bool_acq(__futex, 1, 0)) {
         // 操作失败,由于不想考虑线程私有的情况,所以直接看正常的 wait
         __lll_lock_wait(__futex, private);
     }                                                                \
   }))

lll_lock_elision

lll_lock_elision ((mutex)->__data.__lock, (mutex)->__data.__elision, \
		   PTHREAD_MUTEX_PSHARED (mutex));
lll_lock_elision ((mutex)->__data.__lock, (mutex)->__data.__elision, \
		   atomic_load_relaxed (&((m)->__data.__kind)) & 128);
__lll_lock_elision (&(futex), &(adapt_count), private)                  
int
__lll_lock_elision(int *futex, short *adapt_count, EXTRAARG int private) {
    // 加载 __data.__elision 值
    if (atomic_load_relaxed (adapt_count) <= 0) {
        // 如果 自适应值 小于等于 0
        unsigned status;
        int try_xbegin;
        // aconf.retry_try_xbegin 默认值是 3,所以下面起始就是循环3次
        for (try_xbegin = aconf.retry_try_xbegin;
             try_xbegin > 0;
             try_xbegin--) {
            
            if ((status = _xbegin()) == _XBEGIN_STARTED) {
                if (*futex == 0)
                    // 成功的锁消除
                    return 0;
                _xabort(_ABORT_LOCK_BUSY);
            }

            if (!(status & _XABORT_RETRY)) {
                if ((status & _XABORT_EXPLICIT)
                    && _XABORT_CODE (status) == _ABORT_LOCK_BUSY) {
                    /* Right now we skip here.  Better would be to wait a bit
                       and retry.  This likely needs some spinning.  See
                       above for why relaxed MO is sufficient.  */
                    if (atomic_load_relaxed (adapt_count)
                        != aconf.skip_lock_busy)
                        atomic_store_relaxed (adapt_count, aconf.skip_lock_busy);
                }
                    /* Internal abort.  There is no chance for retry.
                   Use the normal locking and next time use lock.
                   Be careful to avoid writing to the lock.  See above for why
                   relaxed MO is sufficient.  */
                else if (atomic_load_relaxed (adapt_count)
                         != aconf.skip_lock_internal_abort)
                    atomic_store_relaxed (adapt_count,
                                          aconf.skip_lock_internal_abort);
                break;
            }
        }
    } else {
        // 此时adapt_count > 0, 那么就将 adapt_count 原子的方式 -1
        atomic_store_relaxed (adapt_count,
                              atomic_load_relaxed(adapt_count) - 1);
    }
    // 对 futex 加锁,不成功会等待
    return LLL_LOCK (*futex, private);
}

__pthread_mutex_lock_full

//TODO:

unlock

int
__pthread_mutex_unlock(pthread_mutex_t *mutex) {
    return __pthread_mutex_unlock_usercnt(mutex, 1);
}

int
attribute_hidden
__pthread_mutex_unlock_usercnt(pthread_mutex_t *mutex, int decr) {
//  int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);
    // PTHREAD_MUTEX_ELISION_NP = 256。 127 | 256 = 0b1_0111_1111。右起第8位是是否线程私有位
    int type = atomic_load_relaxed &(mutex->__data.__kind)	& (127 | PTHREAD_MUTEX_ELISION_NP);
    if (__builtin_expect(type
                         & ~(PTHREAD_MUTEX_KIND_MASK_NP
                             | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
        return __pthread_mutex_unlock_full(mutex, decr);
    
    // PTHREAD_MUTEX_TIMED_NP = 0,期望 type == 0
    if (__builtin_expect(type, PTHREAD_MUTEX_TIMED_NP)
        == PTHREAD_MUTEX_TIMED_NP) {
        // 字面意思是,锁的类型是带超时的锁
    normal:
        // 解锁的方式的就是将 __owner 置为 0
        mutex->__data.__owner = 0;
        if (decr) {
            // 如果 decr 是非0值,就 -1
            --mutex->__data.__nusers;
        }
        /* Unlock.  */
        lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED(mutex));
//        LIBC_PROBE (mutex_release, 1, mutex);
        return 0;
    } else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP)) {
        // 由于类型是锁消除,而加锁的时候也没设置 __data.__owner,所以这里也用不着设置
        return lll_unlock_elision (mutex->__data.__lock, mutex->__data.__elision,
                                   PTHREAD_MUTEX_PSHARED(mutex));
    } else if (__builtin_expect(PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_RECURSIVE_NP, 1)) {
        // 支持锁重入    
        if (mutex->__data.__owner != THREAD_GETMEM (THREAD_SELF, tid))
            // 不是当前线程上的锁
            return EPERM;
        if (--mutex->__data.__count != 0)
            // 锁的重入次数 -1,如果 -1 后的值 > 0,说明锁还没释放完全
            return 0;
        // 锁释放完全了,执行锁的正常解锁
        goto normal;
    } else if (__builtin_expect(PTHREAD_MUTEX_TYPE (mutex)
                                == PTHREAD_MUTEX_ADAPTIVE_NP, 1))
        // 自旋锁
        // 自旋锁与普通锁没太大差别,所以解锁方式一样
        goto normal;
    else {
        if (mutex->__data.__owner != THREAD_GETMEM (THREAD_SELF, tid)
            || !lll_islocked (mutex->__data.__lock))
            // 如果不是自己持有锁,或者当前没有加锁,就返回异常
            return EPERM;
        // 可以正常解锁
        goto normal;
    }
}

lll_unlock

void
lll_unlock(int* lock, int private) {
    __lll_unlock (&(lock), private);
}

void
__lll_unlock (int* futex, int private) {
    int *__futex = (futex);
    int __private = (private);
    // 原子替换 __futex 的值为 0
    int __oldval = atomic_exchange_rel (__futex, 0);
    // 如果旧值 > 1,说明有人在等待。理由是 lock 的时候 wait 方法会将 __futex 置为 2
    if (__glibc_unlikely (__oldval > 1)) {
        // 既然有人等待,就去唤醒
        lll_futex_wake (__futex, 1, __private);
    }
}

lll_unlock_elision

void
lll_unlock_elision(int *lock, short *adapt_count, int private) {
    __lll_unlock_elision (&(futex), private)
}

int
__lll_unlock_elision(int *lock, int private) {
    // 不考虑 lock 为0 的情况。直接调用上面的 lll_unlock 方法
    lll_unlock (*lock, private);
    return 0;
}

lll_

bool
lll_islocked(int futex) {
    // LLL_LOCK_INITIALIZER == 0
    // futex 不是0 就是在加锁
    return futex != LLL_LOCK_INITIALIZER
}

通用方法

__lll_lock_wait

void __lll_lock_wait(int *futex, int private) {
    // 2 = 0b10
    if (atomic_load_relaxed (futex) == 2)
        goto futex;
    // 强行将 futex 改为 2
    while (atomic_exchange_acquire (futex, 2) != 0) {
        // 只要改之前的值不是 0 就进来等待
    futex:
//        LIBC_PROBE (lll_lock_wait, 1, futex);
        // 翻译一下,只要 futex 的值是2 就陷入等待,直到 futex 的值不是2 
        futex_wait((unsigned int *) futex, 2, private); /* Wait if *futex == 2.  */
    }
}

futex_wait

static __always_inline int
futex_wait(unsigned int *futex_word, unsigned int expected, int private) {
    // lll_futex_timed_wait == 
    //              lll_futex_syscall (4, futexp,                                 
	//	                __lll_private_flag (FUTEX_WAIT, private),  
	//	                val, timeout)
    // 执行 linux 系统调用 FUTEX_WAIT
    int err = lll_futex_timed_wait (futex_word, expected, NULL, private);
    switch (err) {
        case 0:
        case -EAGAIN:
        case -EINTR:
            return -err;
        case -ETIMEDOUT: /* Cannot have happened as we provided no timeout.  */
        case -EFAULT: /* Must have been caused by a glibc or application bug.  */
        case -EINVAL: 
        case -ENOSYS: /* Must have been caused by a glibc bug.  */
        default:
            futex_fatal_error();
    }
}
⚠️ **GitHub.com Fallback** ⚠️