注:基于xv6教材学习,有大段对翻译笔记的直接引用,以及一部分询问AI和结合自己理解的一些内容。写这份笔记只是为了方便自己复习和理解。


        https://github.com/huihongxiao/MIT6.S081


        xv6教材

        睡眠和唤醒:线程之间不仅需要隔离(通过锁实现),还需要交互(例如等待数据,等待子进程,等待磁盘I/O),xv6使用sleep和wakeup来实现这种条件同步。然后教材开始尝试构建睡眠与唤醒机制。

        第一种:这种实现方式代价很高,如果生产者很少行动,消费者将花费大量时间在while循环上空转。为了避免忙等待,我们需要一种方法能然消费者让出CPU,只在V增加了count之后才恢复运行。

100 struct semaphore {
101   struct spinlock lock;
102   int count;
103 };
104
105 void
106 V(struct semaphore *s)
107 {
108   acquire(&s->lock);
109   s->count += 1;
110   release(&s->lock);
111 }
112
113 void
114 P(struct semaphore *s)
115 {
116   while(s->count == 0)
117     ;
118   acquire(&s->lock);
119   s->count -= 1;
120   release(&s->lock);
121 }

        第二种:引入sleep(chan)和wakeup(chan)。sleep(chan)在任意值chan(称为等待通道wait channel)上休眠,wakeup(chan)唤醒所有在chan上睡眠的进程。消费者在count==0时调用sleep,生产者在count增加后调用wakeup。

        这种写法会导致一个经典的竟态条件:消费者检查条件,发现count==0。然后发生上下文切换或并发执行,生产者紧接着修改条件使count++并调用wakeup。因为消费者尚未来得及调用sleep,wakeup没有唤醒任何人。在调用wakeup之后,消费者才调用sleep入睡。此时消费者永远地丢失了唤醒信号。

        因此,我们必须设计一种机制来保证“检查条件”和“进入睡眠”是一个原子操作,以防止唤醒信号在检查条件和进入睡眠之间发生,然后丢失。

200 void
201 V(struct semaphore *s)
202 {
203   acquire(&s->lock);
204   s->count += 1;
205   wakeup(s);      // 新增:唤醒在 s 上睡眠的线程
206   release(&s->lock);
207 }
208
209 void
210 P(struct semaphore *s)
211 {
212   while(s->count == 0)
213     sleep(s);     // 新增:如果计数为0,则睡眠
214   acquire(&s->lock);
215   s->count -= 1;
216   release(&s->lock);
217 }

        第三种:这种实现中,我们把获取锁的位置提前了,以保证检查条件和进入睡眠操作的原子性。但这个版本会导致死锁,P在谁睡眠的时候持有锁,V将永远被阻塞,因此P永远无法被V唤醒。

300 void
301 V(struct semaphore *s)
302 {
303   acquire(&s->lock);
304   s->count += 1;
305   wakeup(s);
306   release(&s->lock);
307 }
308
309 void
310 P(struct semaphore *s)
311 {
312   acquire(&s->lock);     // 这里的锁提前了
313   while(s->count == 0)
314     sleep(s);            // 带着锁睡眠
315   s->count -= 1;
316   release(&s->lock);
317 }

        第四种:我们通过更改sleep的接口来修改上述方案,调用者必须将条件锁传递给sleep,以便sleep能在调用进程被标记为睡眠状态并在睡眠通道上等待之后,释放该锁,并切换到其他进程。该锁将强制并发的V等待P完成sleep之后才开始执行。这样wakeup就能找到正在睡眠的消费者并唤醒它。一旦消费者再次醒来,sleep会在返回之前重新获取锁,保持P在sleep之前仍持有锁的状态

400 void
401 V(struct semaphore *s)
402 {
403   acquire(&s->lock);
404   s->count += 1;
405   wakeup(s);
406   release(&s->lock);
407 }
408
409 void
410 P(struct semaphore *s)
411 {
412   acquire(&s->lock);
413   while(s->count == 0)
414     sleep(s, &s->lock);  // 关键修改:把锁传进 sleep
415   s->count -= 1;
416   release(&s->lock);
417 }

        xv6如何实现?

        基本思想是让sleep把当前今晨标记为SLEEPING,然后调用sched来释放CPU,wakeup则寻找在给定等待通道上睡眠的进程,并将他们标记为RUNNABLE。sleep和wakeup的调用者可以使用双方都方便的数字作为通道(channel)。xv6通常使用涉及等待的内核数据结构地址作为通道。

        sleep一开始持有lk。lk的作用和上面的s->lock相同。都是为了确保V不会在检测条件和进入睡眠之间尝试增加计数,导致丢失唤醒信号。

        之后,sleep会尝试获取p->lock。现在准备睡眠的进程持有p->lock和lk,接着,它会释放lk。由于wakeup唤醒进程许哟啊获取对应的p->lock,所以,并发的wakeup会等待p->lock释放才能唤醒p。

// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
  struct proc *p = myproc();
  
  // Must acquire p->lock in order to
  // change p->state and then call sched.
  // Once we hold p->lock, we can be
  // guaranteed that we won't miss any wakeup
  // (wakeup locks p->lock),
  // so it's okay to release lk.
  if(lk != &p->lock){  //DOC: sleeplock0
    acquire(&p->lock);  //DOC: sleeplock1
    release(lk);
  }

  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  if(lk != &p->lock){
    release(&p->lock);
    acquire(lk);
  }
}

// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
  struct proc *p;

  for(p = proc; p < &proc[NPROC]; p++) {
    acquire(&p->lock);
    if(p->state == SLEEPING && p->chan == chan) {
      p->state = RUNNABLE;
    }
    release(&p->lock);
  }
}

        虚假唤醒:有时会出现多个进程在同一个通道上睡眠的情况。例如,多个进程从同一个管道读取数据。单个wakeup将唤醒他们所有。其中一个将率先运行并获取sleep调用时使用的锁lk,并读取管道中等待的数据。其他进程将发现,尽管被唤醒却无数据可读。从他们的角度来看,这次唤醒是虚假的,他们必须再次进入睡眠。这也是为什么sleep总是由一个检查条件的循环包裹着被调用。

        条件锁:这里,我们管lk叫条件锁。条件锁的职责就是保证:检查完条件后,sleep之前条件不会被改变。但也可以说,条件锁的职责是:保证检查完条件,确认将要入睡,在完成入睡这一动作之前,唤醒者禁止发出唤醒信号。

        具体实践为:入睡者在检查入睡条件到入睡完成这一过程中,必须持有条件锁。唤醒者想要唤醒某个进程必须持有条件锁。


        管道

        pipe结构体:pipe结构体中包含一个锁和一个数据缓冲区。字段 nreadnwrite 统计读取和写入缓冲区的字节总数。缓冲区是环绕的:在 buf[PIPESIZE-1] 之后写入的下一个字节是 buf[0]。但计数本身(nreadnwrite)不环绕。这个惯例让实现能够区分满缓冲区(nwrite == nread + PIPESIZE)和空缓冲区(nwrite == nread),但这意味索引缓冲区时必须使用 buf[nread % PIPESIZE] 而不仅仅是 buf[nread]nwrite 同理)。

        假设piperead和pipewrite的调用在两个不同的CPU上发生。pipewrite首先获取管道的锁,该锁保护计数、数据机器关联的不变量。piperead随后也尝试获取锁,它在acquire中自旋等待锁。当piperead等待时,如果缓冲区满,pipewrite会调用wakeup来唤醒piperead,然后在pi->lock上睡眠,sleep会释放pi->lock,届时piperead可以获取pi->lock。

        piperead会进入for循环中,在管道中复制数据,并根据复制的字节数增加nread。piperead在返回前调用wakeup来唤醒任何睡眠的pipewrite

int
pipewrite(struct pipe *pi, uint64 addr, int n)
{
  int i;
  char ch;
  struct proc *pr = myproc();

  acquire(&pi->lock);
  for(i = 0; i < n; i++){
    while(pi->nwrite == pi->nread + PIPESIZE){  //DOC: pipewrite-full
      if(pi->readopen == 0 || pr->killed){
        release(&pi->lock);
        return -1;
      }
      wakeup(&pi->nread);
      sleep(&pi->nwrite, &pi->lock);
    }
    if(copyin(pr->pagetable, &ch, addr + i, 1) == -1)
      break;
    pi->data[pi->nwrite++ % PIPESIZE] = ch;
  }
  wakeup(&pi->nread);
  release(&pi->lock);
  return i;
}

int
piperead(struct pipe *pi, uint64 addr, int n)
{
  int i;
  struct proc *pr = myproc();
  char ch;

  acquire(&pi->lock);
  while(pi->nread == pi->nwrite && pi->writeopen){  //DOC: pipe-empty
    if(pr->killed){
      release(&pi->lock);
      return -1;
    }
    sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
  }
  for(i = 0; i < n; i++){  //DOC: piperead-copy
    if(pi->nread == pi->nwrite)
      break;
    ch = pi->data[pi->nread++ % PIPESIZE];
    if(copyout(pr->pagetable, addr + i, &ch, 1) == -1)
      break;
  }
  wakeup(&pi->nwrite);  //DOC: piperead-wakeup
  release(&pi->lock);
  return i;
}

        wait,exit,kill

        wait和kill:在第一章中我们在子进程退出与父进程等待时使用wait和kill。父进程可能在子进程死后很久才调用wait。所以我们需要保存子进程的死亡信息(例如PID和退出状态)直到父进程调用wait。

        xv6记录子进程死亡直到wait观察到它的方式是:exit将调用者置于ZOMBIE状态,它会一直保持该状态,直到父进程的wait注意到它,将其状态更改为UNUSED,复制子进程的退出状态,并将子进程的PID返回给父进程。

        如果父进程先于子进程退出,父进程会将子进程过继给init程序,init进程是一个永不终止的循环,它会调用wait函数杀死僵尸进程。

        wait使用调用进程的p->lock作为条件锁以避免丢失唤醒,并在开始就获取该锁。然后它扫描进程表,如果发现一个处于ZOMBIE状态的子进程,它会释放该子进程的资源及其proc结构,将子进程的退出状态复制到提供给wait的地址,并返回子进程的PID。如果wait发现了子进程但没有一个已退出,会调用sleep来等待它们中的任何一个退出,然后再次扫描。

        注:wait经常持有两把锁,它在尝试获取任何子进程的锁之前,先获取自己的锁。

        wait查看每个进程的np->parent来寻找其子进程,它在使用np->parent时不持有np->lock。这似乎违反了共享变量必须受锁保护的原则。但是,如果np是当前进程的祖先的话,获取np->lock可能导致死锁,因为违反了上述的先获取父进程锁,再获取子进程锁的顺序。并且由于一个进程的父进程字段只能由其父进程更改,除非当前进程(父进程)自己去修改它,否则该值不会改变。所以直接使用np->parent也是安全的。

// Wait for a child process to exit and return its pid.
// Return -1 if this process has no children.
int
wait(uint64 addr)
{
  struct proc *np;
  int havekids, pid;
  struct proc *p = myproc();

  // hold p->lock for the whole time to avoid lost
  // wakeups from a child's exit().
  acquire(&p->lock);

  for(;;){
    // Scan through table looking for exited children.
    havekids = 0;
    for(np = proc; np < &proc[NPROC]; np++){
      // this code uses np->parent without holding np->lock.
      // acquiring the lock first would cause a deadlock,
      // since np might be an ancestor, and we already hold p->lock.
      if(np->parent == p){
        // np->parent can't change between the check and the acquire()
        // because only the parent changes it, and we're the parent.
        acquire(&np->lock);
        havekids = 1;
        if(np->state == ZOMBIE){
          // Found one.
          pid = np->pid;
          if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate,
                                  sizeof(np->xstate)) < 0) {
            release(&np->lock);
            release(&p->lock);
            return -1;
          }
          freeproc(np);
          release(&np->lock);
          release(&p->lock);
          return pid;
        }
        release(&np->lock);
      }
    }

    // No point waiting if we don't have any children.
    if(!havekids || p->killed){
      release(&p->lock);
      return -1;
    }
    
    // Wait for a child to exit.
    sleep(p, &p->lock);  //DOC: wait-sleep
  }
}

        exit记录退出状态,释放一些资源,将所有子进程交给init进程,唤醒父进程(假设父进程正在wait子进程退出,那么子进程应该唤醒父进程),将调用者标记为ZOMBIE,并永久让出CPU。

        正在退出的进程在将其状态设置为ZOMBIE并唤醒父进程时,必须持有其父进程的锁,因为父进程的锁是wait中防止丢失唤醒的条件锁。

        子进程还必须持有自己的p->lock,否则父进程可能会在exit将状态设置为ZOMBIE后马上就尝试释放子进程。

        由于wait中我们先获取父进程锁,再获取子进程锁。所以在exit中,我们必须以同样的顺序获取锁。

        exit调用一个专门的唤醒函数wakeup1,该函数仅唤醒父进程,且仅当父进程在wait中睡眠时才唤醒。该函数和wakeup的重要区别是,wakeup1接受proc* p而不是void* chan作为参数,并且wakeup1假设你已经持有条件锁(即p->lock)。

// Exit the current process.  Does not return.
// An exited process remains in the zombie state
// until its parent calls wait().
void
exit(int status)
{
  struct proc *p = myproc();

  if(p == initproc)
    panic("init exiting");

  // Close all open files.
  for(int fd = 0; fd < NOFILE; fd++){
    if(p->ofile[fd]){
      struct file *f = p->ofile[fd];
      fileclose(f);
      p->ofile[fd] = 0;
    }
  }

  begin_op();
  iput(p->cwd);
  end_op();
  p->cwd = 0;

  // we might re-parent a child to init. we can't be precise about
  // waking up init, since we can't acquire its lock once we've
  // acquired any other proc lock. so wake up init whether that's
  // necessary or not. init may miss this wakeup, but that seems
  // harmless.
  acquire(&initproc->lock);
  wakeup1(initproc);
  release(&initproc->lock);

  // grab a copy of p->parent, to ensure that we unlock the same
  // parent we locked. in case our parent gives us away to init while
  // we're waiting for the parent lock. we may then race with an
  // exiting parent, but the result will be a harmless spurious wakeup
  // to a dead or wrong process; proc structs are never re-allocated
  // as anything else.
  acquire(&p->lock);
  struct proc *original_parent = p->parent;
  release(&p->lock);
  
  // we need the parent's lock in order to wake it up from wait().
  // the parent-then-child rule says we have to lock it first.
  acquire(&original_parent->lock);

  acquire(&p->lock);

  // Give any children to init.
  reparent(p);

  // Parent might be sleeping in wait().
  wakeup1(original_parent);

  p->xstate = status;
  p->state = ZOMBIE;

  release(&original_parent->lock);

  // Jump into the scheduler, never to return.
  sched();
  panic("zombie exit");
}

        kill:exit允许进程终止自身,而kill允许一个进程请求另一个进程终止。由于kill直接销毁另一个进程实在过于复杂,所以kill只做两件事情,将p->killed设置为1,如果进程在睡觉则将其唤醒。最终受害者会进入或离开内核,此时usertrap中的代码会检查p->killed是否被设置,如果设置了就调用exit。如果受害者正在用户空间中运行,它很快就会因为发起系统调用或因为定时器中断而进入内核。

        如果受害者正在睡眠状态,kill会导致受害者从sleep返回。这可能会有潜在危险,因为sleep正在等待的条件可能并不为真。然而,xv6对sleep的调用总是包裹在一个while循环中,该循环在sleep返回后会重新测试条件。一些sleep调用还在循环中测试p->killed,如果设置了该标志则放弃当前活动。当然,这样做的前提是放弃当前活动的行为本身正确。

        一些xv6的sleep循环不检查p->killed,因为代码正处于应该原子执行的多步系统调用中间。virtio 驱动程序 (kernel/virtio_disk.c:242) 就是一个例子:它不检查 p->killed,因为磁盘操作可能是一组写入操作之一,为了使文件系统保持正确状态,所有写入都需要完成。在等待磁盘 I/O 时被杀死的进程直到完成当前系统调用并且 usertrap 看到 killed 标志时才会退出。

// Kill the process with the given pid.
// The victim won't exit until it tries to return
// to user space (see usertrap() in trap.c).
int
kill(int pid)
{
  struct proc *p;

  for(p = proc; p < &proc[NPROC]; p++){
    acquire(&p->lock);
    if(p->pid == pid){
      p->killed = 1;
      if(p->state == SLEEPING){
        // Wake process from sleep().
        p->state = RUNNABLE;
      }
      release(&p->lock);
      return 0;
    }
    release(&p->lock);
  }
  return -1;
}

        进程锁(Process Locking):进程锁是xv6中最复杂的锁。理解它的一个简单的方式是:当读取或写入一下struct proc 字段时必须持有它:p->statep->chanp->killedp->xstatep->pid。 这些字段可能被其他进程使用,或者被其他核心上的调度器线程使用,所以很自然地,它们必须受锁保护。

        然而,p->lock的大多数用途是保护xv6进程数据结构和算法的更高层方面,以下是几个关键的场景:

        1.保证进程创建和销毁时的原子性。在进程创建(allocproc)和销毁(wait/freeproc)的瞬间,进程处于存在和销毁的边界。进程锁将“分配内存+初始化”或“读取退出码+释放内存”这些复杂的步骤封装成了一个外界看起来瞬间完成的动作。

        2.调度器切换的接力棒。当一个进程决定让出CPU时,它会调用swtch函数。进程在调用swtch前必须持有进程锁,并且不释放,直接带着锁切换出去。只有当调度器完全接管了CPU,才会由调度器释放这把锁。

        3.exit/wait逻辑的核心。在wait()中,父进程的进程锁充当了条件锁。他保证了检查条件和sleep这两个操作的原子性。子进程在exit()中想要唤醒父进程的话,必须持有条件锁。

        当持有p->lock时,进程的状态对于外界来说是静止的,它确保任何时刻,所有CPU核心都只有一个在操作这个进程的生命周期。要么是进程自己,要么是调度器,要么是父进程。


        Real World

        xv6的调度器使用的是轮转调度,这种方式简单,公平。而在真实操作系统中,调度器可能会优先考虑可运行的高优先级进程,而不是低优先级进程。于此同时,调度器可能还希望保证公平性和高吞吐量。

        此外,复杂的策略可能会导致意想不到的问题。例如,优先级反转和护送效应。当低优先级和高优先级进程共享一个锁时,可能会发生优先级反转,一旦低优先级进程获取了锁,它可能会阻止高优先级进程取得进展。当许多高优先级进程等待一个获取了共享锁的低优先级进程时,就会形成长长的等待“车队”;一旦车队形成,它可能会持续很长时间。为了避免这些问题,复杂的调度器中需要额外的机制。

        睡眠唤醒机制:sleep/wakeup是一种简单有效的方式,但也有其他方法。所有方法的首要挑战都是避免丢失唤醒。Linux内核的sleep使用一个显式的进程队列,称为等待队列,而不是通过等待通道,该队列有自己的内部锁。

        在wakeup中扫描整个进程表来寻找睡眠的子进程的方式是低效的。真实的操作系统会使用链表/队列,这些数据结构上保存着在该结构上睡眠的进程列表。例如Linux的等待队列。

        wakeup的实现会唤醒所有在特定通道上等待的进程,这一运行方式被称作惊群。真实的OS会引入信号量。其计数通常对应于诸如管道缓冲区中的可用字节数或进程拥有的僵尸子进程数之类的内容。使用显式计数作为抽象的一部分可以避免“丢失唤醒”问题:因为已经发生的唤醒次数有一个显式的计数。计数还避免了虚假唤醒和惊群问题。

        

        杀死进程:真实的OS中,杀死一个内核态很深的进程需要安全地释放每一层资源,这需要非常小心的操作。

        

        xv6 kill的缺陷:在killer检查条件(p->killed)和调用sleep之间,我们没有条件锁来保证不会丢失杀死当前进程的信号。

        内存分配优化:xv6中allocproc每次从头遍历数组找空位。Real OS会维护一个空闲链表 ,拿空位只需要O(1)。

        

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐