MIT6.S081 Lab-8: Locks

The theme of the last few labs has been concurrency control. xv6's handling of locks is simple and correct, but it creates a lot of lock contention, so it doesn't handle concurrency and parallelism very efficiently. In this lab we implement some improved designs that keep concurrency correct while making the operating system handle it more efficiently (and, as a bonus, raising the frequency of my own breakdowns).

1. Speeding up parallel memory allocation

The task first has us run a small test in which a bunch of processes repeatedly allocate and free memory, to show that the current xv6 suffers heavy lock contention during parallel allocation and performs poorly. The root cause is that our locking of free memory is far too coarse: different CPUs could perfectly well be working on completely different memory at the same time, but because all free memory sits behind a single lock that only one CPU can hold, the other CPUs just sit there waiting.
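For reference, the stock allocator in kernel/kalloc.c guards a single shared free list with a single lock; the relevant declarations look roughly like this (quoted from memory as a sketch, not verbatim):

struct run {
  struct run *next;
};

struct {
  struct spinlock lock;   // one lock...
  struct run *freelist;   // ...guarding the one free list shared by every CPU
} kmem;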

The fix is equally simple: give each CPU its own free list, protect each free list with its own lock, and take whichever CPU's lock corresponds to the free list you need.

One wrinkle remains: if some CPU's free list runs dry, it has to "steal" free memory from another CPU. In that case we need to hold the locks of both free lists at once; then we can safely do the list surgery and hand one of the victim CPU's free pages to the current CPU. When that page is eventually freed, it lands on the current CPU's free list.

One small point to note: interrupts must be off both when calling mycpu() and while its return value is in use, so those sections are wrapped in push_off and pop_off.
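The per-CPU free list has to live somewhere. The code below accesses it as cpus[i].kmem, so I'm assuming a small kmem struct embedded in struct cpu in kernel/proc.h; the exact placement is my own choice rather than something the lab dictates:

// kernel/proc.h (assumed layout; matches the cpus[i].kmem accesses below)
struct run;                   // defined in kernel/kalloc.c

struct kmem {
  struct spinlock lock;
  struct run *freelist;
};

// Per-CPU state.
struct cpu {
  struct proc *proc;          // The process running on this cpu, or null.
  struct context context;     // swtch() here to enter scheduler().
  int noff;                   // Depth of push_off() nesting.
  int intena;                 // Were interrupts enabled before push_off()?
  struct kmem kmem;           // This CPU's private free list and its lock.
};

With that in place, the modified kernel/kalloc.c looks like this: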

// kinit is called only by cpu0, before the other CPUs start running,
// so no locking is needed while freerange populates the free list.
void
kinit()
{
  for(int i = 0; i < NCPU; i++)
    initlock(&cpus[i].kmem.lock, "kmem");
  freerange(end, (void*)PHYSTOP);
}

void
freerange(void *pa_start, void *pa_end)
{
  char *p;
  p = (char*)PGROUNDUP((uint64)pa_start);
  for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE)
    kfree(p);
}

// Free the page of physical memory pointed at by pa,
// which normally should have been returned by a
// call to kalloc(). (The exception is when
// initializing the allocator; see kinit above.)
void
kfree(void *pa)
{
  struct run *r;

  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);

  r = (struct run*)pa;
  push_off();
  struct cpu *c = mycpu();
  acquire(&c->kmem.lock);
  r->next = c->kmem.freelist;
  c->kmem.freelist = r;
  release(&c->kmem.lock);
  pop_off();
}

// cpu C steals a free page from another cpu's free list
void* steal(struct cpu *C)
{
  struct run *r = 0;
  for(struct cpu *c = cpus; c < cpus + NCPU; c++)
  {
    if(c != C)
    {
      acquire(&c->kmem.lock);
      if(c->kmem.freelist)
      {
        r = c->kmem.freelist;
        c->kmem.freelist = r->next; // remove r from c's free list
        // there is no need to put r into C's free list here:
        // r is being allocated, and it will join C's list when it is kfree'd
        release(&c->kmem.lock);
        break;
      }
      release(&c->kmem.lock);
    }
  }
  return r; // may still be 0, e.g. if NCPU == 1 or no CPU has free pages
}

// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
  struct run *r;
  push_off();
  struct cpu *c = mycpu();
  acquire(&c->kmem.lock);
  r = c->kmem.freelist;
  if(r)
    c->kmem.freelist = r->next;
  else
    r = steal(c);
  release(&c->kmem.lock);
  pop_off();
  if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
  return (void*)r;
}

2. Optimizing the disk block cache

The other lock-contention hotspot in the current xv6 is the disk block cache. Reading from disk is extremely slow, so blocks are generally cached in memory, and at any given moment a cached block may be accessed by only one process. The trick we used for the allocator, keeping a per-CPU "free list", does not work here: when allocating memory any free page will do and we don't care which one we get, but that is clearly not true for cached disk blocks.

The current xv6 is quite naive about this: it takes one big lock, walks through every cached block in a loop, and if the block it wants isn't there it has to recycle a buffer for it before it can release the cache lock. The whole process is long and unnecessary; we scan the entire cache just to find one particular block, which is wasteful.

To speed this up we can use a rather neat idea: split all the cached disk blocks across several linked lists (instead of the current single list), i.e., store the blocks in a chained, bucketed structure.

A quick aside: the lab guide says to use a hash table, which puzzled me for quite a while. Eventually I realized the "hash table" it means doesn't really maintain a mapping at all; it's just the separate-chaining structure. I'll simply call it chained lists here; chalk it up to a terminology mix-up.

We already built a concurrent chained hash table in the previous lab, so this isn't hard: just give each chain its own lock.
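Concretely, the cache keeps HASHPRIME buckets, each one a circular doubly linked list with its own spinlock, and a block numbered blockno lives in bucket blockno % HASHPRIME. This is the same struct that appears in the full listing at the end, pulled forward here so the snippets below have context:

struct {
  struct spinlock lock[HASHPRIME]; // one spinlock per bucket
  struct buf buf[NBUF];            // the buffers themselves

  // One circular doubly-linked list of buffers per bucket, through prev/next;
  // head[i] is the sentinel node of bucket i.
  struct buf head[HASHPRIME];
} bcache;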

Of course, we can't eliminate lock conflicts entirely, and some of them are tolerable. For example: hash collisions (though the guide hopes you tune the hash design to make them rare), two processes accessing the same cached block (nothing can save you there), or two processes taking a cache miss at the same time and both having to scan buffers for one to replace, and so on.

So the idea is very simple. When bget is called, we first lock the chain the block hashes to and scan that chain to see whether the block is already cached. If it is, great: we release the lock and return.

struct buf *b;

// Is the block already cached?
uint idx = blockno % HASHPRIME;
acquire(&bcache.lock[idx]);
for(b = bcache.head[idx].next; b != &bcache.head[idx]; b = b->next){
  if(b->dev == dev && b->blockno == blockno){
    b->refcnt++;
    release(&bcache.lock[idx]);
    acquiresleep(&b->lock);
    return b;
  }
}

If the cache misses, we first look within the same chain for a buffer that can be replaced; with a bit of luck we find one and recycle it. Both of these cases only require the current chain's lock, so the logic is very direct. The only things to watch are releasing the lock promptly and getting the list manipulation right; the doubly linked list xv6 uses here takes a little getting used to, but once you read the definitions it's nothing complicated.
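For reference, the same-chain replacement path (it also appears in the full listing below) is just another scan of the bucket, this time from the LRU end, head[idx].prev:

// Not cached: try to recycle an unused buffer in this same bucket first.
for(b = bcache.head[idx].prev; b != &bcache.head[idx]; b = b->prev){
  if(b->refcnt == 0) {
    b->dev = dev;
    b->blockno = blockno;
    b->valid = 0;
    b->refcnt = 1;
    release(&bcache.lock[idx]);
    acquiresleep(&b->lock);
    return b;
  }
}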

In the worst case we find neither the cached block nor a replaceable buffer in the current chain, and we have to look for a victim in the other chains. That isn't hard either: loop over all the other chains, acquiring their locks (so we hold two locks at once), unlink the victim buffer from its original chain, insert it into the current chain, and update its fields. As always, remember to release the locks promptly:

for(int i = 0; i < HASHPRIME; i++)
{
  if(i == idx) continue;
  acquire(&bcache.lock[i]);
  // we need to hold both the i'th bucket's lock and the idx'th bucket's lock
  for(b = bcache.head[i].prev; b != &bcache.head[i]; b = b->prev){
    if(b->refcnt == 0) {
      // found a replaceable buffer: remove it from the i'th bucket
      // and insert it into the idx'th bucket
      b->dev = dev;
      b->blockno = blockno;
      b->valid = 0;
      b->refcnt = 1;
      b->next->prev = b->prev;
      b->prev->next = b->next;
      // insert the buffer into the idx'th bucket
      b->next = bcache.head[idx].next;
      b->prev = &bcache.head[idx];
      bcache.head[idx].next->prev = b;
      bcache.head[idx].next = b;
      // release both bucket locks and return
      release(&bcache.lock[i]);
      release(&bcache.lock[idx]);
      acquiresleep(&b->lock);
      return b;
    }
  }
  release(&bcache.lock[i]);
}

And with that we're done... right?

If we run usertests, some of the tests hang. If you remember the concepts from lecture, or poke at it with gdb, you'll find a deadlock; this is exactly the situation the guide warns about:

When replacing a block, you might move a struct buf from one bucket to another bucket, because the new block hashes to a different bucket. You might have a tricky case: the new block might hash to the same bucket as the old block. Make sure you avoid deadlock in that case.

Of course, I only read the first part, decided my approach was correct, went conveniently blind to that last sentence, and naturally got burned.

Recall from lecture what this kind of deadlock usually looks like: one process holds lock L1 and is waiting for lock L2, while another process holds L2 and is waiting for L1. Neither can ever get the lock it wants, so both wait forever. The fix is to impose an order on all the locks and require that whenever multiple locks are taken, they are taken in that order.

Therefore, before touching any other chain we must release the current chain's lock, and then inside the loop acquire both locks together, in index order:

release(&bcache.lock[idx]);
// we failed to find a replaceable buffer in the idx'th bucket,
// so try to steal a replaceable buffer from another bucket
for(int i = 0; i < HASHPRIME; i++)
{
  if(i == idx) continue;
  if(i < idx)
  {
    acquire(&bcache.lock[i]);
    acquire(&bcache.lock[idx]);
  }
  else
  {
    acquire(&bcache.lock[idx]);
    acquire(&bcache.lock[i]);
  }
  // we now hold both the i'th bucket's lock and the idx'th bucket's lock
  for(b = bcache.head[i].prev; b != &bcache.head[i]; b = b->prev){
    if(b->refcnt == 0) {
      // found a replaceable buffer: remove it from the i'th bucket
      // and insert it into the idx'th bucket
      b->dev = dev;
      b->blockno = blockno;
      b->valid = 0;
      b->refcnt = 1;
      b->next->prev = b->prev;
      b->prev->next = b->next;
      // insert the buffer into the idx'th bucket
      b->next = bcache.head[idx].next;
      b->prev = &bcache.head[idx];
      bcache.head[idx].next->prev = b;
      bcache.head[idx].next = b;
      // release both bucket locks and return
      release(&bcache.lock[i]);
      release(&bcache.lock[idx]);
      acquiresleep(&b->lock);
      return b;
    }
  }
  release(&bcache.lock[i]);
  release(&bcache.lock[idx]);
}

The full code:

struct {
  struct spinlock lock[HASHPRIME];
  struct buf buf[NBUF];

  // One circular doubly-linked list of buffers per bucket, through prev/next.
  // Sorted by how recently the buffer was used:
  // head[i].next is most recent, head[i].prev is least.
  struct buf head[HASHPRIME];
} bcache;

void
binit(void)
{
  struct buf *b;

  uint i = 0;
  for(i = 0; i < HASHPRIME; i++)
  {
    initlock(&bcache.lock[i], "bhash");
    bcache.head[i].prev = &bcache.head[i];
    bcache.head[i].next = &bcache.head[i];
  }
  i = 0;
  for(b = bcache.buf; b < bcache.buf+NBUF; b++, i++){
    if(i == HASHPRIME) i = 0;
    // distribute the buffers round-robin over the buckets' linked lists
    b->next = bcache.head[i].next;
    b->prev = &bcache.head[i];
    bcache.head[i].next->prev = b;
    bcache.head[i].next = b;
    initsleeplock(&b->lock, "buffer");
  }
}

// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;

  // Is the block already cached?
  uint idx = blockno % HASHPRIME;
  acquire(&bcache.lock[idx]);
  for(b = bcache.head[idx].next; b != &bcache.head[idx]; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&bcache.lock[idx]);
      acquiresleep(&b->lock);
      return b;
    }
  }
  // Not cached in the idx'th bucket.
  // Recycle the least recently used (LRU) unused buffer in the idx'th bucket;
  // we keep holding the idx'th bucket's lock while looking for one.
  for(b = bcache.head[idx].prev; b != &bcache.head[idx]; b = b->prev){
    if(b->refcnt == 0) {
      b->dev = dev;
      b->blockno = blockno;
      b->valid = 0;
      b->refcnt = 1;
      release(&bcache.lock[idx]);
      acquiresleep(&b->lock);
      return b;
    }
  }
  release(&bcache.lock[idx]);
  // we failed to find a replaceable buffer in the idx'th bucket,
  // so try to steal a replaceable buffer from another bucket
  for(int i = 0; i < HASHPRIME; i++)
  {
    if(i == idx) continue;
    if(i < idx)
    {
      acquire(&bcache.lock[i]);
      acquire(&bcache.lock[idx]);
    }
    else
    {
      acquire(&bcache.lock[idx]);
      acquire(&bcache.lock[i]);
    }
    // we now hold both the i'th bucket's lock and the idx'th bucket's lock
    for(b = bcache.head[i].prev; b != &bcache.head[i]; b = b->prev){
      if(b->refcnt == 0) {
        // found a replaceable buffer: remove it from the i'th bucket
        // and insert it into the idx'th bucket
        b->dev = dev;
        b->blockno = blockno;
        b->valid = 0;
        b->refcnt = 1;
        b->next->prev = b->prev;
        b->prev->next = b->next;
        // insert the buffer into the idx'th bucket
        b->next = bcache.head[idx].next;
        b->prev = &bcache.head[idx];
        bcache.head[idx].next->prev = b;
        bcache.head[idx].next = b;
        // release both bucket locks and return
        release(&bcache.lock[i]);
        release(&bcache.lock[idx]);
        acquiresleep(&b->lock);
        return b;
      }
    }
    release(&bcache.lock[i]);
    release(&bcache.lock[idx]);
  }
  panic("bget: no buffers");
}

As for what the magic number HASHPRIME should be: a prime is the natural choice for a hash table size, and since NBUF is 30 in the source, I picked 29.
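That amounts to a single define next to the bcache code (exactly which header it goes in is up to you):

#define HASHPRIME 29  // number of hash buckets: a prime, close to NBUF (30)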

In short: release locks promptly, get the linked-list operations straight, and guard against deadlock, and you're done. This was the fastest I've ever finished a hard-rated task, because the code is concentrated inside two functions and the idea is clear, so once you understand it the coding goes quickly.