sync semaphore - ceragon/LinuxDoc GitHub Wiki

semaphore

结构定义

struct semaphore {
    // 自旋锁
    spinlock_t		lock;
    // 信号量
    unsigned int		count;
    // 等待列表
    struct list_head	wait_list;
};
typedef struct spinlock {
    union {
        struct raw_spinlock rlock;
    }
} spinlock_t;

typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
} raw_spinlock_t;

typedef struct arch_spinlock {
    unsigned int slock;
} arch_spinlock_t;
struct spinlock {
    struct raw_spinlock {
        struct arch_spinlock {
            unsigned int slock;
        } raw_lock;
    } rlock;
} spinlock_t;
struct semaphore_waiter {
   struct list_head list;
   struct task_struct *task;
   int up;
};
struct task_struct {
    void *stack;
};

初始化

struct lock_class_key { };
static inline void sema_init(struct semaphore *sem, int val) {
    // 一个空的结构体
   static struct lock_class_key __key;
//  *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
    *sem = (struct semaphore) {
        .lock = (spinlock_t){{ 
            .rlock = {
                .raw_lock = {0},
            } 
        }},
        .count = val, 
        .wait_list = {
                // next
                &((*sem).wait_list),
                //prev 
                &((*sem).wait_list)
        },
    }
//  lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
    do {
        (void)("semaphore->lock");
        (void)(&__key);
    } while (0)
}

down

void down(struct semaphore *sem) {
   unsigned long flags;
//  spin_lock_irqsave(&sem->lock, flags);
    do {
        do {
            ({
                unsigned long __dummy;
                typeof(flags) __dummy2;
                (void)(&__dummy == &__dummy2);
                1;
            });
            flags = _raw_spin_lock_irqsave(spinlock_check(&sem->lock));
        } while (0);
    } while (0);
    if (likely(sem->count > 0))
        // 信号量充足
        sem->count--;
    else
        // 需要陷入等待中
        __down(sem);
    spin_unlock_irqrestore(&sem->lock, flags);
}

__down

static noinline void __sched __down(struct semaphore *sem){
    // TASK_UNINTERRUPTIBLE == 2,MAX_SCHEDULE_TIMEOUT == Long.MAX
    __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
static inline int __sched __down_common(struct semaphore *sem, long state, long timeout){
// struct task_struct *task = current;
    struct task_struct *task = get_current();
    struct semaphore_waiter waiter;
    // new: waiter.list, head: sem->wait_list
    // 将 new 加到 head 前面,所以 waiter.list -> next = sem->wait_list
    list_add_tail(&waiter.list, &sem->wait_list);
    // 由于 waiter 是 tail,所以当前 task 也被放到了 tail
    waiter.task = task;
    // 唤醒标记,默认没有唤醒
    waiter.up = 0;
    for (;;) {
        if (signal_pending_state(state, task))
            goto interrupted;
        if (timeout <= 0)
            goto timed_out;
    // 设置进程为 TASK_UNINTERRUPTIBLE
//  __set_task_state(task, state);
    do { (task)->state = (state); } while (0)
//  spin_unlock_irq(&sem->lock);
    // 释放自旋锁    
    __raw_spin_unlock_irq(&sem->lock->rlock)
    // 进程进入sleep状态,直到 timeout 或者被唤醒
    timeout = schedule_timeout(timeout);
    // 进程被唤醒了,那么就尝试加锁
//  spin_lock_irq(&sem->lock);
    _raw_spin_lock_irq(&sem->lock->rlock)
    // 当有进程调用了 up,就会把 waiter_list 的头部进程唤醒,也就是 waiter.up = 1
    if (waiter.up)
        return 0;
    }

 timed_out:
    list_del(&waiter.list);
    return -ETIME;

 interrupted:
    list_del(&waiter.list);
    return -EINTR;
}

signal_pending_state

static inline int signal_pending_state(long state, struct task_struct *p) {
//	if (!(state & (TASK_INTERRUPTIBLE | TASK_WAKEKILL)))
    if (!(state & (0b1 | 0b1000_0000)))
        return 0;
//	if (!signal_pending(p))
    // TIF_SIGPENDING == 2
    if (unlikely(test_ti_thread_flag(((struct thread_info *) (p)->stack), TIF_SIGPENDING)))
        return 0;

    return (state & TASK_INTERRUPTIBLE) || __fatal_signal_pending(p);
}

schedule_timeout

signed long __sched schedule_timeout(signed long timeout){
    switch (timeout) {
    	case MAX_SCHEDULE_TIMEOUT:
            // 进程调度,时间片切换给其他进程,也可以理解是让进程 sleep
            schedule();
            goto out;
    }
out:
    return timeout < 0 ? 0 : timeout;
}

__raw_spin_unlock_irq

static inline void __raw_spin_unlock_irq(raw_spinlock_t *lock) {
//  spin_release(&lock->dep_map, 1, _RET_IP_);
    do {
    } while (0)
//  do_raw_spin_unlock(lock);
//  arch_spin_unlock(&lock->raw_lock);
//  __ticket_spin_unlock(lock);
    asm volatile(UNLOCK_LOCK_PREFIX "incb %0"
		     : "+m" (lock->slock)
		     :
		     : "memory", "cc");   
//  __release(lock);
    (void)0;
    // 中断启用
//  local_irq_enable();
    do {
        do {
        } while (0);
//      arch_local_irq_enable();
//      native_irq_enable();
        asm volatile("sti": : :"memory");
    } while (0)
        
//  preempt_enable();
    do {
    } while (0)
}

spin_lock_irq

static inline void spin_lock_irq(spinlock_t *lock) {
//  raw_spin_lock_irq(&lock->rlock);
//  _raw_spin_lock_irq(lock)
    __raw_spin_lock_irq(lock);
}
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock) {
//  local_irq_disable();
    do {
//      arch_local_irq_disable();
//      native_irq_disable();
        // 关中断
        asm volatile("cli": : :"memory");
        do {
        } while (0);
    } while (0);
//  preempt_disable();
    do {
    } while (0)
        
//  spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    do {
    } while (0)    
        
//  LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
//  do_raw_spin_lock(lock)
//  __acquire(lock);
    (void)0    
//  arch_spin_lock(&lock->raw_lock);    
//  __ticket_spin_lock(lock);
    short inc = 0x0100;
    asm volatile (
        LOCK_PREFIX "xaddw %w0, %1\n"
        "1:\t"
        "cmpb %h0, %b0\n\t"
        "je 2f\n\t"
        "rep ; nop\n\t"
        "movb %1, %b0\n\t"
        /* don't need lfence here, because loads are in-order */
        "jmp 1b\n"
        "2:"
        : "+Q" (inc), "+m" (lock->slock)
        :
        : "memory", "cc");
}

up

void up(struct semaphore *sem) {
    unsigned long flags;
//  spin_lock_irqsave(&sem->lock, flags);
    do {
        do {
            ({
                unsigned long __dummy;
                typeof(flags) __dummy2;
                (void) (&__dummy == &__dummy2);
                1;
            });
            flags = _raw_spin_lock_irqsave(spinlock_check(&sem->lock));
        } while (0);
    } while (0)
    // 是否有人在等待
    if (likely(list_empty(&sem->wait_list)))
        // 没有人等待
	    sem->count++;
    else
        // 需要唤醒第一个等待的人
        __up(sem);
    spin_unlock_irqrestore(&sem->lock, flags);
}

__up

static noinline void __sched __up(struct semaphore *sem) {
    // ptr: 元素的头部. type: 结构体. member: 结构体中的元素名称
    // 获取 *sem 的第一个元素
    struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
						struct semaphore_waiter, list);
//  list_del(&waiter->list);
    __list_del(&waiter->list->prev, &waiter->list->next); // prev,next
//  next->prev = prev;
//  prev->next = next;
    &waiter->list->next = LIST_POISON1; // ((void *) 0x00100100 + (0xdead000000000000UL))
    &waiter->list->prev = LIST_POISON2; // ((void *) 0x00200200 + (0xdead000000000000UL))
    // 唤醒标记
    waiter->up = 1;
//  wake_up_process(waiter->task);
    // TASK_ALL = (TASK_NORMAL | __TASK_STOPPED | __TASK_TRACED)
    // 尝试把线程唤醒
    try_to_wake_up(waiter->task, TASK_ALL, 0);
}

通用方法

__raw_spin_lock_irqsave

static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock) {
	unsigned long flags;
//  local_irq_save(flags);
    do {
      do {
        ({
          unsigned long __dummy;
          typeof(flags) __dummy2;
          (void)(&__dummy == &__dummy2);
          1;
        });
        flags = arch_local_irq_save();
      } while (0);
      do {
      } while (0);
    } while (0)
//  preempt_disable();
    do {
    } while (0)
//  spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    do {
    } while (0)
    /*
    * On lockdep we dont want the hand-coded irq-enable of
    * do_raw_spin_lock_flags() code, because lockdep assumes
    * that interrupts are not re-enabled during lock-acquire:
    */
    do_raw_spin_lock_flags(lock, &flags);
    return flags;
}
static inline unsigned long arch_local_irq_save(void) {
//  unsigned long flags = arch_local_save_flags();
    unsigned long flags;
    /*
     * "=rm" is safe here, because "pop" adjusts the stack before
     * it evaluates its effective address -- this is part of the
     * documented behavior of the "pop" instruction.
     */
    asm volatile("# __raw_save_flags\n\t"
		     "pushf ; pop %0" // pushf: 将标记寄存器的值压栈,
		     : "=rm" (flags)
		     : /* no input */
		     : "memory");
//  arch_local_irq_disable();
    // 关中断
    asm volatile("cli": : :"memory");
    return flags;
}
static inline void do_raw_spin_lock_flags(raw_spinlock_t *lock, unsigned long *flags) __acquires(lock) {
//  __acquire(lock);
    (void)0;
//  arch_spin_lock_flags(&lock->raw_lock, *flags);
//  arch_spin_lock(lock);
//  __ticket_spin_lock(lock);
    // 分为 01 | 00 两部分。前面表示队列的尾部,后面表示队列的头部
    short inc = 0x0100;
    asm volatile (
//      LOCK_PREFIX "xaddw %w0, %1\n"
        // 1、temp = inc + lock->slock。2、inc = lock->slock,
        // 3、lock->slock = temp. 
        // 情况1:inc=0, lock=0x0100
        // 情况2:inc=0x0100, lock=0x0200
        "lock; xaddw %w0, %1\n" 
        "1:\t"
        // 比较的单位是 byte(8位),取 inc(short 总共16位) 所在寄存器的高 8 位,减去低 8 位
        // 比较 inc 的高低位,并修改标志寄存器
        "cmpb %h0, %b0\n\t" 
        // 情况1:inc=0,满足条件跳转成功
        // 情况2:inc=0x01_00,不满足条件,跳转失败
        "je 2f\n\t"     // 如果高8位和低8位相等就跳转到 2f 的位置
        "rep ; nop\n\t" // 空转
        // 将 lock->slock 的值存到 inc 所在寄存器的低8位
        // 将 lock->slock 的低位覆盖 inc 的低位
        "movb %1, %b0\n\t" 
        /* don't need lfence here, because loads are in-order */
        "jmp 1b\n"
        "2:"
        : "+Q" (inc), "+m" (lock->slock) // +Q 使用 abcd 寄存器,且 + 表示既是输出也是输入。m 表示数据在内存中
        :
        : "memory", "cc");  // 内存会被修改,条件码寄存器也会被修改
}

spin_unlock_irqrestore

static inline void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags){
//  raw_spin_unlock_irqrestore(&lock->rlock, flags);
    do {
        ({
            unsigned long __dummy;
            typeof(flags) __dummy2;
            (void)(&__dummy == &__dummy2);
            1;
        });
        _raw_spin_unlock_irqrestore(&lock->rlock, flags);
    } while (0)
}
static inline void __raw_spin_unlock_irqrestore(raw_spinlock_t *lock, unsigned long flags){
//  spin_release(&lock->dep_map, 1, _RET_IP_);
    do {
    } while (0)
//  do_raw_spin_unlock(lock);
    arch_spin_unlock(&lock->raw_lock);
//  __release(lock);    
    (void)0;
    local_irq_restore(flags);
    preempt_enable();
}
static __always_inline void arch_spin_unlock(arch_spinlock_t *lock){
//	__ticket_spin_unlock(lock);
    asm volatile(UNLOCK_LOCK_PREFIX "incb %0"
            : "+m" (lock->slock)
            :
            : "memory", "cc");
}
static inline void arch_local_irq_enable(void){
//  native_irq_enable();
    asm volatile("sti": : :"memory");
}

关于自旋锁的实现验证

两个task

顺序 lock 高位 lock 低位 inc 高位 inc 低位
task1-lock 01 00 00 00
task2-lock 02 00 01 00
task1-unlock 02 01
task2-lock-retry 02 01 01 01
task2-unlock 02 02

总结

down 的步骤如下:

  1. 禁用中断,确保当前down流程不会被中断给切走。
  2. 使用公平自旋锁加锁。加锁是防止 CPU核心之间竞争。将2字节的 rlock 值分为高字节和低字节。 加锁时将高字节 +1,将高字节的值作为加锁队列的索引。比较 rlock 旧值的高字节和低字节,在以下情况选择:
    1. 如果高字节和低字节相等,则当前无竞争,则加锁成功。
    2. 如果不相等,则有竞争,则自己陷入等待循环。
  3. 锁获取成功,判定信号量的值:
    1. 若信号量 > 0,则信号量 -1,可以继续执行。
    2. 如果信号量 < 1,则将自身加入到等待队列的队尾,等待被唤醒。且此时会释放自旋锁,但依旧关中断。
      1. 被唤醒后重新获取锁,如果当前是被 up 操作唤醒,则获取信号量成功。
  4. 本次获取到信号量,先释放自旋锁,方法是将 rlock 的低位字节 +1。之后重新开启中断。

up 的步骤如下:

  1. 禁用中断
  2. 使用公平自旋锁加锁
  3. 根据等待队列是否为空,做出如下选择:
    1. 如果等待队列为空,则直接将信号量+1
    2. 如果等待队列不为空,则唤醒等待队列中的队首,并删除该元素。
  4. 释放自旋锁,并开启中断。
⚠️ **GitHub.com Fallback** ⚠️