Sleep and wakeup 链接到标题

Sleep 允许一个内核线程等待某个特定事件的发生,另一个线程可以调用 wakeup 来表示这个正在等待时间发生的线程应该恢复了。

Sleep and wakeup are often called sequence cooridination or conditional synchronization mechanisms.

xv6 中利用 sleepwakeup 实现了一种 high-level 的同步机制,被称为信号量(semaphore),用于协调生产者和消费者(xv6 中并未使用信号量)。

A semaphore maintains a count and provides two operations. The “V” operation (for the producer) increments the count. The “P” operation (for the consumer) waits until the count is non-zero, and then decrements it and returns.

struct semaphore {
    struct spinlock lock;
    int count;
};
void V(struct semaphore *s) {
    acquire(&s->lock);
    s->count += 1;
    // wakeup(s); 
    release(&s->lock);
}
void P(struct *s) {
    while (s->count == 0) 
        //sleep(s);
        ;
    acquire(&s->lock);
    s->count -= 1;
    release(&s->count);
}

上述代码给出了一个非常简单但是性能不优秀的“生产者 - 消费者”模型实现,如果生产者很少工作,那么消费者会花费大量时间在 while 循环中。为了避免这一点,消费者应该要有办法主动让出 cpu,并且只在 V 递增 s->count 之后才恢复执行。

注释中给出了一种简单的实现办法。然而,仅仅是这样并不够,因为存在一个 lost wake-up 问题。

(先考虑一个 P 和一个 V 的情况)假设 P 执行完 while (s->count == 0),正准备执行 sleep(s),这时候 V 递增了 s->count,然后准备唤醒一个线程,然而由于这时 P 还没有 sleep,因此没有线程可以被 V 唤醒,那么即使 s->count 已经不为 $0$ 了,P 还是执行了 sleep(s),然而,除非 V 再次递增 s->count,否则 P 永远不会被唤醒。这就是 lost wake-up,即有一次 wake-up 丢失了,本该唤醒进程却没有,而 P 一直在等待这个已经发生了的唤醒。

如果简单的把 acquire(&s->lock) 移动到 P 的最开始,看起来可以解决问题,但是会造成更严重的死锁问题,P 持有锁,然后 sleep,因此 P 在被唤醒之前不会主动释放锁,而 V 需要持有锁才能唤醒 P。

为了解决 lost wake-up 问题,同时避免死锁,我们需要修改一下 sleep 的接口,

void P(struct semaphore *s) {
    acquire(&s->lock);
    while(s->count == 0) 
        sleep(s, &s->lock)
    s->count -= 1;
    release(&s->lock);
}

在 sleep 中,我们会在 proc 已经被标记成 asleep 之后再释放 s->lock,这就保证了 V 直到 P 让自己完成睡眠之后才能尝试递增计数和唤醒进程。我们这里可以将 s->lock 称为 condtion lock

Code: Sleep and wakeup 链接到标题

xv6 的 sleepwakeup 的实现机制与使用规则避免了 lost wake-up

xv6 的 sleep 的基础机制就是 sleep 将当前进程标记为 SLEEPING,并调用 sched 来让出 cpu;而 wakeup 寻找等待队列中一个 sleeping 的进程,然后将它标记为 RUNNABLE。xv6 一般使用涉及 wait 的内核数据结构的地址作为 channel

void sleep(void *chan, struct spinlock *lk) {
    struct proc *p = myproc();
    acquire(&p->lock); // DOC: sleeplock1
    release(lk);
    // Go to sleep.
    p->chan = chan;
    p->state = SLEEPING;
    sched();
    // Tidy up.
    p->chan = 0;
    // Reacquire original lock.
    release(&p->lock);
    acquire(lk);
}

当要 sleep 的进程持有了 &p->lock 之后,就可以释放 s->lock(即 lk 了),持有 lk 是为了避免在进程进入睡眠之前,就有 V 来唤醒它,造成 lost wake-up,当持有 &p->lock 之后,由于 wakeup 进程需要持有该进程的锁,因此持有 &p->lock 就能避免该进程在进入睡眠之前就被唤醒了,于是我们可以释放 lk 了。而 &p->lock 会在完成睡眠,调用 sched 之后,由 scheduler 线程释放,这里跟定时器中断导致的进程切换是一致的。

理解了 sleep,再看 wakeup 其实就不难理解了。

void wakeup(void *chan) {
    struct proc *p;
    for (p = proc; p < &proc[NPROC]; p++) {
        if (p != myproc()) {
            acquire(&p->lock);
            if (p->state == SLEEPING && p->chan == chan) {
                p->state = RUNNABLE;
            }
            release(&p->lock);
        }
    }
}

虚假唤醒:有时候可能会出现多个进程睡眠在同一个 channel 上的情况,例如多个进程从同一个 pipe 中读取数据,一次 wakeup 调用会唤醒所有的这些进程,但是当一个进程被 scheduler 线程选择,将数据读取完之后,递减计数,于是其他睡眠的进程会发现,自己虽然被唤醒了,但是 s->count 依旧是 $0$,这种情况我们被称为虚假唤醒,由于 sleep 处于 while(s->count == 0) 的循环中,因此会被困在循环中,再次睡眠,因此虚假唤醒这个问题是可以接受的。

这里 sleep 最后的 acquire 保证了,即使一次有多个睡眠进程被标记为 RUNNABLE,也只有一个进程能够获取到 lk,执行生产者的操作并递减 s->count

sleepwakeup 的吸引力在于它们都非常轻量,不需要创建特定数据结构来充当 sleep 的 channel,并且提供了一层间接性,调用者不需要知道具体与哪个进程交互。

Code: Pipes 链接到标题

这里要主要是理解 pipereadpipewrite,这要求我们注意两点:一是 sleep 会申请进程锁再释放 pi->lock,scheduler 线程切换回 sleep 之后,会释放进程锁再申请 pi->lock。而 wakeup 只会申请与释放进程锁。

pipereadpi->nread == pi->nwrite 就起到了 s->count == 0 的作用,在 pipewrite 中则是 pipewrite == piperead + PIPESIZEpipewrite 是生产者, piperead 是消费者。

pipe 的实现针对 reader 和 writer 使用了不同的 channel(&pi->nread&pi->nwrite)。

int pipewrite(struct pipe *pi, uint64 addr, int n) {
    int i = 0;
    struct proc *pr = myproc();

    acquire(&pi->lock);
    while (i < n) {
        if (pi->readopen == 0 || pr->killed) {
            release(&pi->lock);
            return -1;
        }
        if (pi->nwrite == pi->nread + PIPESIZE) { // DOC: pipewrite-full
            wakeup(&pi->nread);
            sleep(&pi->nwrite, &pi->lock);
        } else {
            char ch;
            if (copyin(pr->pagetable, &ch, addr + i, 1) == -1)
                break;
            pi->data[pi->nwrite++ % PIPESIZE] = ch;
            i++;
        }
    }
    wakeup(&pi->nread);
    release(&pi->lock);

    return i;
}

int piperead(struct pipe *pi, uint64 addr, int n) {
    int i;
    struct proc *pr = myproc();
    char ch;

    acquire(&pi->lock);
    while (pi->nread == pi->nwrite && pi->writeopen) { // DOC: pipe-empty
        if (pr->killed) {
            release(&pi->lock);
            return -1;
        }
        sleep(&pi->nread, &pi->lock); // DOC: piperead-sleep
    }
    for (i = 0; i < n; i++) { // DOC: piperead-copy
        if (pi->nread == pi->nwrite)
            break;
        ch = pi->data[pi->nread++ % PIPESIZE];
        if (copyout(pr->pagetable, addr + i, &ch, 1) == -1)
            break;
    }
    wakeup(&pi->nwrite); // DOC: piperead-wakeup
    release(&pi->lock);
    return i;
}

Code: wait, exit and kill 链接到标题

wait 首先会申请 condtion lock,即 wait_lock,保证 wait 不会错过 exitwakeup,然后寻找子进程中的 ZOMBIE 进程,释放其资源和 proc 结构体,拷贝子进程的 exit status,释放锁,并返回子进程的 pid。如果 wait 发现没有子进程退出,就会调用 sleep 来等待子进程退出。被退出的子进程调用 wakeup 唤醒并由 scheduler 线程返回到 sleep 的下一条语句之后,它会继续执行查找 ZOMBIE 子进程并释放其资源的任务。

假设进程 a 调用 exit,那么exit 记录了 exit status,释放进程 a 的一些资源,调用 reparent 把 a 的子进程的转让给 init 进程,即进程 a 的子进程的父进程,会变成 init 进程。exit 还会将进程 a 的状态标记为 ZOMBIE,永远让出 cpu(毕竟 scheduler 线程只会选择 RUNNABLE 进程)。exit 会持有 wait_lockp->lockwait_lock 是生产者 exit 和消费者 wait 的 condtion lock,而持有 &p->lock 是为了防止 waitexit 调用 swtch 之前就观察到该进程是 ZOMBIE 的了。为了防止死锁,exitwait 申请锁的顺序是相同的

kill 则只是将进程 p->killed 标记为 $1$,将进程状态从 SLEEPING 修改为 RUNNABLE,从而让进程能被唤醒,然后回到 usertrap.c 调用 exit 自杀,如果 victim 进程本身是在 user space 中运行的,那么它很快会因为定时器中断进入 usertrap 然后调用 exit

kill 的将状态从 SLEEPING 修改为 RUNNABLE 可能带来隐患,睡眠中的进程可能没等到它需要的条件就被唤醒了,因此 while 循环中还需要判断 p->killed 的状态,如果被设置了,那么就放弃当前的活动,例如 pipe。

也有一些 xv6 的 sleep 所处的 while 循环不检查 p->killed,因为代码应该是原子操作的多步系统调用的中间,例如 virtio 驱动程序,它不检查 p->killed,等待磁盘 I/O 时被杀死的进程将不会退出,直到它完成当前系统调用并且 usertrap 看到了 p->killed 标志。

Process Locking 链接到标题

进程的锁(p->lock)应该是 xv6 中最复杂的锁了。在进程要读写 struct procp->state, p->chan, p->killed, p->xstate, p->pid 等字段时,都必须持有 p->lock,因为这些字段可能被其他进程,或者其他 cpu 上的调度器线程访问。

p->lock 做了以下事情:

  • p->state 一起,避免从 proc[] 数组创建新线程时的 race condition;
  • 当进程被创建或者销毁时,避免它被别的进程看到;
  • 避免父进程在 wait 循环中看到一个状态为 ZOMBIE 但是还没有让出 cpu 的子进程;
  • 防止其他 cpu 的 scheduler 线程去运行一个马上要让出 cpu,已经将状态设置为 RUNNABLE 但是还没有结束 swtch 调用的进程;
  • 保证一个 RUNNABLE 的进程只会被一个 scheduler 线程运行;
  • 防止进程在调用 swtch 中时,被定时器中断再驱使着去执行 yield
  • 防止 wakeup 进程看到有状态处于 SLEEP 但是还没有完成让出 cpu 的动作的进程;
  • 使得 kill 的检查 p->pid 和设置 p->killed 原子化;
  • It prevents the victim process of kill from exiting and perhaps being re-allocated between kill’s check of p->pid and setting p->killed.
    • 假设没有这个 p->lock 的话,它可能检查完 p->pid 之后,这个调用 killed 的进程就被暂停了,然后让出了 cpu,这时一个新进程被分配,并且二者具有相同的 pid,因此导致新进程受到影响,而不是影响原来的目标进程。