MIT6.S081 Lab-7: Multithreading

我们暂时结束了对于虚拟内存的探索,转向另外一个有趣而且有用的话题————多线程,或者说,并发。众所周知实际上我才知道,多线程是实现并发的手段之一,如果CPU\text{CPU}有多核那么我们甚至可以实现并行。当然,这次实验的主题就是调度。

操作系统的一个主要功能就是对硬件进行抽象,前面的虚拟内存就是对于内存的抽象,而进程/线程调度则是对于CPU\text{CPU}的抽象。通过合理调度,我们使得每个进程/线程都能在CPU\text{CPU}上被执行,这种雨露均沾就仿佛每个进程/线程都有属于自己的CPU\text{CPU}一样。

1. 在uthread.c中模拟线程切换\text{1. 在uthread.c中模拟线程切换}

其实不难,这些东西我们在之前写sigalarmsigreturn系统调用的时候已经初步尝试过一遍了。我们需要做的是:保存Callee-Saved\text{Callee-Saved}寄存器,让ra\text{ra}指向待执行函数的地址,让sp\text{sp}指向新线程的栈的地址。

于是我们发现,我们需要的东西就是一个context结构体。于是可以在thread结构体中加入一个context类型的字段ctx,这样我们进一步发现,虽然我们要用汇编实现一个thread_switch函数,但是事实上我们只需要把内核代码的swtch.S借鉴照抄过来就可以。当然,为了更方便地照抄,你可以把ctx放在thread定义的开头,这样就改都不用改(如果放在后面,整体要加偏移)。

实际上需要写的就是thread_create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void 
thread_create(void (*func)())
{
struct thread *t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->ctx.ra = (uint64)func;
t->ctx.sp = (uint64)(t->stack + STACK_SIZE);
t->ctx.s0 = current_thread->ctx.s0;
t->ctx.s1 = current_thread->ctx.s1;
t->ctx.s2 = current_thread->ctx.s2;
t->ctx.s3 = current_thread->ctx.s3;
t->ctx.s4 = current_thread->ctx.s4;
t->ctx.s5 = current_thread->ctx.s5;
t->ctx.s6 = current_thread->ctx.s6;
t->ctx.s7 = current_thread->ctx.s7;
t->ctx.s8 = current_thread->ctx.s8;
t->ctx.s9 = current_thread->ctx.s9;
t->ctx.s10 = current_thread->ctx.s10;
t->ctx.s11 = current_thread->ctx.s11;
}

2. 玩一玩类Unix系统多线程并行\text{2. 玩一玩类Unix系统多线程并行}

这是最激动人心的时刻辣!我们突然之间跳出了xv6\text{xv6}qemu\text{qemu}。在这个任务中我们要到一个真正的类Unix\text{Unix}操作系统上试玩多线程并行。家用电脑很多都有多核CPU\text{CPU},玩多线程自然就玩出了并行。

这次任务给了一个在类Unix\text{Unix}系统上运行的哈希表,在只用11个线程的时候它可以正常运行,但是线程一多起来就会出问题,有的键值插入以后莫名其妙地没有了。我们的任务就是分析原因并且解决这个问题。

其实课上已经讲过了这个例子,表明并行插入链表的操作是不安全的。我们看这两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static void 
insert(int key, int value, struct entry **p, struct entry *n)
{
struct entry *e = malloc(sizeof(struct entry));
e->key = key;
e->value = value;
e->next = n;
*p = e;
}

static
void put(int key, int value)
{
int i = key % NBUCKET;

// is the key already present?
struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key)
break;
}
if(e){
// update the existing key.
e->value = value;
} else {
// the key is new.
insert(key, value, &table[i], table[i]);
}
}

关键的问题在于insert函数没有原子性。看这个函数,我们是先在链表头部加上新节点,然后让链表头部指向新节点。一个线程的时候当然没什么问题,但是多个线程麻烦就来了。我们来考虑这么一种可能的情况:

假设现在的链表是:

k1k2...knk_1\rightarrow k_2\rightarrow ... \rightarrow k_n

现在的链表头部h=k1h=k_1(也就是代码里的table[i])

线程11来插入一个新的点xx,它首先将xx插入到hh前面,但是hh的值不变,仍然指向原来的链表头部,那么现在变成了

xk1k2...knx\rightarrow k_1\rightarrow k_2\rightarrow ... \rightarrow k_n

我们的运气不够好,线程22来了,它要插入一个yy。线程22认为当前的链表头部还是h=k1h=k_1,于是它也把yy插入到k1k_1前面。线程11想把链表头部hh更新为xx,线程22想把hh更新为yy,于是x,yx,y当中的某一个成为了新的链表头部并且指向k1k_1,另一个就无法访问到了。

分析了原因,我们就知道,只要给insert加锁,保持其原子性即可,这样我们每一步insert一定会插入一个新点并且把它设为新的链表头部。

此外,我们还注意到另一种可能性是不修改链表结构,但是修改某个键对应的值,这种情况下不需要加锁,因为它本身具有原子性。代码很简单,这里就不放出来了,给每个链表一个线程锁就可以。加锁以后结果就正确了,但是显而易见地没有不加锁的情况那么快了。

指南里说,如果你直接加锁的话可能会无法通过所有测试,因为比较慢,需要优化。。。然后我发现我写出来的就是比较快的版本,不用优化也过了。。。对此我的猜测是,另一种OK但是很慢的方式是给整个哈希表加锁(而不是给每个链表单独加锁),这么蠢的办法自然就慢一些了。。。

3. Barrier\text{3. Barrier}

这也是并行编程中常常遇到的一个概念,故名思义,它的作用是将所有的线程“拦下来”使其同步。

barrier结构体的定义和一部分初始化代码已经给我们了,我们的目标是实现barrier()函数,每个线程调用barrier()时就会停下来,直到所有线程同步了再继续各跑各的。

这个题嘛,确实不难,个人对自己的做法正确性还是有信心。但是编译开了O2\text{O2}优化,然后给我把一个死循环(其实类似于一种锁的实现吧)给优化了,老是莫名奇妙地挂,然后去掉O2\text{O2}优化就好了。。。

这里还是有点麻烦,对于几个锁的理解要比较清楚虽然我也没多清楚,尤其要注意条件锁的机制,比如pthread_cond_wait存在虚拟唤醒的情况,最好拿循环包住,这些在课程中也有提到,这里不细说。

很容易想出一个大致的框架:进入barrier后首先取得锁,然后给bstate.nthread++,如果加完以后发现刚好等于nthread那就说明所有线程都已经调用过此函数了,开始准备返回,并且把bstate.nthread00,否则的话就使用pthread_cond_wait进行睡眠,外面套一层循环条件是bstate.nthread != 0,这样我们就可以在所有线程调用过函数以后将其唤醒并返回。

需要注意的是,pthread_cond_wait返回后会自动获得锁,因此返回之前需要释放。

此外,有可能存在某个线程跑得比较快,先返回进入了下一轮的barrier并且修改barrier.nthread,我们自然不能把下一轮的barrier.nthread和这一轮的barrier.nthread混为一谈,虽然两者是同一个变量。因此,我加入了返回线程数的统计,只有当线程全部返回的时候才允许返回,否则不断轮询,这样就不会出现某个线程太快就跑掉的情况。

但是这种做法开了O2\text{O2}优化以后就挂了。。。问了学长,是因为这种做法产生了data race\text{data race}(多个线程同时读写同一个内存,且不全为读),是未定义行为,虽然我在代码里控制得非常严格,但是这种未定义行为自然导致O2\text{O2}优化以后出现了不可控的结果。

要改正其实也非常简单,只需要给读写操作加锁就行,虽然会慢一些,但是确确实实保证了数据访问的有序。虽然如此,这里我还是放出原始版本无他,懒得改了以后还要系统学习并行编程,那时会再来重新考虑这些问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static void 
barrier()
{
// YOUR CODE HERE
//
// Block until all threads have called barrier() and
// then increment bstate.round.
//
pthread_mutex_lock(&bstate.barrier_mutex);
bstate.nthread++;
if(bstate.nthread == nthread)
{
bstate.nthread = 0;
bstate.free_t++;
if(bstate.free_t == nthread)
{
bstate.free_t = 0;
bstate.round++;
}
pthread_mutex_unlock(&bstate.barrier_mutex);
pthread_cond_broadcast(&bstate.barrier_cond);
while(bstate.free_t);// wait for other threads
return ;
}
while(bstate.nthread)
{
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);// broadcast wake it up, and we acquire the lock
}
bstate.free_t++;
if(bstate.free_t == nthread)
{
bstate.free_t = 0;
bstate.round++;
}
pthread_mutex_unlock(&bstate.barrier_mutex);
while(bstate.free_t);// we'll return only when all threads prepare to return
}

总而言之,这次实验确实不难,除了最后一个任务确实因为自己不知道data race\text{data race}这种未定义行为折腾了很久。。。