UCAS-Network-Lab-8-Ip Lookup

当时啊,直到DDL过了我都没复现出来,最后一听报告才发现全班就我一个傻子在复现这个…

0. 为什么需要优化IP查找\text{0. 为什么需要优化IP查找}

在实验课上一般我们需要找个什么东西,很多都是遍历一遍,前面的路由器实验中,我们也是遍历一遍,然后找到了就返回,找不到就返回一个空指针。但是在真实世界中的路由器往往需要处理大量的数据包,而且每个数据包都需要查找一次,这样的话,如果每次都是遍历一遍,那么效率就会非常低下,所以我们需要对这个过程进行优化。

这次实验的目的就非常纯粹,就是优化IP查找的过程,使得查找的效率更高。

1. 朴素Trie\text{1. 朴素Trie}

Trie树是一种非常经典的数据结构,它的基本思想就是把字符串中的每个字符都看作是一个节点,然后把字符串中的每个字符都放在一个节点上,这样就可以通过遍历节点来找到字符串。对于二进制数,Trie则尤其好用。一种朴素的想法就是,既然每个IP地址都是32位的二进制数,那么我们就可以把每个IP地址都看作是一个字符串,然后把每个IP地址都放在一个节点上。查找时从上往下按照每一位的值来选择左右子树,遇到一个存有端口信息的节点时,就可以记录一次答案,然后继续往下查找,直到结束,这样,最深的存有端口信息的节点就对应于最长前缀匹配的端口信息。

这种方法的优点就是非常简单,而且实现起来也非常容易,相比于遍历一遍的方法,效率也有了很大的提升,但是这样还是不够。我们接下来还需要再进行优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

static void trie_insert(node_t* trie,
uint32_t ip,
uint32_t mask,
uint32_t port) {
node_t* curr = trie;
for (int i = 0; i < mask; i++) {
int bit = (ip >> (31 - i)) & 1;
if (bit) {
if (curr->rchild == NULL)
curr->rchild = new_trie_node(I_NODE, -1);
curr = curr->rchild;
} else {
if (curr->lchild == NULL)
curr->lchild = new_trie_node(I_NODE, -1);
curr = curr->lchild;
}
}
curr->type = M_NODE;
curr->port = port;
}

这就是动态插入的算法,用这个就可以动态插入一个IP地址,然后在查找的时候,就可以根据这个树来查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void create_tree(const char* forward_file) {
char buf[BUFSIZ], data[BUFSIZ];
FILE* fp = fopen(forward_file, "r");
if (fp == NULL) {
fprintf(stderr, "Error: cannot open file %s\n", forward_file);
exit(1);
}
trie = new_trie_node(I_NODE, -1);
while (fgets(buf, sizeof(buf), fp) != NULL) {
uint32_t mask, port;
sscanf(buf, "%s %d %d", data, &mask, &port);
uint32_t ip = ipstr_to_int(data);
trie_insert(trie, ip, mask, port);
}
fclose(fp);
}

uint32_t trie_lookup(node_t* trie, int limit, uint32_t ip) {
node_t* curr = trie;
uint32_t port = -1;
for (int i = 0; i < limit; i++) {
int bit = (ip >> (31 - i)) & 1;
if (bit) {
if (curr->rchild == NULL)
break;
curr = curr->rchild;
}
else {
if (curr->lchild == NULL)
break;
curr = curr->lchild;
}

if (curr->type == M_NODE)
port = curr->port;
}
return port;
}

2. PopTrie\text{2. PopTrie}

课程组给出了一篇论文PopTrie,这是一种Trie的变体结构,能够极大地压缩数据结构占用的空间,同时也能提高查找的效率。

我做的时候真的就对着这篇论文复现去了,然后考虑到整个查找是不带修改的,还非常强迫症地把整个数据结构都在内存中紧密地排列了起来,实现起来要了命。。。最后一问同学们才发现好多人都没按照论文去做,甚至有的把Trie变成了四叉就过了。。。

另一个坑爹的点是,原论文似乎并没有说如果查不到的话应该怎么做。。。我最后悟了半天才想明白,其实“查不到”也相当于一种答案,也作为一种“端口”记录在叶子节点上就好了。

2.1. PopTrie结构\text{2.1. PopTrie结构}

PopTrie的结构特点我总结了一下,有如下三点:

  1. 将每k位视作一个字符,这样就可以一次性进行k位的查找,极大压缩了树的高度。我的实现与论文中对于k的选择有所不同,为了能够对齐,我选择了一次性比较5位,原论文则是6位。

  2. 使用了一个“直接映射”,用来跳过前12位的比较和查找,相当于一共做了2122^{12}棵树,查找时直接找到对应的树,从第12位开始比较。

  3. 将节点分为两类,各自使用一个位图(在原论文中称为vectorleafvec)进行信息的压缩,通过popcount来高效(因为这个一般是有硬件指令支持的)地索引儿子节点,子节点是连续排列的,因此要索引的话只需要一个popcount计数。

关于第3点,可以理解为使用bitmap压缩保存了“是否有第i号儿子/叶子”的信息,然后可以将儿子/叶子节点在内存中紧密排列好,这样就可以通过popcount来得到某个儿子节点在所有儿子中的偏移量,然后就可以直接索引到这个儿子节点了。

我基本实现了上述的结构特点,不过我并不知道在紧密压缩的情况下还如何高效地插入IP地址。在写完以后我去github上找到了参考实现,发现参考实现里的树结构其实应该不是完完全全紧密排列在内存中的。。。

不过好在本次实验不需要我们进行插入,所以我也就把整棵树压缩在了一起,不过这么一来,树的构建就有些头疼了。

2.2. PopTrie构建\text{2.2. PopTrie构建}

我把PopTrie分为了两个阶段构建,第一阶段是增量式构造一个松散的32叉Trie以及其直接映射,第二阶段是将这个松散的Trie压缩成紧密的PopTrie。

这个思路似乎论文里没有提到,我学到这个野路子其实是从另外的地方(图形学里构建静态的缓存友好的空间结构,一种思路就是先松散后压缩)来的,实际实现起来有些痛苦,最后好歹还是做出来了。

3. 实现以及实验结果\text{3. 实现以及实验结果}

实际实现的时候,我首先用C语言做了一个“模板”vector类,没办法,因为这里不是C++片场,没有STL可以用,我只能自己做一个,其主要用途是管理动态的连续内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#define std_vector(T) std_vector_##T##_t

#define std_vector_define(T) \
typedef struct std_vector(T) { \
T* data; \
uint32_t size; \
uint32_t capacity; \
} std_vector(T); \

#define push_back(T) push_back_##T
#define push_back_define(T) \
static inline void push_back(T)(std_vector(T)* v, T x) {\
if (v->size >= v->capacity) { \
v->capacity = ((v->capacity) << 1); \
v->data = (T*)realloc(v->data, sizeof(T) * (v->capacity)); \
} \
v->data[v->size++] = x; \
} \

#define at(T) at_##T
#define at_define(T) \
static inline T at(T)(std_vector(T)* v, uint32_t i) { \
return v->data[i]; \
} \

#define reserve(T) reserve_##T
#define reserve_define(T) \
static inline void reserve(T)(std_vector(T)* v, uint32_t n) { \
v->data = (T*)malloc(sizeof(T) * n); \
v->capacity = n; \
} \

#define emplace_back(T) emplace_back_##T
#define emplace_back_define(T) \
static inline void emplace_back(T)(std_vector(T)* v) { \
if (v->size >= v->capacity) { \
v->capacity = ((v->capacity) << 1); \
v->data = (T*)realloc(v->data, sizeof(T) * v->capacity); \
} \
v->size++; \
} \

#define vector_init(T) vector_init_##T
#define vector_init_define(T) \
static inline void vector_init(T)(std_vector(T)* v) { \
v->data = (T*)malloc(sizeof(T)); \
v->size = 0; \
v->capacity = 1; \
} \

#define std_vector_instantiate(T) \
std_vector_define(T) \
push_back_define(T) \
at_define(T) \
reserve_define(T) \
emplace_back_define(T) \
vector_init_define(T) \

然后我实现了一个poptrie_raw类,这个类是一个松散的32叉Trie。对于前12位直接使用一个数据结构保存其查询结果。然后走4次,每次处理5个bit。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

typedef struct poptrie_raw_node {
node_type_t type;
uint32_t port;
bool has_child;
int8_t longest[1 << 5];
struct poptrie_raw_node* ch[1 << 5];
} poptrie_raw_node_t;

static node_t* direct_prefix_trie;
static node_t* direct_map_trie;

typedef poptrie_raw_node_t* poptrie_raw_node_ptr_t;
std_vector_instantiate(poptrie_raw_node_ptr_t);
static std_vector(poptrie_raw_node_ptr_t) raw_poptrie;

这里有一点儿细节,有的IP地址在插入的时候,其mask在减去12后可能不是5位对齐的,我的处理方式是将其手动补全至5位对齐,查询的结果也自然地复制,这样实际会导致相当多连续但是冗余的叶子节点,这就是接下来的leafvec重点解决的问题之一,此处暂且不表。

由于压缩带来的另一个问题是落在同一层的不同mask的处理,长的mask的查询结果会覆盖短的mask的查询结果,这也就将32路的结果分成了一段段的区间,每一段区间内的查询结果是一样的,为此我们还需要给每个分支加一个longest数组,用来记录这个分支下的区间的最长mask。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
static void poptrie_raw_insert(uint32_t ip, uint32_t mask, uint32_t port) {
uint32_t index = keep(ip, DIRECT_INDEX_BITS);
if (mask <= DIRECT_INDEX_BITS) {
trie_insert(direct_prefix_trie, index, mask, port);
return;
}
uint32_t dindex = trie_lookup(direct_map_trie, DIRECT_INDEX_BITS, index);
if (dindex == (uint32_t)-1) {
push_back(poptrie_raw_node_ptr_t)(&raw_poptrie,
new_poptrie_raw_node(I_NODE));
dindex = raw_poptrie.size - 1;
trie_insert(direct_map_trie, index, DIRECT_INDEX_BITS, dindex);
}
poptrie_raw_node_t* curr = raw_poptrie.data[dindex];
int i = DIRECT_INDEX_BITS;
for (; i + 5 < mask; i += 5) {
int value = extract(ip, i, 5);
if (curr->ch[value] == NULL) {
curr->ch[value] = new_poptrie_raw_node(I_NODE);
curr->has_child = true;
}
curr = curr->ch[value];
}
int value = extract(ip, i, 5);
int rest_prefix_length = mask - i;
for (int prefix = 0; prefix < (1 << 5); prefix++) {
if (extract(prefix, 27, rest_prefix_length) !=
extract(value, 27, rest_prefix_length))
continue;
if (curr->ch[prefix] == NULL) {
curr->has_child = true;
curr->ch[prefix] = new_poptrie_raw_node(M_NODE);
curr->longest[prefix] = rest_prefix_length;
curr->ch[prefix]->port = port;
}
if (curr->longest[prefix] == -1 || curr->longest[prefix] < rest_prefix_length) {
curr->ch[prefix]->port = port;
curr->longest[prefix] = rest_prefix_length;
}
}
}

void create_tree_advance(const char* forward_file) {
char buf[BUFSIZ], data[BUFSIZ];
FILE* fp = fopen(forward_file, "r");
if (fp == NULL) {
fprintf(stderr, "Error: cannot open file %s\n", forward_file);
exit(1);
}
memset(pop_trie.D, 0xff, sizeof(pop_trie.D));
vector_init(poptrie_raw_node_ptr_t)(&raw_poptrie);
vector_init(poptrie_node_t)(&pop_trie.N);
vector_init(char)(&pop_trie.L);
direct_map_trie = new_trie_node(I_NODE, -1);
direct_prefix_trie = new_trie_node(I_NODE, -1);
while (fgets(buf, sizeof(buf), fp) != NULL) {
uint32_t mask, port;
sscanf(buf, "%s %d %d", data, &mask, &port);
uint32_t ip = ipstr_to_int(data);
poptrie_raw_insert(keep(ip, mask), mask, port);
}
fclose(fp);

memset(pop_trie.D, 0xff, sizeof(pop_trie.D));
for (uint32_t ip = 0; ip < (1 << DIRECT_INDEX_BITS); ip++) {
uint32_t dindex = trie_lookup(direct_map_trie,
DIRECT_INDEX_BITS,
ip << (32 - DIRECT_INDEX_BITS));
if (dindex == (uint32_t)-1) {
dindex = trie_lookup(direct_prefix_trie,
DIRECT_INDEX_BITS,
ip << (32 - DIRECT_INDEX_BITS));
// assume: there are no more tha 2^30 ports
// so if dindex is -1, it indicates that no entry is found
pop_trie.D[ip] = dindex | (1ul << 31);
}
else {
poptrie_raw_node_t* raw = raw_poptrie.data[dindex];
char dans = trie_lookup(direct_prefix_trie,
DIRECT_INDEX_BITS,
ip << (32 - DIRECT_INDEX_BITS));
if (pop_trie.D[ip] == -1) {
emplace_back(poptrie_node_t)(&pop_trie.N);
pop_trie.D[ip] = pop_trie.N.size - 1;
poptrie_flatten(raw, pop_trie.D[ip], dans);
}
}
}
free_trie(direct_map_trie);
free_trie(direct_prefix_trie);
}

这里为了记录“从前12位到raw节点的直接映射”,我再一次利用了一个trie来记录这个映射,这个trie的构建和查询都和普通的trie一样,只是在查询的时候,我们需要把查询结果再次映射到raw_poptrie上。

如果没有查到直接映射,那么我们就需要新建一个raw_poptrie的节点,然后再往下走,直到走到最后一层,我们就可以把这个节点标记为M_NODE,并且记录下这个节点的port,同时我们还需要记录下这个节点的longest,这个longest就是用来记录这个节点下的区间的最长mask。

接下来就是压缩的过程了,这个过程就是把raw_poptrie中的节点压缩到一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static void poptrie_flatten(poptrie_raw_node_t* raw, int index, char ans) {
uint32_t base[2] = {-1, -1}, vector = 0, leafvec = 0;
char last_port = -2;
char new_ans[32];
for (int i = 0; i < (1 << 5); i++) {
char cur_port = (raw->ch[i] == NULL || raw->ch[i]->port == -1) ? ans : raw->ch[i]->port;
if (raw->ch[i] == NULL || !raw->ch[i]->has_child) {
if (cur_port != last_port) {
push_back(char)(&pop_trie.L, cur_port);
if (base[0] == (uint32_t)(-1))
base[0] = pop_trie.L.size - 1;
leafvec |= (1ull << i);
last_port = cur_port;
}
}
if (raw->ch[i] == NULL) continue;
if (raw->ch[i]->has_child) {
emplace_back(poptrie_node_t)(&pop_trie.N);
if (base[1] == (uint32_t)(-1))
base[1] = pop_trie.N.size - 1;
new_ans[i] = cur_port;
vector |= (1ull << i);
}
}
pop_trie.N.data[index].base[0] = base[0];
pop_trie.N.data[index].base[1] = base[1];
pop_trie.N.data[index].vector = vector;
pop_trie.N.data[index].leafvec = leafvec;
for (int i = 0; i < 32; i++)
if (vector & (1 << i)) {
poptrie_flatten(raw->ch[i],
base[1] + popcount(vector & ((2ull << i) - 1)) - 1, new_ans[i]);
}
free(raw);
raw = NULL;
}

在压缩的时候,我们就需要重点找出这些区间,并作出划分,将其在leafvec上进行标记。

最后的查询基本上就是和论文保持一致了,由于我把查不到的情况记为-1也当成一种“端口”,所以查询的时候也不怎么需要特殊处理。

此外我还有一个小小的处理,对于直接映射的节点,如果是能直接查到答案的,我会把最高位标记为1。如此一来,就可以区分直接映射是直接映射到了答案还是映射到了节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static uint32_t poptrie_lookup(uint32_t ip) {
uint32_t index = extract(ip, 0, DIRECT_INDEX_BITS);
uint32_t dindex = pop_trie.D[index];
if ((dindex & (1ul << 31))) {
if (dindex == -1)
return -1;
return dindex & ((1ul << 31) - 1);
}
index = dindex;
uint32_t offset = DIRECT_INDEX_BITS;
uint32_t value = extract(ip, offset, 5);
uint32_t vector = pop_trie.N.data[index].vector;
uint32_t base;
uint32_t bc;
while (vector & (1ull << value)) {
base = pop_trie.N.data[index].base[1];
bc = popcount(vector & ((2ull << value) - 1));
index = base + bc - 1;
offset += 5;
value = extract(ip, offset, 5);
vector = pop_trie.N.data[index].vector;
}
base = pop_trie.N.data[index].base[0];
if (base == -1)
return -1;
bc = popcount(pop_trie.N.data[index].leafvec & ((2ull << value) - 1));
return pop_trie.L.data[base + bc - 1];
}
1
2
3
4
5
6
7
8
9
10
11
Constructing the basic tree......
Reading data from basic lookup table......
Looking up the basic port......
Constructing the advanced tree......
Reading data from advanced lookup table......
Looking up the advanced port......
Dumping result......
basic_pass-1
basic_lookup_time-18750us
advance_pass-1
advance_lookup_time-3195us

虽然树的构建花了很多很多时间,但是考虑查询操作,我们还是能够看到PopTrie的效率是非常高的,耗时是暴力trie的六分之一。

4. 总结\text{4. 总结}

这次实验的目的就是优化IP查找的过程,使得查找的效率更高。我们通过实现了一个朴素的trie树,然后又实现了一个PopTrie,其中对着论文复现PopTrie的过程虽然很艰难,但是我也通过自己的思考解决了一些没有在论文中找到答案的问题,最后实现了一个高效的IP查找算法,还是相当有成就感。

另外,这次实验也让我想起来了之前和一个学长谈论性能优化的经验:很多时候压缩空间就是提高缓存命中率,也就是提高性能的一种很重要的方式。这次实验也让我对这个观点有了更深的理解。