sync rw_semaphore - ceragon/LinuxDoc GitHub Wiki

rw_semaphore

结构体

/*
 * Reader/writer semaphore as used by the code on this page.
 */
struct rw_semaphore {
	/* lock state word: set to 0 by __init_rwsem() and changed only with locked add/xadd */
	long			count;
	/* spinlock taken around every wait_list manipulation */
	spinlock_t		wait_lock;
	/* queue of struct rwsem_waiter entries, linked through their 'list' member */
	struct list_head	wait_list;
};
/*
 * One queued waiter. rwsem_down_failed_common() places an instance on its own
 * stack and links it at the tail of sem->wait_list.
 */
struct rwsem_waiter {
	/* link in rw_semaphore.wait_list */
	struct list_head list;
	/* the sleeping task; __rwsem_do_wake() sets this to NULL when it grants the lock */
	struct task_struct *task;
	/* RWSEM_WAITING_FOR_READ or RWSEM_WAITING_FOR_WRITE */
	unsigned int flags;
};

初始化

/*
 * __init_rwsem - put a reader/writer semaphore into its unlocked state.
 * @sem:  semaphore to initialise
 * @name: not used by this hand-expanded version
 * @key:  not used by this hand-expanded version
 *
 * Each original macro call is kept as a comment directly above its expansion.
 */
void __init_rwsem(struct rw_semaphore *sem, const char *name, struct lock_class_key *key) {
//    sem->count = RWSEM_UNLOCKED_VALUE;
    sem->count = 0x00000000L;
//    spin_lock_init(&sem->wait_lock);
    do {
//        spinlock_check(&sem->wait_lock);
        /* type check only: the address is computed and discarded */
        (void)&(&sem->wait_lock)->rlock;
        do {
            /* zero the raw lock embedded in the spinlock */
            *(&(&sem->wait_lock)->rlock) = (raw_spinlock_t) {
                .raw_lock={0},
            };
        } while (0);
    } while (0);
    /* start with an empty wait queue */
    INIT_LIST_HEAD(&sem->wait_list);
}
/*
 * INIT_LIST_HEAD - turn @list into an empty circular list.
 * Both links of the head end up pointing back at the head itself.
 */
static inline void INIT_LIST_HEAD(struct list_head *list) {
    struct list_head *const self = list;

    self->next = self;
    self->prev = self;
}

read lock

和 semaphore 相比,这里的加锁是信号量 +1,解锁是信号量 -1

  • 如果 +1 后是负数,说明加之前 count < -1
  • 如果 +1 后是非负数(>= 0),说明加之前 count >= -1
/*
 * down_read - take the semaphore for reading.
 * @sem: semaphore to lock
 *
 * Fast path: a single locked increment of sem->count. When the incremented
 * value is negative, control goes to the assembly thunk
 * call_rwsem_down_read_failed with the sem pointer in rax.
 * The original macro calls are kept as comments above their expansion.
 */
void __sched down_read(struct rw_semaphore *sem) {
//	might_sleep();
//	rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

//	LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
//    __down_read(sem)
    asm volatile("# beginning down_read\n\t"
//              LOCK_PREFIX _ASM_INC "(%1)\n\t"
                "lock;" "incq (%1) \n\t" // atomic form of sem->count = sem->count + 1
                /* adds 0x00000001 */
                "  jns        1f\n" // result >= 0 (sign flag clear): jump to label 1
                "  call call_rwsem_down_read_failed\n" // result negative: call the slow-path thunk
                "1:\n\t"
                "# ending down_read\n\t"
                : "+m" (sem->count)     // "+m": sem->count is a memory operand that is both input and output
                : "a" (sem)         // "a": the sem pointer is passed in the a register (rax); it is operand %1
                : "memory", "cc");  // clobbers: memory and the condition flags
}
/*
 * Thunk called from the inline asm in down_read(). On entry the sem pointer
 * is in rax; it is forwarded to the C function rwsem_down_read_failed().
 */
ENTRY(call_rwsem_down_read_failed)
    // start of the procedure (CFI annotation)
	CFI_STARTPROC
	// save the common register set (macro defined outside this listing;
	// NOTE(review): the original note listed rdi,rsi,rdx,r8,r9,r10,r11, but rdx is pushed separately below - confirm the list)
	save_common_regs
	// push rdx
	pushq_cfi %rdx
	CFI_REL_OFFSET rdx, 0
	// copy rax (the sem pointer) into rdi, the first-argument register of the x86-64 C calling convention
	movq %rax,%rdi
	call rwsem_down_read_failed
	// pop the saved value back into rdx
	popq_cfi %rdx
	CFI_RESTORE rdx
	// restore the register set saved by save_common_regs
	restore_common_regs
	ret
	CFI_ENDPROC
ENDPROC(call_rwsem_down_read_failed)

rwsem_down_read_failed

/*
 * rwsem_down_read_failed - slow path for a reader that could not take the lock.
 * @sem: contended semaphore
 *
 * Queues the caller as a read waiter. The adjustment handed to the common
 * helper is the negated read bias, i.e. it takes back the increment that
 * down_read() applied on its fast path.
 *
 * Returns whatever rwsem_down_failed_common() returns.
 */
struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem) {
    const signed long undo_read_bias = -RWSEM_ACTIVE_READ_BIAS;

    return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_READ, undo_read_bias);
}

read unlock

  • 如果 -1 后是负数,说明减之前 count <= 0
  • 如果 -1 后是非负数(>= 0),说明减之前 count > 0
/*
 * up_read - release a read hold on the semaphore.
 * @sem: semaphore to unlock
 *
 * Fast path: a locked xadd of -RWSEM_ACTIVE_READ_BIAS into sem->count. When
 * the new count is negative, call_rwsem_wake is called with the sem pointer
 * in rax and the old count in rdx.
 */
void up_read(struct rw_semaphore *sem) {
//	rwsem_release(&sem->dep_map, 1, _RET_IP_);
//	__up_read(sem);
    long tmp;
	asm volatile("# beginning __up_read\n\t"
                LOCK_PREFIX 
                // %1 = tmp (d register), preloaded with -RWSEM_ACTIVE_READ_BIAS; %2 = sem (a register), so (%2) is sem->count
                // -RWSEM_ACTIVE_READ_BIAS == -1
                "  xadd      %1,(%2)\n\t"
                /* subtracts 1, returns the old value */
                "  jns        1f\n\t"  // new count not negative: skip the wake-up call and jump to 1
                "  call call_rwsem_wake\n" /* expects old value in %edx */
                "1:\n"
                "# ending __up_read\n"
                : "+m" (sem->count), "=d" (tmp)
                : "a" (sem), "1" (-RWSEM_ACTIVE_READ_BIAS)
                : "memory", "cc");
}

例子测试

假设两个 read 任务,因为 r 与 r 不互斥,所以加锁和解锁都能成功

任务序列 count 值变化 结果
A read 1 正数,成功
B read 2 正数,成功
A over 1 正数,成功
B over 0 非负数,成功

write lock

/*
 * down_write - take the semaphore for writing.
 * @sem: semaphore to lock
 *
 * Fast path: a locked xadd of RWSEM_ACTIVE_WRITE_BIAS into sem->count. The
 * lock is acquired only when the old count was 0; otherwise the thunk
 * call_rwsem_down_write_failed is called with the sem pointer in rax.
 */
void __sched down_write(struct rw_semaphore *sem) {
    // NOTE(review): defined outside this listing; the author's note only says it decides whether the task may sleep
	might_sleep();
//	rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);

//	LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
//    __down_write(sem)    
//    __down_write_nested(sem, 0);
    long tmp;
    asm volatile("# beginning down_write\n\t"
                // %1 = tmp (d register), preloaded with RWSEM_ACTIVE_WRITE_BIAS; %2 = sem (a register), so (%2) is sem->count
		     LOCK_PREFIX "  xadd      %1,(%2)\n\t"
		     /* adds 0xffff0001, returns the old value */
		     "  test      %1,%1\n\t" // AND the old value with itself: the zero flag is set only when it was 0
		     /* was the count 0 before? */
		     "  jz        1f\n"  // old value was 0: lock acquired, jump to 1
		     "  call call_rwsem_down_write_failed\n"
		     "1:\n"
		     "# ending down_write"
		     : "+m" (sem->count), "=d" (tmp)
		     : "a" (sem), "1" (RWSEM_ACTIVE_WRITE_BIAS)
		     : "memory", "cc");
}
/*
 * Thunk called from the inline asm in down_write(). On entry the sem pointer
 * is in rax; it is forwarded to the C function rwsem_down_write_failed().
 */
ENTRY(call_rwsem_down_write_failed)
	CFI_STARTPROC
	// save the common register set (macro defined outside this listing)
	save_common_regs
	movq %rax,%rdi  // the sem pointer is in rax; copy it to rdi for the C call
	call rwsem_down_write_failed
	// restore the register set saved above
	restore_common_regs
	ret
	CFI_ENDPROC
ENDPROC(call_rwsem_down_write_failed)

rwsem_down_write_failed

/*
 * rwsem_down_write_failed - slow path for a writer that could not take the lock.
 * @sem: contended semaphore
 *
 * Queues the caller as a write waiter. The adjustment handed to the common
 * helper is the negated write bias, i.e. it takes back what down_write()
 * added on its fast path.
 *
 * Returns whatever rwsem_down_failed_common() returns.
 */
struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem) {
	const signed long undo_write_bias = -RWSEM_ACTIVE_WRITE_BIAS;

	return rwsem_down_failed_common(sem, RWSEM_WAITING_FOR_WRITE, undo_write_bias);
}

write unlock

/*
 * up_write - release a write hold on the semaphore.
 * @sem: semaphore to unlock
 *
 * Fast path: a locked xadd of -RWSEM_ACTIVE_WRITE_BIAS into sem->count. When
 * the new count is negative, call_rwsem_wake is called with the sem pointer
 * in rax and the old count in rdx.
 */
void up_write(struct rw_semaphore *sem) {
//	rwsem_release(&sem->dep_map, 1, _RET_IP_);

//	__up_write(sem);
    long tmp;
    asm volatile("# beginning __up_write\n\t"
                // %1 = tmp (d register), preloaded with -RWSEM_ACTIVE_WRITE_BIAS; %2 = sem (a register), so (%2) is sem->count
		     LOCK_PREFIX "  xadd      %1,(%2)\n\t"
		     "  jns        1f\n\t"      // new count not negative: skip the wake-up call and jump to 1
		     "  call call_rwsem_wake\n" /* expects old value in %edx */
		     "1:\n\t"
		     "# ending __up_write\n"
		     : "+m" (sem->count), "=d" (tmp)
		     : "a" (sem), "1" (-RWSEM_ACTIVE_WRITE_BIAS)
		     : "memory", "cc");
}

write downgrade

/*
 * downgrade_write - turn a held write lock into a read lock.
 * @sem: semaphore currently held for writing
 *
 * Adds -RWSEM_WAITING_BIAS to sem->count with a locked add. When the result
 * is negative, call_rwsem_downgrade_wake is called with the sem pointer in rax.
 */
void downgrade_write(struct rw_semaphore *sem) {
//	__downgrade_write(sem);
    asm volatile("# beginning __downgrade_write\n\t"
               // %1 = sem (a register), so (%1) is sem->count; %2 = -RWSEM_WAITING_BIAS
               LOCK_PREFIX _ASM_ADD "%2,(%1)\n\t"
               /*
                * transitions 0xZZZZ0001 -> 0xYYYY0001 (i386)
                *     0xZZZZZZZZ00000001 -> 0xYYYYYYYY00000001 (x86_64)
                */
               "  jns       1f\n\t"  // result not negative: skip the wake-up call
               "  call call_rwsem_downgrade_wake\n"
               "1:\n\t"
               "# ending __downgrade_write\n"
               : "+m" (sem->count)
               : "a" (sem), "er" (-RWSEM_WAITING_BIAS)
               : "memory", "cc");
}
/*
 * Thunk called from the inline asm in downgrade_write(). On entry the sem
 * pointer is in rax; it is forwarded to the C function rwsem_downgrade_wake().
 */
ENTRY(call_rwsem_downgrade_wake)
	CFI_STARTPROC
	// save the common register set (macro defined outside this listing)
	save_common_regs
	// rdx is saved separately around the call
	pushq_cfi %rdx
	CFI_REL_OFFSET rdx, 0
	// copy rax (the sem pointer) into rdi for the C call
	movq %rax,%rdi
	call rwsem_downgrade_wake
	popq_cfi %rdx
	CFI_RESTORE rdx
	// restore the register set saved above
	restore_common_regs
	ret
	CFI_ENDPROC
ENDPROC(call_rwsem_downgrade_wake)
/*
 * rwsem_downgrade_wake - wake-up pass run after a write-to-read downgrade.
 * @sem: semaphore that was downgraded
 *
 * With wait_lock held and interrupts saved/disabled, runs
 * __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED) unless the wait queue is empty.
 *
 * Returns the semaphore pointer (as handed back by __rwsem_do_wake() when it ran).
 */
struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem) {
	unsigned long saved_irq;

	spin_lock_irqsave(&sem->wait_lock, saved_irq);

	/* an empty queue means there is nobody to wake */
	if (list_empty(&sem->wait_list))
		goto unlock;

	sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);

unlock:
	spin_unlock_irqrestore(&sem->wait_lock, saved_irq);
	return sem;
}

例子测试

假设两个 write 任务,因为 w 与 w 互斥,所以后到的写者加锁会失败,必须进入等待队列,直到前一个写者解锁后被唤醒

任务序列 count 值变化 结果
A write 0xffffffff00000001L 旧值为0,成功
B write 0xfffffffe00000002L 旧值不为0,失败
A over 0xffffffff00000000L 负数(B 排队时已把 count 调整为 0xfffffffe00000001L),调用 call_rwsem_wake 唤醒 B,B 持锁后 count 为 0xffffffff00000001L
B over 0 非负数,成功

通用部分

spin lock

/*
 * __raw_spin_lock_irq - disable local interrupts, then acquire @lock.
 * @lock: raw spinlock to take
 *
 * The preemption and lockdep calls of the original are left commented out.
 */
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock) {
    // disable interrupts on this CPU
	local_irq_disable();
//	preempt_disable();
//	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    // acquire the lock; per the author this is the same logic as an ordinary spin lock
    // (LOCK_CONTENDED and the two do_raw_* helpers are defined outside this listing)
	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
/*
 * __raw_spin_unlock_irq - release @lock, then re-enable local interrupts.
 * @lock: raw spinlock to release (wait_lock for the callers on this page)
 *
 * The preemption and lockdep calls of the original are left commented out.
 */
static inline void __raw_spin_unlock_irq(raw_spinlock_t *lock) {
//	spin_release(&lock->dep_map, 1, _RET_IP_);
    // release the lock
	do_raw_spin_unlock(lock);
//	local_irq_enable();
    // re-enable interrupts (hand expansion of the commented-out local_irq_enable())
    arch_local_irq_enable();
//	preempt_enable();
}

call_rwsem_wake

/*
 * Thunk called from the inline asm in up_read() and up_write(). On entry the
 * sem pointer is in rax and the old count (the xadd result) is in rdx.
 */
ENTRY(call_rwsem_wake)
	CFI_STARTPROC
	// subtract 1 from edx, the low 32 bits of the old count
	decl %edx	/* do nothing if still outstanding active readers */
	// result not zero: jump to label 1 and return without waking anyone
	jnz 1f
	// save the common register set (macro defined outside this listing)
	save_common_regs
	// copy rax (the sem pointer) into rdi for the C call
	movq %rax,%rdi
	call rwsem_wake
	restore_common_regs
1:	ret
	CFI_ENDPROC
ENDPROC(call_rwsem_wake)
/*
 * rwsem_wake - wake-up pass run from the unlock slow path.
 * @sem: semaphore that was just released
 *
 * With wait_lock held and interrupts saved/disabled, runs
 * __rwsem_do_wake(sem, RWSEM_WAKE_ANY) unless the wait queue is empty.
 *
 * Returns the semaphore pointer (as handed back by __rwsem_do_wake() when it ran).
 */
struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem) {
	unsigned long irq_state;

	/* save and disable interrupts, then spin for wait_lock */
	spin_lock_irqsave(&sem->wait_lock, irq_state);

	/* only a non-empty queue needs the wake-up pass */
	sem = list_empty(&sem->wait_list)
		? sem
		: __rwsem_do_wake(sem, RWSEM_WAKE_ANY);

	/* drop wait_lock and restore the saved interrupt state */
	spin_unlock_irqrestore(&sem->wait_lock, irq_state);
	return sem;
}

常量参考

名称 表达值 最终值
RWSEM_ACTIVE_MASK 0xffffffffL 0xffffffffL
RWSEM_UNLOCKED_VALUE 0x00000000L 0x00000000L
RWSEM_ACTIVE_BIAS 0x00000001L 0x00000001L
RWSEM_WAITING_BIAS -RWSEM_ACTIVE_MASK-1 0xffffffff00000000L
RWSEM_ACTIVE_READ_BIAS RWSEM_ACTIVE_BIAS 0x00000001L
RWSEM_ACTIVE_WRITE_BIAS RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS 0xffffffff00000001L

rwsem_down_failed_common

read 和 write 都会调用

/*
 * rwsem_down_failed_common - slow path shared by rwsem_down_read_failed()
 * and rwsem_down_write_failed().
 * @sem:        contended semaphore
 * @flags:      RWSEM_WAITING_FOR_READ or RWSEM_WAITING_FOR_WRITE; stored in the waiter
 * @adjustment: value added to sem->count; callers pass the negated bias that
 *              their fast path already added
 *
 * Queues the current task at the tail of sem->wait_list, applies the count
 * adjustment, runs a wake-up pass when the resulting count calls for one, and
 * then sleeps until a waker clears waiter.task.
 *
 * Returns the semaphore pointer (as handed back by __rwsem_do_wake() when it ran).
 */
static struct rw_semaphore __sched * rwsem_down_failed_common(struct rw_semaphore *sem,
			 unsigned int flags, signed long adjustment) {
    struct rwsem_waiter waiter;
    /* the task that failed to take the lock */
    struct task_struct *tsk = current;
    signed long count;

    /* mark the task uninterruptible before it becomes visible on the queue */
    set_task_state(tsk, TASK_UNINTERRUPTIBLE);

//    spin_lock_irq(&sem->wait_lock);
//    raw_spin_lock_irq(&sem->wait_lock.rlock);
//    _raw_spin_lock_irq(&sem->wait_lock.rlock);
    /* take wait_lock with interrupts disabled */
    __raw_spin_lock_irq(&sem->wait_lock.rlock);

    waiter.task = tsk;
    waiter.flags = flags;

//    get_task_struct(tsk);
    /* atomically increment the task's usage counter */
    atomic_inc(&(tsk)->usage);

    if (list_empty(&sem->wait_list)) {
        /*
         * First waiter: also add the waiting bias.
         * RWSEM_WAITING_BIAS = 0xffffffff00000000L
         *
         * read:  adjustment = -RWSEM_ACTIVE_READ_BIAS = -0x00000001L = 0xffffffffffffffffL
         *        after the add: 0xfffffffeffffffffL
         * write: adjustment = -RWSEM_ACTIVE_WRITE_BIAS = -0xffffffff00000001L = 0x00000000ffffffffL
         *        after the add: 0xffffffffffffffffL
         */
        adjustment += RWSEM_WAITING_BIAS;
    }
    /* append the current task to the tail of the wait queue */
    list_add_tail(&waiter.list, &sem->wait_list);

//    count = rwsem_atomic_update(adjustment, sem);
    long tmp = adjustment;
    asm volatile(LOCK_PREFIX
                "xadd %0,%1" /* tmp receives the old sem->count; sem->count += adjustment */
                : "+r" (tmp), "+m" (sem->count)
                :
                : "memory");
    /*
     * tmp now holds the count as it was just before the xadd.
     *
     * read  (first waiter, adjustment = 0xfffffffeffffffffL): the fast path
     *       saw a negative count; e.g. tmp = -1 gives count = 0xfffffffefffffffeL
     * write (first waiter, adjustment = 0xffffffffffffffffL): the fast path
     *       saw a non-zero old count; e.g. tmp = 1 gives count = 0
     */
    count = tmp + adjustment;

    /*
     * If there are no active locks, wake the front queued process(es).
     * Otherwise, if we came from a failed down_write(), other threads were
     * queued before us and there is no active writer, the lock must be
     * read-owned, so try to wake any read locks queued ahead of us.
     *
     * With x the count before the xadd (first-waiter values of adjustment),
     * count == RWSEM_WAITING_BIAS (0xffffffff00000000L) requires:
     *   read:  x + 0xfffffffeffffffffL  =>  x = 1
     *   write: x + 0xffffffffffffffffL  =>  x = 0xffffffff00000001L
     *          (that is, RWSEM_ACTIVE_WRITE_BIAS)
     *
     * adjustment == -RWSEM_ACTIVE_WRITE_BIAS holds only for a writer whose
     * adjustment did not receive the waiting bias above, i.e. the queue was
     * not empty when it arrived.
     */
    if (count == RWSEM_WAITING_BIAS)
        sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
    else if (count > RWSEM_WAITING_BIAS &&
             adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
        sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);

//    spin_unlock_irq(&sem->wait_lock);
//    raw_spin_unlock_irq(&sem->wait_lock.rlock);
    /* release wait_lock and re-enable interrupts */
    __raw_spin_unlock_irq(&sem->wait_lock.rlock);

    for (;;) {
        /*
         * __rwsem_do_wake() sets waiter->task to NULL when it grants the
         * lock to a waiter, so a NULL here means this task now owns it.
         */
        if (!waiter.task)
            break;
        /* give up the CPU */
        schedule();
        /* running again but not granted yet: mark uninterruptible before re-checking */
        set_task_state(tsk, TASK_UNINTERRUPTIBLE);
    }
    /* lock granted: back to the running state */
    tsk->state = TASK_RUNNING;

    return sem;
}

__rwsem_do_wake

当阻塞在该锁上的进程现在可以运行时,处理锁的释放

  • 如果我们是从 up_xxxx() 来到这里,那么:
    • count 的“活动部分”(&0x0000ffff)已经降到 0(但之后可能又发生了变化)
    • count 的“等待部分”(&0xffff0000)为负数(并且仍将保持为负)
  • 队列中必须有等待者
  • 自旋锁必须由调用者持有
  • 被唤醒进程的等待块(waiter)在其 task 字段被清零后即从队列中移除
  • 只有在不是降级(downgrade)的情况下才会唤醒写者
/*
 * __rwsem_do_wake - grant the lock to waiter(s) at the front of the queue.
 * @sem:       semaphore whose wait_list is not empty
 * @wake_type: RWSEM_WAKE_ANY, RWSEM_WAKE_NO_ACTIVE or RWSEM_WAKE_READ_OWNED
 *
 * The callers on this page hold sem->wait_lock around the call.
 * If the first waiter is a writer, only that writer is woken (and not at all
 * for RWSEM_WAKE_READ_OWNED). If it is a reader, it and every reader queued
 * directly behind it are woken together.
 *
 * Returns @sem.
 */
static struct rw_semaphore *__rwsem_do_wake(struct rw_semaphore *sem, int wake_type) {
    struct rwsem_waiter *waiter;
    struct task_struct *tsk;
    struct list_head *next;
    signed long oldcount, woken, loop, adjustment;
    // Fetch the first waiter. sem->wait_list.next only points at the list_head embedded in the waiter;
    // list_entry converts that into a pointer to the enclosing struct rwsem_waiter.
    waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
    // first waiter does not want the write lock, i.e. it is a reader
    if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE))
        goto readers_only;
    // from here on the first waiter is a writer
    if (wake_type == RWSEM_WAKE_READ_OWNED)
        goto out;
    // RWSEM_ACTIVE_WRITE_BIAS = 0xffffffff00000001L
    adjustment = RWSEM_ACTIVE_WRITE_BIAS;
    // is this waiter the last element of the queue?
    if (waiter->list.next == &sem->wait_list)
        // RWSEM_WAITING_BIAS = 0xffffffff00000000L
        // adjustment becomes 1L
        adjustment -= RWSEM_WAITING_BIAS;

try_again_write:
    // rwsem_atomic_update() returns the new count; subtracting adjustment again recovers the old count
    oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
    if (oldcount & RWSEM_ACTIVE_MASK)
        // RWSEM_ACTIVE_MASK = 0xffffffffL: some reader or writer already holds the lock
        /* Someone grabbed the sem already */
        goto undo_write;
    // nobody holds the lock: take this writer off the queue
    list_del(&waiter->list);
    // the task waiting for the write lock
    tsk = waiter->task;
    // Full barrier (hand expansion of smp_mb()): the "memory" clobber stops the compiler
    // from reordering across it and mfence orders the CPU's memory accesses.
//    smp_mb();
    asm volatile("mfence":: :"memory");
    waiter->task = NULL;
    // wake the task so that it starts running
    wake_up_process(tsk);
    // presumably drops the reference taken by atomic_inc(&tsk->usage) in rwsem_down_failed_common() - TODO confirm
    put_task_struct(tsk);
    goto out;

readers_only:
    // If we come here from up_xxxx(), another thread might have reached
    // rwsem_down_failed_common() before we acquired the spinlock and woke up a
    // waiter, making it active now. We prefer to check for this first in order
    // not to spend too much time with the spinlock held if we are not going to
    // be able to wake up readers in the end.
    // Note that we do not need to update the rwsem count: any writer trying to
    // acquire the rwsem will run rwsem_down_write_failed() because of the
    // waiting threads and block trying to acquire the spinlock.
    // A dummy atomic update is used to acquire the cache line exclusively, since
    // we expect to succeed and run the final rwsem count adjustment soon.
    if (wake_type == RWSEM_WAKE_ANY &&
        //  RWSEM_WAITING_BIAS = 0xffffffff00000000L
        rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS)
        /* Someone grabbed the sem for write already */
        // a writer already holds the lock, so the readers keep waiting
        goto out;

    // Grant an unbounded number of read locks to the readers at the front of the queue.
    // Note that the 'active part' of the count is incremented by the number of readers
    // before any process is woken.
    woken = 0;
    do {
        woken++;
        // was that the last waiter?
        if (waiter->list.next == &sem->wait_list)
            break;
        // step to the next waiter
        waiter = list_entry(waiter->list.next, struct rwsem_waiter, list);
        // keep going while the waiters are readers; stop at the end of the queue or at a writer
    } while (waiter->flags & RWSEM_WAITING_FOR_READ); 

    // RWSEM_ACTIVE_READ_BIAS = 0x00000001L
    adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
    // because of the do-while, waiter may well be a writer at this point
    if (waiter->flags & RWSEM_WAITING_FOR_READ)
        // RWSEM_WAITING_BIAS = 0xffffffff00000000L; in this branch adjustment >= 0x100000001L
        /* hit end of list above */
        adjustment -= RWSEM_WAITING_BIAS;
    // sem->count += adjustment
    rwsem_atomic_add(adjustment, sem);
    // current first waiter
    next = sem->wait_list.next;
    for (loop = woken; loop > 0; loop--) {
        waiter = list_entry(next, struct rwsem_waiter, list);
        next = waiter->list.next;
        tsk = waiter->task;
        // memory barrier before the hand-off
        smp_mb();
        // a NULL task is what lets the sleeper leave its wait loop in rwsem_down_failed_common()
        waiter->task = NULL;
        // wake this reader; readers do not exclude each other
        wake_up_process(tsk);
        // presumably drops the reference taken in rwsem_down_failed_common() - TODO confirm
        put_task_struct(tsk);
    }
    // unlink the woken waiters: the head now points at the first waiter that was not woken
    sem->wait_list.next = next;
    next->prev = &sem->wait_list;

out:
    return sem;

  /* undo the change to the active count, but check for a transition
   * 1->0 */
undo_write:
    // from the code above, adjustment is 0xffffffff00000001L or 1
    // RWSEM_ACTIVE_MASK = 0xffffffffL
    // revert the writer's change; if the lock is still held by someone, give up
    if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
        goto out;
    // the lock was released in the meantime: try to grant the write lock again
    goto try_again_write;
}
/*
 * rwsem_atomic_update - atomically add @delta to sem->count.
 * @delta: amount to add
 * @sem:   semaphore to update
 *
 * Returns the count after the addition (the old count fetched by xadd plus @delta).
 */
static inline long rwsem_atomic_update(long delta, struct rw_semaphore *sem){
    long prev = delta;

    /* locked xadd: prev receives the previous count, sem->count grows by delta */
    asm volatile(LOCK_PREFIX "xadd %0,%1"
                : "+r" (prev), "+m" (sem->count)
                :
                : "memory");
    return delta + prev;
}
/*
 * rwsem_atomic_add - add @delta to sem->count with a locked add instruction.
 * @delta: amount to add
 * @sem:   semaphore to update
 *
 * Nothing is returned; use rwsem_atomic_update() when the resulting count is needed.
 */
static inline void rwsem_atomic_add(long delta, struct rw_semaphore *sem){
	/* %0 = sem->count (memory, read and written), %1 = delta */
	asm volatile(LOCK_PREFIX _ASM_ADD "%1,%0"
		     : "+m" (sem->count)
		     : "er" (delta));
}
⚠️ **GitHub.com Fallback** ⚠️