sync futex - ceragon/LinuxDoc GitHub Wiki

futex

说明

  • futex() 系统调用提供了一种等待某个条件成立的方法。
  • 它通常用作共享内存同步上下文中的阻塞构造。
  • 使用 futex 时,大多数同步操作都是在用户空间中执行的。
  • 用户空间程序仅在程序可能必须阻塞更长的时间直到条件变为真时才使用 futex() 系统调用。
  • 其他 futex() 操作可用于唤醒任何等待特定条件的进程或线程。
  • futex 是一个 32 位的值——下面称为 futex 字——它的地址被提供给 futex() 系统调用。
  • (Futex 在所有平台上都是 32 位大小,包括 64 位系统。)所有 futex 操作都受此值控制。
  • 为了在进程之间共享 futex,将 futex 放置在共享内存区域中,使用(例如)mmap(2) 或 shmat(2) 创建。
  • (因此,futex 字在不同进程中可能有不同的虚拟地址,但这些地址都指向物理内存中的同一位置。)
  • 在多线程程序中,将 futex 字放在所有线程共享的全局变量中就足够了.
  • 当执行请求阻塞线程的 futex 操作时,仅当 futex 字具有调用线程提供的值(作为 futex() 调用的参数之一)作为 futex 字的预期值时,内核才会阻塞.
  • futex 字的值的加载、该值与预期值的比较以及实际的阻塞将自动发生,并且将相对于其他线程在同一个 futex 字上执行的并发操作完全排序。
  • 因此,futex 字用于连接用户空间的同步和内核的阻塞实现。
  • 类似于可能更改共享内存的原子比较和交换操作,通过 futex 阻塞是原子比较和阻塞操作。
  • futexes 的一种用途是实现锁。锁的状态(即获得或未获得)可以表示为共享内存中的原子访问标志。
  • 在无竞争的情况下,线程可以使用原子指令访问或修改锁定状态,例如使用原子比较和交换指令将其从未获取原子地更改为已获取。
  • (这些指令完全在用户模式下执行,内核不维护有关锁状态的信息。)
  • 另一方面,一个线程可能无法获取锁,因为它已经被另一个线程获取。
  • 然后它可以将锁的标志作为 futex 字和表示获取状态的值作为预期值传递给 futex() 等待操作。
  • 当且仅当锁仍被获取时(即 futex 字中的值仍与“获取状态”匹配),此 futex() 操作才会阻塞。
  • 释放锁时,线程必须首先将锁状态重置为未获取,然后执行 futex 操作,唤醒在用作 futex 字的锁定标志上阻塞的线程(这可以进一步优化以避免不必要的唤醒)。
  • 有关如何使用 futex 的更多详细信息,请参阅 futex(7)。
  • 除了基本的等待和唤醒 futex 功能外,还有更多 futex 操作旨在支持更复杂的用例。
  • 请注意,使用 futex 不需要显式初始化或销毁;内核维护一个 futex(即内核内部实现工件),仅当下面描述的 FUTEX_WAIT 等操作正在对特定 futex 字执行时。

结构体

struct futex_q {
	struct plist_node list;

	struct task_struct *task;
	spinlock_t *lock_ptr;
	union futex_key key;
	struct futex_pi_state *pi_state;
	struct rt_mutex_waiter *rt_waiter;
	union futex_key *requeue_pi_key;
	u32 bitset;
};

系统调用

SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
		struct timespec __user *, utime, u32 __user *, uaddr2,
		u32, val3)
{
    // 省略
    return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
		u32 __user *uaddr2, u32 val2, u32 val3)
{
    int ret = -ENOSYS, cmd = op & FUTEX_CMD_MASK;
    switch (cmd) {
        case FUTEX_WAIT:
            ret = futex_wait(uaddr, flags, val, timeout, val3);
            break;
        case FUTEX_WAKE:
            ret = futex_wake(uaddr, flags, val, val3);
            break;
    }
    return ret;
}

wait

请注意,该方法的主要目的是进入队列等待,直到被唤醒获得信号量。 所以 u32 val 这个参数是进入队列等待的条件,千万不要理解错了。而且通常情况下,这个 val = 0.

只有返回值为 0 的时候才是成功,所以只跟踪赋值为 0 的部分

  • val: uaddr 的值与该值相等才会进入等待队列
  • aba_time: 等待超时时间,0 表示不超时
  • bitset: 设置 futex 的标志位
  • return: 0 表示获取到信号量, EAGAIN==11 表示重试(当前值与目标值不一致)。
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
		      ktime_t *abs_time, u32 bitset)
{
retry:
    // 判断 uaddr 的最新值是否等于 val
    // - 相等:ret == 0,并对 uaddr 对应的 bucket 加锁
    // - 不等:说明信号量满足要求,所以直接返回。
    ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
    // 如果返回值 > 0,大概率是 uaddr 对应的值 不是 val。
    // 直白的说,就是如果 uaddr == val,那么就应该去等待,否则不需要等待
    if (ret) 
		goto out;
    // -----------------------
    // 进入队列等待,直到被其他 task 通过 futex_wake 唤醒
    futex_wait_queue_me(hb, &q, to);
    ret = 0;
    // 正常来说既然被唤醒了,应该就不在队列中了。因为会被唤醒者移除队列
	/* unqueue_me() drops q.key ref */
	if (!unqueue_me(&q))
		goto out;
    // -----------------------
    // 代码省略
out:
	if (to) {
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}
	return ret;
}

unqueue_me

  • 1 - if the futex_q was still queued (and we removed unqueued it)
  • 0 - if the futex_q was already removed by the waking thread
static int unqueue_me(struct futex_q *q) {
    spinlock_t *lock_ptr;
    int ret = 0;
retry:
	lock_ptr = q->lock_ptr;
	barrier();
    // 在其他 task 调用了 wake 之后,会顺带把 lock_ptr 置为 null
	if (lock_ptr != NULL) {
		spin_lock(lock_ptr);
		// q->lock_ptr 可以在读取它和 spin_lock() 之间改变,导致我们获取错误的锁。 这更正了竞态条件。
        // 推理是这样的:如果我们有错误的锁,q->lock_ptr 必须在读取它和 spin_lock() 之间发生变化(可能多次)。
        // 它可以在 spin_lock() 之后再次更改,但前提是它在 spin_lock() 之前已经更改。 
        // 但是,它不能变回原始值。 因此我们可以检测我们是否获得了正确的锁。
		if (unlikely(lock_ptr != q->lock_ptr)) {
			spin_unlock(lock_ptr);
			goto retry;
		}
		__unqueue_futex(q);
		spin_unlock(lock_ptr);
		ret = 1;
	}

	drop_futex_key_refs(&q->key);
	return ret;
}

futex_wait_queue_me

static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
				struct hrtimer_sleeper *timeout)
{
    set_current_state(TASK_INTERRUPTIBLE);
//    queue_me(q, hb);
    int prio;
    // 用于注册此元素的优先级是
    // - 实时线程的实际线程优先级(即优先级低于 MAX_RT_PRIO 的线程)
    // - 或 MAX_RT_PRIO 用于非 RT 线程。 因此,所有 RT 线程按优先级顺序首先被唤醒,其他线程按 FIFO 顺序最后唤醒。
    prio = min(current->normal_prio, MAX_RT_PRIO);
    // 初始化
    plist_node_init(&q->list, prio);
    // 扔到队尾
	plist_add(&q->list, &hb->chain);
    // 赋值 task
	q->task = current;
    // 解锁 futex_hash_bucket,在 futex_wait_setup 执行了 spin_lock
	spin_unlock(&hb->lock);
    
    if (likely(!plist_node_empty(&q->list))) {
        if (!timeout || timeout->task)
            // 交出CPU 时间片,等待被他人唤醒
			schedule();
    }
    // 被唤醒
    __set_current_state(TASK_RUNNING);
}

futex_wait_setup

  • 设置 futex_q 并找到 hash_bucket。
  • 获取 futex 值并将其与期望值进行比较。 在内部处理原子故障。
  • 返回并保持 hb 锁定和成功时的 q.key 引用,并在失败时解锁且没有 q.key 引用。
  • return 0 - uaddr contains val and hb has been locked
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
			   struct futex_q *q, struct futex_hash_bucket **hb)
{
retry:
    // 获取锁对应的 key
	ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key);
	if (unlikely(ret != 0))
		return ret;
retry_private:
    // 对 futex 加锁
	*hb = queue_lock(q);
    // uval 是 uaddr 的最新值
	ret = get_futex_value_locked(&uval, uaddr);
//	if (ret) {
//		queue_unlock(q, *hb);
//		ret = get_user(uval, uaddr);
//		if (ret)
//			goto out;
//		if (!(flags & FLAGS_SHARED))
//			goto retry_private;
//		put_futex_key(&q->key);
//		goto retry;
//	}
    // 最新值与期望值不一致
	if (uval != val) {
        // futex 解锁
		queue_unlock(q, *hb);
		ret = -EWOULDBLOCK;
	}

out:
	if (ret)
		put_futex_key(&q->key);
	return ret;    
}
static int get_futex_value_locked(u32 *dest, u32 __user *from)
{
	int ret;

	pagefault_disable();
    // 原子地把 from 地址的值拷贝到 dest 地址中,且限定长度为 32 位
	ret = __copy_from_user_inatomic(dest, from, sizeof(u32));
    // ret == 0 表示操作成功
	pagefault_enable();

	return ret ? -EFAULT : 0;
}

wake

与 wait 类似,该方法的主要目的就是唤醒等待者,所以不会判断当地是否满足唤醒条件。

  • nr_wake: 唤醒个数
  • bitset: 只唤醒特定的 futex
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
    struct futex_hash_bucket *hb;
	struct futex_q *this, *next;
	struct plist_head *head;
	union futex_key key = FUTEX_KEY_INIT;
    int ret;
    // 主要作用是根据 uaddr 找到对应的 bucket 的 key
    ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key);
	if (unlikely(ret != 0))
		goto out;
    // 获取对应的 bucket
	hb = hash_futex(&key);
    // 对该 bucket 上自旋锁
	spin_lock(&hb->lock);
    head = &hb->chain;
    plist_for_each_entry_safe(this, next, head, list) {
        // 校验key 是否匹配,避免唤醒错误的等待者
		if (match_futex (&this->key, &key)) {
			if (this->pi_state || this->rt_waiter) {
				ret = -EINVAL;
				break;
			}

			/* Check if one of the bits is set in both bitsets */
            // 唤醒的过滤条件,只有满足要求的 task 才会被唤醒
			if (!(this->bitset & bitset))
				continue;
            // 唤醒这个等待中的 task
			wake_futex(this);
            // nr_wake 是本次唤醒的 task 个数
			if (++ret >= nr_wake)
				break;
		}
	}
    // 解锁
    spin_unlock(&hb->lock);
	put_futex_key(&key);
out:
    return ret;
}
static void wake_futex(struct futex_q *q)
{
    struct task_struct *p = q->task;
    get_task_struct(p);
    // 从队列中移除
    __unqueue_futex(q);
//    smp_wmb();
    __asm__ __volatile__("" ::: "memory")
    // 这样就可以保证被唤醒的task可以正常运行
	q->lock_ptr = NULL;
    // 唤醒等待者
	wake_up_state(p, TASK_NORMAL);
	put_task_struct(p);
}

通用接口

queue lock&unlock

static inline struct futex_hash_bucket *queue_lock(struct futex_q *q)
	__acquires(&hb->lock)
{
	struct futex_hash_bucket *hb;
    // 获取对应的 bucket
	hb = hash_futex(&q->key);
    // 当前 futex 中的锁是 bucket 对应的锁
	q->lock_ptr = &hb->lock;
    // 自旋上锁
	spin_lock(&hb->lock);
	return hb;
}
static inline void
queue_unlock(struct futex_q *q, struct futex_hash_bucket *hb)
	__releases(&hb->lock)
{
	spin_unlock(&hb->lock);
}
⚠️ **GitHub.com Fallback** ⚠️