Sleep and wakeup 链接到标题
Sleep 允许一个内核线程等待某个特定事件的发生,另一个线程可以调用 wakeup 来表示这个正在等待时间发生的线程应该恢复了。
Sleep and wakeup are often called sequence cooridination or conditional synchronization mechanisms.
xv6 中利用 sleep 和 wakeup 实现了一种 high-level 的同步机制,被称为信号量(semaphore),用于协调生产者和消费者(xv6 中并未使用信号量)。
A semaphore maintains a count and provides two operations. The “V” operation (for the producer) increments the count. The “P” operation (for the consumer) waits until the count is non-zero, and then decrements it and returns.
struct semaphore {
struct spinlock lock;
int count;
};
void V(struct semaphore *s) {
acquire(&s->lock);
s->count += 1;
// wakeup(s);
release(&s->lock);
}
void P(struct *s) {
while (s->count == 0)
//sleep(s);
;
acquire(&s->lock);
s->count -= 1;
release(&s->count);
}
上述代码给出了一个非常简单但是性能不优秀的“生产者 - 消费者”模型实现,如果生产者很少工作,那么消费者会花费大量时间在 while 循环中。为了避免这一点,消费者应该要有办法主动让出 cpu,并且只在 V 递增 s->count 之后才恢复执行。
注释中给出了一种简单的实现办法。然而,仅仅是这样并不够,因为存在一个 lost wake-up 问题。
(先考虑一个 P 和一个 V 的情况)假设 P 执行完
while (s->count == 0),正准备执行sleep(s),这时候 V 递增了s->count,然后准备唤醒一个线程,然而由于这时 P 还没有 sleep,因此没有线程可以被 V 唤醒,那么即使s->count已经不为 $0$ 了,P 还是执行了sleep(s),然而,除非 V 再次递增s->count,否则 P 永远不会被唤醒。这就是 lost wake-up,即有一次 wake-up 丢失了,本该唤醒进程却没有,而 P 一直在等待这个已经发生了的唤醒。
如果简单的把 acquire(&s->lock) 移动到 P 的最开始,看起来可以解决问题,但是会造成更严重的死锁问题,P 持有锁,然后 sleep,因此 P 在被唤醒之前不会主动释放锁,而 V 需要持有锁才能唤醒 P。
为了解决 lost wake-up 问题,同时避免死锁,我们需要修改一下 sleep 的接口,
void P(struct semaphore *s) {
acquire(&s->lock);
while(s->count == 0)
sleep(s, &s->lock)
s->count -= 1;
release(&s->lock);
}
在 sleep 中,我们会在 proc 已经被标记成 asleep 之后再释放 s->lock,这就保证了 V 直到 P 让自己完成睡眠之后才能尝试递增计数和唤醒进程。我们这里可以将 s->lock 称为 condtion lock。
Code: Sleep and wakeup 链接到标题
xv6 的 sleep 和 wakeup 的实现机制与使用规则避免了 lost wake-up。
xv6 的 sleep 的基础机制就是 sleep 将当前进程标记为 SLEEPING,并调用 sched 来让出 cpu;而 wakeup 寻找等待队列中一个 sleeping 的进程,然后将它标记为 RUNNABLE。xv6 一般使用涉及 wait 的内核数据结构的地址作为 channel。
void sleep(void *chan, struct spinlock *lk) {
struct proc *p = myproc();
acquire(&p->lock); // DOC: sleeplock1
release(lk);
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
release(&p->lock);
acquire(lk);
}
当要 sleep 的进程持有了 &p->lock 之后,就可以释放 s->lock(即 lk 了),持有 lk 是为了避免在进程进入睡眠之前,就有 V 来唤醒它,造成 lost wake-up,当持有 &p->lock 之后,由于 wakeup 进程需要持有该进程的锁,因此持有 &p->lock 就能避免该进程在进入睡眠之前就被唤醒了,于是我们可以释放 lk 了。而 &p->lock 会在完成睡眠,调用 sched 之后,由 scheduler 线程释放,这里跟定时器中断导致的进程切换是一致的。
理解了 sleep,再看 wakeup 其实就不难理解了。
void wakeup(void *chan) {
struct proc *p;
for (p = proc; p < &proc[NPROC]; p++) {
if (p != myproc()) {
acquire(&p->lock);
if (p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
}
虚假唤醒:有时候可能会出现多个进程睡眠在同一个 channel 上的情况,例如多个进程从同一个 pipe 中读取数据,一次 wakeup 调用会唤醒所有的这些进程,但是当一个进程被 scheduler 线程选择,将数据读取完之后,递减计数,于是其他睡眠的进程会发现,自己虽然被唤醒了,但是 s->count 依旧是 $0$,这种情况我们被称为虚假唤醒,由于 sleep 处于 while(s->count == 0) 的循环中,因此会被困在循环中,再次睡眠,因此虚假唤醒这个问题是可以接受的。
这里
sleep最后的acquire保证了,即使一次有多个睡眠进程被标记为 RUNNABLE,也只有一个进程能够获取到lk,执行生产者的操作并递减s->count。
sleep 与 wakeup 的吸引力在于它们都非常轻量,不需要创建特定数据结构来充当 sleep 的 channel,并且提供了一层间接性,调用者不需要知道具体与哪个进程交互。
Code: Pipes 链接到标题
这里要主要是理解 piperead 和 pipewrite,这要求我们注意两点:一是 sleep 会申请进程锁再释放 pi->lock,scheduler 线程切换回 sleep 之后,会释放进程锁再申请 pi->lock。而 wakeup 只会申请与释放进程锁。
piperead 中 pi->nread == pi->nwrite 就起到了 s->count == 0 的作用,在 pipewrite 中则是 pipewrite == piperead + PIPESIZE,pipewrite 是生产者, piperead 是消费者。
pipe 的实现针对 reader 和 writer 使用了不同的 channel(&pi->nread 与 &pi->nwrite)。
int pipewrite(struct pipe *pi, uint64 addr, int n) {
int i = 0;
struct proc *pr = myproc();
acquire(&pi->lock);
while (i < n) {
if (pi->readopen == 0 || pr->killed) {
release(&pi->lock);
return -1;
}
if (pi->nwrite == pi->nread + PIPESIZE) { // DOC: pipewrite-full
wakeup(&pi->nread);
sleep(&pi->nwrite, &pi->lock);
} else {
char ch;
if (copyin(pr->pagetable, &ch, addr + i, 1) == -1)
break;
pi->data[pi->nwrite++ % PIPESIZE] = ch;
i++;
}
}
wakeup(&pi->nread);
release(&pi->lock);
return i;
}
int piperead(struct pipe *pi, uint64 addr, int n) {
int i;
struct proc *pr = myproc();
char ch;
acquire(&pi->lock);
while (pi->nread == pi->nwrite && pi->writeopen) { // DOC: pipe-empty
if (pr->killed) {
release(&pi->lock);
return -1;
}
sleep(&pi->nread, &pi->lock); // DOC: piperead-sleep
}
for (i = 0; i < n; i++) { // DOC: piperead-copy
if (pi->nread == pi->nwrite)
break;
ch = pi->data[pi->nread++ % PIPESIZE];
if (copyout(pr->pagetable, addr + i, &ch, 1) == -1)
break;
}
wakeup(&pi->nwrite); // DOC: piperead-wakeup
release(&pi->lock);
return i;
}
Code: wait, exit and kill 链接到标题
wait 首先会申请 condtion lock,即 wait_lock,保证 wait 不会错过 exit 的 wakeup,然后寻找子进程中的 ZOMBIE 进程,释放其资源和 proc 结构体,拷贝子进程的 exit status,释放锁,并返回子进程的 pid。如果 wait 发现没有子进程退出,就会调用 sleep 来等待子进程退出。被退出的子进程调用 wakeup 唤醒并由 scheduler 线程返回到 sleep 的下一条语句之后,它会继续执行查找 ZOMBIE 子进程并释放其资源的任务。
假设进程 a 调用 exit,那么exit 记录了 exit status,释放进程 a 的一些资源,调用 reparent 把 a 的子进程的转让给 init 进程,即进程 a 的子进程的父进程,会变成 init 进程。exit 还会将进程 a 的状态标记为 ZOMBIE,永远让出 cpu(毕竟 scheduler 线程只会选择 RUNNABLE 进程)。exit 会持有 wait_lock 和 p->lock,wait_lock 是生产者 exit 和消费者 wait 的 condtion lock,而持有 &p->lock 是为了防止 wait 在 exit 调用 swtch 之前就观察到该进程是 ZOMBIE 的了。为了防止死锁,exit 与 wait 申请锁的顺序是相同的。
kill 则只是将进程 p->killed 标记为 $1$,将进程状态从 SLEEPING 修改为 RUNNABLE,从而让进程能被唤醒,然后回到 usertrap.c 调用 exit 自杀,如果 victim 进程本身是在 user space 中运行的,那么它很快会因为定时器中断进入 usertrap 然后调用 exit。
kill 的将状态从 SLEEPING 修改为 RUNNABLE 可能带来隐患,睡眠中的进程可能没等到它需要的条件就被唤醒了,因此 while 循环中还需要判断 p->killed 的状态,如果被设置了,那么就放弃当前的活动,例如 pipe。
也有一些 xv6 的 sleep 所处的 while 循环不检查 p->killed,因为代码应该是原子操作的多步系统调用的中间,例如 virtio 驱动程序,它不检查 p->killed,等待磁盘 I/O 时被杀死的进程将不会退出,直到它完成当前系统调用并且 usertrap 看到了 p->killed 标志。
Process Locking 链接到标题
进程的锁(p->lock)应该是 xv6 中最复杂的锁了。在进程要读写 struct proc 的 p->state, p->chan, p->killed, p->xstate, p->pid 等字段时,都必须持有 p->lock,因为这些字段可能被其他进程,或者其他 cpu 上的调度器线程访问。
p->lock 做了以下事情:
- 与
p->state一起,避免从proc[]数组创建新线程时的 race condition; - 当进程被创建或者销毁时,避免它被别的进程看到;
- 避免父进程在
wait循环中看到一个状态为 ZOMBIE 但是还没有让出 cpu 的子进程; - 防止其他 cpu 的 scheduler 线程去运行一个马上要让出 cpu,已经将状态设置为 RUNNABLE 但是还没有结束
swtch调用的进程; - 保证一个 RUNNABLE 的进程只会被一个 scheduler 线程运行;
- 防止进程在调用
swtch中时,被定时器中断再驱使着去执行yield; - 防止
wakeup进程看到有状态处于 SLEEP 但是还没有完成让出 cpu 的动作的进程; - 使得
kill的检查p->pid和设置p->killed原子化; - It prevents the victim process of
killfrom exiting and perhaps being re-allocated betweenkill’s check ofp->pidand settingp->killed.- 假设没有这个
p->lock的话,它可能检查完p->pid之后,这个调用killed的进程就被暂停了,然后让出了 cpu,这时一个新进程被分配,并且二者具有相同的 pid,因此导致新进程受到影响,而不是影响原来的目标进程。
- 假设没有这个