我们暂时结束了对于虚拟内存的探索,转向另外一个有趣而且有用的话题————多线程,或者说,并发。众所周知实际上我才知道 ,多线程是实现并发的手段之一,如果CPU \text{CPU} CPU 有多核那么我们甚至可以实现并行。当然,这次实验的主题就是调度。
操作系统的一个主要功能就是对硬件进行抽象,前面的虚拟内存就是对于内存的抽象,而进程/线程调度则是对于CPU \text{CPU} CPU 的抽象。通过合理调度,我们使得每个进程/线程都能在CPU \text{CPU} CPU 上被执行,这种雨露均沾就仿佛每个进程/线程都有属于自己的CPU \text{CPU} CPU 一样。
1. 在uthread.c中模拟线程切换 \text{1. 在uthread.c中模拟线程切换} 1. 在 uthread.c 中模拟线程切换
其实不难,这些东西我们在之前写sigalarm和sigreturn系统调用的时候已经初步尝试过一遍了。我们需要做的是:保存Callee-Saved \text{Callee-Saved} Callee-Saved 寄存器,让ra \text{ra} ra 指向待执行函数的地址,让sp \text{sp} sp 指向新线程的栈的地址。
于是我们发现,我们需要的东西就是一个context结构体。于是可以在thread结构体中加入一个context类型的字段ctx,这样我们进一步发现,虽然我们要用汇编实现一个thread_switch函数,但是事实上我们只需要把内核代码的swtch.S借鉴照抄 过来就可以。当然,为了更方便地照抄,你可以把ctx放在thread定义的开头,这样就改都不用改(如果放在后面,整体要加偏移)。
实际上需要写的就是thread_create
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void thread_create (void (*func)()) { struct thread *t ; for (t = all_thread; t < all_thread + MAX_THREAD; t++) { if (t->state == FREE) break ; } t->state = RUNNABLE; t->ctx.ra = (uint64)func; t->ctx.sp = (uint64)(t->stack + STACK_SIZE); t->ctx.s0 = current_thread->ctx.s0; t->ctx.s1 = current_thread->ctx.s1; t->ctx.s2 = current_thread->ctx.s2; t->ctx.s3 = current_thread->ctx.s3; t->ctx.s4 = current_thread->ctx.s4; t->ctx.s5 = current_thread->ctx.s5; t->ctx.s6 = current_thread->ctx.s6; t->ctx.s7 = current_thread->ctx.s7; t->ctx.s8 = current_thread->ctx.s8; t->ctx.s9 = current_thread->ctx.s9; t->ctx.s10 = current_thread->ctx.s10; t->ctx.s11 = current_thread->ctx.s11; }
2. 玩一玩类Unix系统多线程并行 \text{2. 玩一玩类Unix系统多线程并行} 2. 玩一玩类 Unix 系统多线程并行
这是最激动人心的时刻辣!我们突然之间跳出了xv6 \text{xv6} xv6 和qemu \text{qemu} qemu 。在这个任务中我们要到一个真正的类Unix \text{Unix} Unix 操作系统上试玩多线程并行。家用电脑很多都有多核CPU \text{CPU} CPU ,玩多线程自然就玩出了并行。
这次任务给了一个在类Unix \text{Unix} Unix 系统上运行的哈希表,在只用1 1 1 个线程的时候它可以正常运行,但是线程一多起来就会出问题,有的键值插入以后莫名其妙地没有了。我们的任务就是分析原因并且解决这个问题。
其实课上已经讲过了这个例子,表明并行插入链表的操作是不安全的。我们看这两个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 static void insert (int key, int value, struct entry **p, struct entry *n) { struct entry *e = malloc (sizeof (struct entry)); e->key = key; e->value = value; e->next = n; *p = e; } static void put (int key, int value) { int i = key % NBUCKET; struct entry *e = 0 ; for (e = table[i]; e != 0 ; e = e->next) { if (e->key == key) break ; } if (e){ e->value = value; } else { insert(key, value, &table[i], table[i]); } }
关键的问题在于insert函数没有原子性。看这个函数,我们是先在链表头部加上新节点,然后让链表头部指向新节点。一个线程的时候当然没什么问题,但是多个线程麻烦就来了。我们来考虑这么一种可能的情况:
假设现在的链表是:
k 1 → k 2 → . . . → k n k_1\rightarrow k_2\rightarrow ... \rightarrow k_n
k 1 → k 2 → . . . → k n
现在的链表头部h = k 1 h=k_1 h = k 1 (也就是代码里的table[i])
线程1 1 1 来插入一个新的点x x x ,它首先将x x x 插入到h h h 前面,但是h h h 的值不变,仍然指向原来的链表头部,那么现在变成了
x → k 1 → k 2 → . . . → k n x\rightarrow k_1\rightarrow k_2\rightarrow ... \rightarrow k_n
x → k 1 → k 2 → . . . → k n
我们的运气不够好,线程2 2 2 来了,它要插入一个y y y 。线程2 2 2 认为当前的链表头部还是h = k 1 h=k_1 h = k 1 ,于是它也把y y y 插入到k 1 k_1 k 1 前面。线程1 1 1 想把链表头部h h h 更新为x x x ,线程2 2 2 想把h h h 更新为y y y ,于是x , y x,y x , y 当中的某一个成为了新的链表头部并且指向k 1 k_1 k 1 ,另一个就无法访问到了。
分析了原因,我们就知道,只要给insert加锁,保持其原子性即可,这样我们每一步insert一定会插入一个新点并且把它设为新的链表头部。
此外,我们还注意到另一种可能性是不修改链表结构,但是修改某个键对应的值,这种情况下不需要加锁,因为它本身具有原子性。代码很简单,这里就不放出来了,给每个链表一个线程锁就可以。加锁以后结果就正确了,但是显而易见地没有不加锁的情况那么快了。
指南里说,如果你直接加锁的话可能会无法通过所有测试,因为比较慢,需要优化。。。然后我发现我写出来的就是比较快的版本,不用优化也过了。。。对此我的猜测是,另一种OK但是很慢的方式是给整个哈希表加锁(而不是给每个链表单独加锁),这么蠢的办法自然就慢一些了。。。
3. Barrier \text{3. Barrier} 3. Barrier
这也是并行编程中常常遇到的一个概念,故名思义,它的作用是将所有的线程“拦下来”使其同步。
barrier结构体的定义和一部分初始化代码已经给我们了,我们的目标是实现barrier()函数,每个线程调用barrier()时就会停下来,直到所有线程同步了再继续各跑各的。
这个题嘛,确实不难,个人对自己的做法正确性还是有信心。但是编译开了O2 \text{O2} O2 优化,然后给我把一个死循环(其实类似于一种锁的实现吧)给优化了,老是莫名奇妙地挂,然后去掉O2 \text{O2} O2 优化就好了。。。
这里还是有点麻烦,对于几个锁的理解要比较清楚虽然我也没多清楚 ,尤其要注意条件锁的机制,比如pthread_cond_wait存在虚拟唤醒的情况,最好拿循环包住,这些在课程中也有提到,这里不细说。
很容易想出一个大致的框架:进入barrier后首先取得锁,然后给bstate.nthread++,如果加完以后发现刚好等于nthread那就说明所有线程都已经调用过此函数了,开始准备返回,并且把bstate.nthread清0 0 0 ,否则的话就使用pthread_cond_wait进行睡眠,外面套一层循环条件是bstate.nthread != 0,这样我们就可以在所有线程调用过函数以后将其唤醒并返回。
需要注意的是,pthread_cond_wait返回后会自动获得锁,因此返回之前需要释放。
此外,有可能存在某个线程跑得比较快,先返回进入了下一轮的barrier并且修改barrier.nthread,我们自然不能把下一轮的barrier.nthread和这一轮的barrier.nthread混为一谈,虽然两者是同一个变量。因此,我加入了返回线程数的统计,只有当线程全部返回的时候才允许返回,否则不断轮询,这样就不会出现某个线程太快就跑掉的情况。
但是这种做法开了O2 \text{O2} O2 优化以后就挂了。。。问了学长,是因为这种做法产生了data race \text{data race} data race (多个线程同时读写同一个内存,且不全为读),是未定义行为,虽然我在代码里控制得非常严格,但是这种未定义行为自然导致O2 \text{O2} O2 优化以后出现了不可控的结果。
要改正其实也非常简单,只需要给读写操作加锁就行,虽然会慢一些,但是确确实实保证了数据访问的有序。虽然如此,这里我还是放出原始版本无他,懒得改了 以后还要系统学习并行编程,那时会再来重新考虑这些问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static void barrier () { pthread_mutex_lock(&bstate.barrier_mutex); bstate.nthread++; if (bstate.nthread == nthread) { bstate.nthread = 0 ; bstate.free_t ++; if (bstate.free_t == nthread) { bstate.free_t = 0 ; bstate.round++; } pthread_mutex_unlock(&bstate.barrier_mutex); pthread_cond_broadcast(&bstate.barrier_cond); while (bstate.free_t ); return ; } while (bstate.nthread) { pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex); } bstate.free_t ++; if (bstate.free_t == nthread) { bstate.free_t = 0 ; bstate.round++; } pthread_mutex_unlock(&bstate.barrier_mutex); while (bstate.free_t ); }
总而言之,这次实验确实不难,除了最后一个任务确实因为自己不知道data race \text{data race} data race 这种未定义行为折腾了很久。。。