重头戏
这已经是第三阶段了,然而我连网络层实验都还没怎么做完,好在做了最基本的路由器实验,已经足够写这个部分了。
这三次实验的目的是,实现一个功能相对完整的 TCP/IP 协议栈。
回忆我们在网络层进行的实验,使得两个处于不同网络中的主机可以互相传送数据包了。
但是这并不够,在IP协议的基础上,TCP协议提供了可靠的全双工的传输服务,从而为更上层的应用层打下了基础。
个人认为,这三次实验就是整个实验课的最高潮了,前面所有的积累都是为了这三个实验上一气呵成打造相对完整的TCP
协议栈,打下网络协议栈的最后一块砖。
Lab-1. 基础协议栈 \text{Lab-1. 基础协议栈} Lab-1. 基础协议栈
本次实验不考虑丢包和拥塞控制,只要求实现最基本的传输。
1. Connection \text{1. Connection} 1. Connection
TCP的连接建立部分,服务器listen
和accept
,监听并且接受客户端通过connect
发起的连接建立请求,从而建立起一个TCP
套接字,维护连接的状态。
这里有如下几步:
服务器需要一个父套接字持续监听请求
客户端发送SYN
,服务器接收到之后,创建一个子套接字用于与该客户端进行持续连接,发送SYN-ACK
,同时很重要的一步是将该套接字的信息进行哈希,以便后续能够查找到。
客户端接收到SYN-ACK
之后,发送ACK
,连接建立完成
这个过程中,两方都会随机一个数作为序列号的“起点”,并且在SYN
和ACK
中发送给对方,用于后续的数据包的序列号。
2. Transmission \text{2. Transmission} 2. Transmission
这里涉及到各种seq
,ack
等字段的更新,如果完全分情况讨论非常复杂,仔细理解可以总结出一定的规律如下:
在发送带SYN
,FIN
或者数据的包时,都需要更新seq
字段,可以理解为“主动发送数据/控制信息的序列号”
在接收到数据包时,如果是新数据则需要更新ack
字段并回复,这也是ack
的字面意义,“告知收到”
然而最麻烦的还是滑动窗口的更新,在本次实验中,由于我们暂不考虑丢包等等,这个问题还能得到些简化,基本上就是发过来的包都是新的,把滑动窗口的左边界直接更新到ack
的值即可。
由于这实验中,都是很小很小的数据包,所以完全不用考虑缓冲区的问题,直接发送就行。
3. File Transmission \text{3. File Transmission} 3. File Transmission
大文件传输和小消息的重大区别就是缓冲区不够用了,如果一个一个包发送会极其慢,无法满足时限要求,因此我们必须一次性大批量地连续发送数据。
这就会导致一个情况,也就是传输数据的速度很有可能快于对方接受数据的速度。滑动窗口最大的大小和留给上层应用的缓冲区容量是一致的。我们发送的数据首先会存在对方的缓冲区中,并不一定立即得到处理,也不一定立即能回复ack
。
因此为了防止发送的数据溢出对方缓冲区,发送方就需要通过滑动窗口来控制发送的数据量。
具体而言, 在接收方从缓冲区中取出数据并且处理之后,缓冲区空间增加,相应的能处理的数据量也增加了,此时向发送方回复的包中,不止含有新的ack
,同时也会告知发送方更新后的接收窗口大小,发送方在接收到之后,便可以根据该字段来调整发送窗口的大小。发送方的滑动窗口只会在主动发送数据包和收到ack
时更新。
每发送一个包,发送方的snd_nxt
都会增加,同时滑动窗口会减少相应的长度,因为没有收到对方的ack
,就应该认为数据还在缓冲区中,能够发送的数据量就会减少。
4. Disconnection \text{4. Disconnection} 4. Disconnection
TCP的连接断开是一个比较复杂的过程,因为需要保证数据的完整性,所以需要经过四次握手。
主动关闭方发送FIN
,并且进入FIN_WAIT_1
状态
被动关闭方接收到FIN
,发送ACK
,并且进入CLOSE_WAIT
状态
被动关闭方发送FIN
,并且进入LAST_ACK
状态
主动关闭方接收到FIN
,发送ACK
,并且进入TIME_WAIT
状态
在TIME_WAIT
状态持续一段时间之后,连接才会真正断开,这需要通过一个定时器实现。实际上的定时器会有两种,这里使用的是其中的一种,另一种用于超时重传,在下面的可靠传输部分会详细介绍。
5. Implementation \text{5. Implementation} 5. Implementation
5.1. Socket \text{5.1. Socket} 5.1. Socket
首先看顶层的socket实现。
1 2 3 4 5 6 7 8 9 10 11 int tcp_sock_bind (struct tcp_sock* tsk, struct sock_addr* skaddr) { int err = 0 ; tcp_set_state(tsk, TCP_CLOSED); err = tcp_sock_set_sport(tsk, ntohs(skaddr->port)); return err; }
bind的实现还是非常简单的,只需要设置端口号即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int tcp_sock_connect (struct tcp_sock* tsk, struct sock_addr* skaddr) { tsk->sk_sip = ((iface_info_t *)(instance->iface_list.next))->ip; tsk->sk_sport = tcp_get_port(); tsk->sk_dip = ntohl(skaddr->ip); tsk->sk_dport = ntohs(skaddr->port); tcp_bind_hash(tsk); tcp_set_state(tsk, TCP_SYN_SENT); tcp_hash(tsk); tcp_send_control_packet(tsk, TCP_SYN); sleep_on(tsk->wait_connect); if (tsk->state == TCP_ESTABLISHED) return 0 ; else return -1 ; }
connect的实现如前所述,客户端方需要主动发送SYN
,切换自身状态,并且等待对方的SYN-ACK
,这里通过sleep_on
来实现等待。
1 2 3 4 5 6 7 8 9 10 int tcp_sock_listen (struct tcp_sock* tsk, int backlog) { tsk->backlog = backlog; tcp_set_state(tsk, TCP_LISTEN); tcp_hash(tsk); return 0 ; }
listen初始化了服务器的状态,即创建一个父套接字,用于持续监听请求,同时设置了最大的连接请求队列长度。
1 2 3 4 5 6 7 8 9 10 11 12 struct tcp_sock* tcp_sock_accept (struct tcp_sock* tsk) { if (list_empty(&tsk->accept_queue)) { log (DEBUG, "tcp sock accept queue is empty, sleep on wait_accept." ); sleep_on(tsk->wait_accept); } struct tcp_sock * socket = tcp_sock_accept_dequeue(tsk); return socket; }
对于accept
来说,我们认为协议栈会把发送过来的连接请求放到一个队列中,如果队列为空则需要等待,否则直接返回队列中的第一个套接字,这就是连接到服务器的客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void tcp_sock_close (struct tcp_sock* tsk) { log (DEBUG, "closing tcp sock." ); switch (tsk->state) { case TCP_ESTABLISHED: tcp_send_control_packet(tsk, TCP_FIN | TCP_ACK); tcp_set_state(tsk, TCP_FIN_WAIT_1); break ; case TCP_CLOSE_WAIT: tcp_send_control_packet(tsk, TCP_FIN | TCP_ACK); tcp_set_state(tsk, TCP_LAST_ACK); break ; case TCP_SYN_RECV: tcp_send_control_packet(tsk, TCP_RST); tcp_set_state(tsk, TCP_CLOSED); wake_up(tsk->wait_connect); wake_up(tsk->wait_recv); wake_up(tsk->wait_send); wake_up(tsk->wait_accept); break ; default : log (ERROR, "tcp sock state error." ); exit (1 ); } }
这个关闭基本上是按照四次挥手的过程来的,根据不同的状态发送不同的信息并设置新的状态,需要注意的是关闭的时候需要唤醒所有等待的条件来彻底释放资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 int tcp_sock_read (struct tcp_sock* tsk, char * buf, int len) { int read_len = 0 ; while (is_buffer_empty(tsk->rcv_buf) && tsk->state == TCP_ESTABLISHED) { log (DEBUG, "sleep on recv" ); sleep_on(tsk->wait_recv); } pthread_mutex_lock(&tsk->rcv_buf->lock); log (DEBUG, "wake up from recv" ); if (tsk->state == TCP_CLOSE_WAIT && ring_buffer_empty(tsk->rcv_buf)) { pthread_mutex_unlock(&tsk->rcv_buf->lock); return 0 ; } int newly_read_len = read_ring_buffer(tsk->rcv_buf, buf, len); log (DEBUG, "read %d bytes from ring buffer" , newly_read_len); read_len += newly_read_len; tsk->rcv_wnd = ring_buffer_free(tsk->rcv_buf); tcp_send_control_packet(tsk, TCP_ACK); pthread_mutex_unlock(&tsk->rcv_buf->lock); return read_len; }
读取数据的时候,需要注意的是,如果缓冲区为空,需要等待,这里通过sleep_on
实现。读取完数据之后,需要更新接收窗口的大小,并且发送ack
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 int tcp_sock_write (struct tcp_sock* tsk, char * buf, int len) { int sent_len = 0 ; while (sent_len < len) { while (tsk->snd_wnd == 0 && tsk->state == TCP_ESTABLISHED) sleep_on(tsk->wait_send); if (tsk->state == TCP_CLOSED) return -1 ; int send_len = min(tsk->snd_una + tsk->snd_wnd - tsk->snd_nxt, len - sent_len); send_len = min( send_len, 1514 - ETHER_HDR_SIZE - IP_BASE_HDR_SIZE - TCP_BASE_HDR_SIZE); if (!send_len) continue ; char * packet_buf = malloc (send_len + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE); char * data = packet_buf + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; memcpy (data, buf + sent_len, send_len); tcp_send_packet( tsk, packet_buf, send_len + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE); sent_len += send_len; } return len; }
写入数据的时候,需要注意的是,如果发送窗口为0,需要等待,这里通过sleep_on
实现。然后需要非常仔细地计算发送数据的长度,确保我们发送的数据包不会超过一个包的最大长度。如果一次发送不完,需要循环发送。
和以前一样,发送一个包会释放其内存,所以需要拷贝一份发送。
接下来是协议栈本身的实现。首先是实现一个函数专门用于确认收到的包。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static void ack_data_packet (struct tcp_sock* tsk, struct tcp_cb* cb, char * packet) { char * data = packet + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; log (DEBUG, "ack data packet, seq: %d, ack: %d" , cb->seq, cb->ack); tcp_update_window_safe(tsk, cb); int data_len = cb->pl_len; int offset = tsk->rcv_nxt - cb->seq; data_len -= offset; if (data_len > 0 ) { pthread_mutex_lock(&tsk->rcv_buf->lock); log (DEBUG, "write data to rcv_buf, offset: %d, data_len: %d" , offset, data_len); bool old_empty = ring_buffer_empty(tsk->rcv_buf); write_ring_buffer(tsk->rcv_buf, data + offset, data_len); tsk->rcv_wnd = ring_buffer_free(tsk->rcv_buf); if (old_empty && !ring_buffer_empty(tsk->rcv_buf)) { log (DEBUG, "buffer is not empty, wake up receiving" ); wake_up(tsk->wait_recv); } log (DEBUG, "write data to rcv_buf succeeded, rcv_wnd: %d" , tsk->rcv_wnd); tsk->rcv_nxt = cb->seq_end; pthread_mutex_unlock(&tsk->rcv_buf->lock); } }
这里需要记得一点,缓存为空是应用线程睡眠的条件,因此破坏该条件时也要唤醒应用线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void tcp_process (struct tcp_sock* tsk, struct tcp_cb* cb, char * packet) { assert(tsk); log (DEBUG, "handle tcp packet: flags = %s, socket state = %s, ack = %d, seq = %d, rwnd = %d" , tcp_flags_str(cb->flags), tcp_state_str[tsk->state], cb->ack, cb->seq, cb->rwnd); if (cb->flags & TCP_RST) { tcp_sock_close(tsk); return ; } if (cb->flags & TCP_ACK) tcp_handle_ack(tsk, cb, packet); if (cb->flags & TCP_SYN) tcp_handle_syn(tsk, cb, packet); if (cb->flags & TCP_FIN) tcp_handle_fin(tsk, cb, packet); }
根据收到的包的类型,调用不同的处理函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 static void tcp_handle_ack (struct tcp_sock* tsk, struct tcp_cb* cb, char * packet) { assert(cb->flags & TCP_ACK); if (tsk->state == TCP_SYN_RECV) { if (cb->ack == tsk->snd_nxt) { tsk->snd_una = cb->ack; tcp_set_state(tsk, TCP_ESTABLISHED); tcp_hash(tsk); tcp_sock_accept_enqueue(tsk); wake_up(tsk->parent->wait_accept); } else log (ERROR, "received unexpected packet, drop it." ); } else if (tsk->state == TCP_SYN_SENT) { if (cb->ack == tsk->snd_nxt) { tsk->snd_una = cb->ack; tsk->rcv_nxt = cb->seq_end; tcp_update_window(tsk, cb); } else log (ERROR, "received unexpected packet, drop it." ); } else if (tsk->state == TCP_ESTABLISHED) { if (is_tcp_seq_valid(tsk, cb)) { log (DEBUG, "Received valid packet, seq: %d, ack: %d" , cb->seq, cb->ack); log (DEBUG, "Current receiving window from %d to %d" , tsk->rcv_nxt, tsk->rcv_nxt + tsk->rcv_wnd); tsk->snd_una = cb->ack; ack_data_packet(tsk, cb, packet); log (DEBUG, "Current receiving window from %d to %d" , tsk->rcv_nxt, tsk->rcv_nxt + tsk->rcv_wnd); log (DEBUG, "Current sending window from %d to %d" , tsk->snd_una, tsk->snd_una + tsk->snd_wnd); } else log (ERROR, "received packet with invalid seq, drop it." ); } else if (tsk->state == TCP_FIN_WAIT_1) { if (cb->ack == tsk->snd_nxt) { tsk->snd_una = cb->ack; tcp_set_state(tsk, TCP_FIN_WAIT_2); } else log (ERROR, "received unexpected packet, drop it." ); } else if (tsk->state == TCP_FIN_WAIT_2) { if (is_tcp_seq_valid(tsk, cb)) { tcp_update_window_safe(tsk, cb); tsk->rcv_nxt = cb->seq_end; wake_up(tsk->wait_send); } else log (ERROR, "received packet with invalid seq, drop it." ); } else if (tsk->state == TCP_LAST_ACK) { if (cb->ack == tsk->snd_nxt) { tsk->snd_una = cb->ack; tcp_set_state(tsk, TCP_CLOSED); } else log (ERROR, "received unexpected packet, drop it." ); } else if (tsk->state == TCP_TIME_WAIT) { if (cb->ack == tsk->snd_nxt) { tsk->snd_una = cb->ack; tcp_set_state(tsk, TCP_CLOSED); } else log (ERROR, "received unexpected packet, drop it." ); } else if (tsk->state == TCP_CLOSING) { if (cb->ack == tsk->snd_nxt) { tsk->snd_una = cb->ack; tcp_set_state(tsk, TCP_TIME_WAIT); tcp_set_timewait_timer(tsk); } else log (ERROR, "received unexpected packet, drop it." ); } else log (ERROR, "received unexpected packet, drop it." ); }
在不同的状态下,收到ack
的处理是不同的。除了几乎每个状态都需要的切换状态以外,在RECV_SYNC
状态下收到ack
我们需要初始化一个新的套接字,而在ESTABLISHED
状态下收到ack
我们需要确认收到的包。TIME_WAIT
状态下收到ack
则需要开启定时器准备关闭连接。由于ack
包是可能承载数据的,我们计算一个ack
包中实际数据的长度,根据其是否为0来判断是否为数据包。
接下来是处理其他类型的包,比如SYN
,FIN
等等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static void tcp_handle_syn (struct tcp_sock* tsk, struct tcp_cb* cb, char * packet) { assert(cb->flags & TCP_SYN); if (tsk->state == TCP_LISTEN) { struct tcp_sock * csk = alloc_tcp_sock(); csk->sk_sip = cb->daddr; csk->sk_sport = cb->dport; csk->sk_dip = cb->saddr; csk->sk_dport = cb->sport; csk->parent = tsk; csk->rcv_nxt = cb->seq_end; csk->iss = tcp_new_iss(); csk->snd_nxt = csk->iss; csk->snd_una = csk->iss; csk->rcv_wnd = TCP_DEFAULT_WINDOW; csk->snd_wnd = cb->rwnd; tcp_set_state(csk, TCP_SYN_RECV); tcp_hash(csk); tcp_send_control_packet(csk, TCP_SYN | TCP_ACK); list_add_tail(&csk->list , &tsk->listen_queue); } else if (tsk->state == TCP_SYN_SENT) { tcp_set_state(tsk, TCP_ESTABLISHED); tcp_hash(tsk); tcp_send_control_packet(tsk, TCP_ACK); wake_up(tsk->wait_connect); } else log (ERROR, "received unexpected packet, drop it." ); }
面对对方发来的SYN
包,目前我们只需要考虑两种可能性, 一种是服务器收到SYN
包,这时候需要创建一个新的套接字,然后发送SYN-ACK
包,另一种是客户端收到SYN-ACK
包,这时候需要确认收到,并且唤醒等待的条件,并且回复ACK
包。
最后是考虑FIN
包,其实FIN
和SYN
处理都是比较简单的,因为这两种包主要出现于连接的建立和断开阶段,用处都比较特定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 static void tcp_handle_fin (struct tcp_sock* tsk, struct tcp_cb* cb, char * packet) { assert(cb->flags & TCP_FIN); if (tsk->state == TCP_ESTABLISHED) { tsk->rcv_nxt = cb->seq_end; tcp_send_control_packet(tsk, TCP_ACK); tcp_set_state(tsk, TCP_CLOSE_WAIT); wake_up(tsk->wait_connect); wake_up(tsk->wait_recv); wake_up(tsk->wait_send); wake_up(tsk->wait_accept); } else if (tsk->state == TCP_FIN_WAIT_1) { tsk->rcv_nxt = cb->seq_end; tcp_send_control_packet(tsk, TCP_ACK); tcp_set_state(tsk, TCP_CLOSING); } else if (tsk->state == TCP_FIN_WAIT_2) { tsk->rcv_nxt = cb->seq_end; tcp_send_control_packet(tsk, TCP_ACK); tcp_set_state(tsk, TCP_TIME_WAIT); tcp_set_timewait_timer(tsk); } else if (tsk->state == TCP_LAST_ACK) { tsk->rcv_nxt = cb->seq_end; tcp_send_control_packet(tsk, TCP_ACK); tcp_set_state(tsk, TCP_CLOSED); tcp_unhash(tsk); } else log (ERROR, "received unexpected packet, drop it." ); }
比较特殊的地方就是注意一个unhash
和一个设置定时器。
最后本次实验使用到的一个定时器,用于处理TIME_WAIT
状态下的连接关闭。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void tcp_scan_timer_list () { struct tcp_timer *pos , *q ; pthread_mutex_lock(&timer_lock); list_for_each_entry_safe(pos, q, &timer_list, list ) { pos->timeout -= TCP_TIMER_SCAN_INTERVAL; if (pos->timeout <= 0 ) { struct tcp_sock *tsk = timewait_to_tcp_sock(pos); if (tsk->state == TCP_TIME_WAIT) { tcp_set_state(tsk, TCP_CLOSED); tcp_unhash(tsk); } list_delete_entry(&pos->list ); } } pthread_mutex_unlock(&timer_lock); } void tcp_set_timewait_timer (struct tcp_sock *tsk) { pthread_mutex_lock(&timer_lock); struct tcp_timer *timewait_timer = &tsk->timewait; timewait_timer->type = 0 ; timewait_timer->timeout = TCP_TIMEWAIT_TIMEOUT; timewait_timer->enable = 1 ; list_add_tail(&timewait_timer->list , &timer_list); pthread_mutex_unlock(&timer_lock); }
我们之前没有提到计时器列表的老化操作,不过对于本次实验来说倒也没啥必要,因为就一个套接字,自然也就只有一个定时器,老不老化都比较无所谓的那种。
5. Pits \text{5. Pits} 5. Pits
一样的,作为TCP协议栈的开山之作,本次实验码量依然不小,而且还是有大量的坑。
不知道是我的实现上有问题还是别的问题,wake_up
有时可能会出现假唤醒,所以注意循环检查条件。
环形缓存一定要注意操作的原子性,为保证线程安全,需要加锁。但是锁一定要放在ring_buffer
结构体里数据区的前面…否则初始化ring_buffer的时候一个内存分配就给把锁给覆盖了。
窗口更新什么的细节就不说了,这个真的只能靠自己慢慢去仔细覆盖每种情况的所有操作。我能想到的最暴力的办法就是直接抓reference给他彻底看明白并且对上。
由于没有实现丢包机制,绝不可一次性大量发送数据包,这样会导致bufferfloat
进而导致丢包,导致两方传输出现异常。我的策略是发送一定长度就睡眠一会儿。
性能是个很关键的问题,本次实验在文件传输上有着10
秒的时间限制,因此不能睡得时间太长,也不能让每两次睡眠之间的发送长度太短。
6. Summary \text{6. Summary} 6. Summary
本次实验实现了一个基本的TCP协议栈,暂时不包括丢包和拥塞控制,但是已经能够实现基本的连接建立,数据传输和连接断开。后续的实验都将在这个基础上进行。
Lab-2. 可靠传输 \text{Lab-2. 可靠传输} Lab-2. 可靠传输
实际网络中,丢包是难以避免的,所以我们需要实现可靠传输,这就需要我们对于丢包进行检测,并且在适当的时间重新发送丢失的数据包。
这里最主要的模块有三个:重传定时器,发送包队列和乱序包缓冲。
1. Sender \text{1. Sender} 1. Sender
对于发送方而言,由于可能存在重新传输数据包的情况,所以传完一个包以后不能就把它丢了,还得先缓存起来,等到收到相应的ack
时再释放。
控制重传的大头就是重传定时器,当发送一个新的包时,如果定时器没有启动则需要启动定时器,在收到一个新的ack
时则需要重置定时器。如果定时器超时则需要从发送队列中取出队头的包重新发送。
如果重复性地丢包,对于一个包我们会重传至多三次,每次的超时时间都会加倍,如果超过三次超时那大概是连不上了,发送RST
包断开连接。
2. Receivers \text{2. Receivers} 2. Receivers
一旦发生丢包,接收方收到的包就会出现乱序的情况,也就不能按顺序直接填入缓冲区了,这就需要一个乱序包缓冲。用于将seq
超过当前rcv_nxt
的包全部存起来,等到缺失的包到来时再从该缓冲中按顺序依次ack
数据包并且填入缓冲区。当我们收到一个新的包并且更新ack
的时候,便可以把乱序缓冲中可以ack
的包给ack
掉。
在本次实验中,我采用了非常暴力的方式管理乱序缓冲,用一个链表存储并且插入时按顺序保持有序,虽然很慢,但是考虑到丢包的情形并不多,这不会严重损害性能。
3. Implementation \text{3. Implementation} 3. Implementation
有三个重要的新模块,重传定时器,发送包队列和乱序包缓冲。
首先看乱序包缓冲的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 static void ack_ofo_packets (struct tcp_sock *tsk) { while (!list_empty(&tsk->rcv_ofo_buf)) { struct tcp_ofo_packet *ofo_packet_iter = list_entry(tsk->rcv_ofo_buf.next, struct tcp_ofo_packet, list ); if (ofo_packet_iter->cb.seq > tsk->rcv_nxt) break ; if (ofo_packet_iter->cb.seq_end >= tsk->rcv_nxt) { log (DEBUG, "ack ofo packet, seq: %d, ack: %d" , ofo_packet_iter->cb.seq, ofo_packet_iter->cb.ack); ack_data_packet(tsk, &ofo_packet_iter->cb, ofo_packet_iter->packet); } struct tcp_ofo_packet *ofo_packet_iter_q = ofo_packet_iter; ofo_packet_iter = list_entry(ofo_packet_iter->list .next, struct tcp_ofo_packet, list ); list_delete_entry(&ofo_packet_iter_q->list ); free_ofo_packet(ofo_packet_iter_q); } } static void pend_ofo_packet (struct tcp_sock *tsk, struct tcp_cb *cb, char *packet) { assert(cb->seq > tsk->rcv_nxt); log (DEBUG, "pend packet, seq: %d, ack: %d" , cb->seq, cb->ack); struct tcp_ofo_packet *ofo_packet = malloc (sizeof (struct tcp_ofo_packet)); int packet_len = cb->pl_len + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; ofo_packet->packet = malloc (packet_len); memcpy (ofo_packet->packet, packet, packet_len); memcpy (&ofo_packet->cb, cb, sizeof (struct tcp_cb)); insert_ofo_packet(ofo_packet, &tsk->rcv_ofo_buf); }
没啥好说的,就一个有序的链表,插入时保持有序,一旦snd_una
发生更新,都需要从头开始顺序查一遍列表,把可以ack
的包ack
掉。
接下来是发送缓存的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static void update_send_buffer (struct tcp_sock *tsk, struct tcp_cb *cb) { pthread_mutex_lock(&tsk->send_lock); struct pending_packet *pos , *q ; list_for_each_entry_safe(pos, q, &tsk->send_buf, list ) { if (less_than_32b(cb->ack, pos->seq_end)) break ; list_delete_entry(&pos->list ); free (pos->packet); free (pos); } pthread_mutex_unlock(&tsk->send_lock); }
也是一个道理,收到ack
之后,把ack
掉的包从发送缓存中删除。
同时需要对几个发送的函数也稍作修改,以tcp_send_control_packet
为例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void tcp_send_control_packet (struct tcp_sock *tsk, u8 flags) { assert(tsk->retrans_timer.type == 1 ); int pkt_size = ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; char *packet = malloc (pkt_size); if (!packet) { log (ERROR, "malloc tcp control packet failed." ); return ; } struct iphdr *ip = packet_to_ip_hdr(packet); struct tcphdr *tcp = (struct tcphdr *)((char *)ip + IP_BASE_HDR_SIZE); u16 tot_len = IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; ip_init_hdr(ip, tsk->sk_sip, tsk->sk_dip, tot_len, IPPROTO_TCP); tcp_init_hdr(tcp, tsk->sk_sport, tsk->sk_dport, tsk->snd_nxt, tsk->rcv_nxt, flags, tsk->rcv_wnd); tcp->checksum = tcp_checksum(ip, tcp); log (DEBUG, "send control packet, flags: %s, seq: %d, ack: %d, rwnd: %d" , tcp_flags_str(flags), tsk->snd_nxt, tsk->rcv_nxt, tsk->rcv_wnd); if (flags & (TCP_SYN | TCP_FIN)) { insert_control_send_buffer(tsk, packet, pkt_size); tsk->snd_nxt += 1 ; tcp_set_retrans_timer(tsk); } ip_send_packet(packet, pkt_size); }
如果是“会导致seq
增大的包”,或者说是我们“主动发送”的包,需要更新seq
,将其插入发送缓存,并且设置重传定时器。
重传定时器的实现如下,一定要注意的是,重传定时器维护的是一段数据包的超时情况,这个维护直到发送的包全部被ack
或者因为丢包触发超时而终止,而不是说每发一个包都要维护一个重传定时器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 void tcp_set_retrans_timer (struct tcp_sock *tsk) { pthread_mutex_lock(&timer_lock); struct tcp_timer *retrans_timer = &tsk->retrans_timer; assert(retrans_timer->type == 1 ); if (retrans_timer->enable) { pthread_mutex_unlock(&timer_lock); return ; } log (DEBUG, "set retrans timer" ); retrans_timer->enable = 1 ; retrans_timer->timeout = TCP_RETRANS_INTERVAL_INITIAL; list_add_tail(&retrans_timer->list , &timer_list); pthread_mutex_unlock(&timer_lock); } void tcp_try_update_retrans_timer (struct tcp_sock *tsk) { pthread_mutex_lock(&timer_lock); struct tcp_timer *retrans_timer = &tsk->retrans_timer; assert(retrans_timer->type == 1 ); if (retrans_timer->enable == 0 ) { pthread_mutex_unlock(&timer_lock); return ; } log (DEBUG, "try update retrans timer" ); retrans_timer->timeout = TCP_RETRANS_INTERVAL_INITIAL; pthread_mutex_unlock(&timer_lock); } void tcp_unset_retrans_timer (struct tcp_sock *tsk) { pthread_mutex_lock(&timer_lock); struct tcp_timer *retrans_timer = &tsk->retrans_timer; assert(retrans_timer->type == 1 ); if (retrans_timer->enable == 0 ) { pthread_mutex_unlock(&timer_lock); return ; } log (DEBUG, "unset retrans timer" ); retrans_timer->enable = 0 ; list_delete_entry(&retrans_timer->list ); pthread_mutex_unlock(&timer_lock); }
这几个函数还是比较好理解的。这里我做了一个处理是,如果一个重传定时器处于启用状态,那么就将其放在计时器列表里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void tcp_scan_timer_list () { struct tcp_timer *pos , *q ; pthread_mutex_lock(&timer_lock); list_for_each_entry_safe(pos, q, &timer_list, list ) { if (pos->type == 0 ) handle_timewait_timer(pos); else if (pos->type == 1 ) { assert(pos->enable == 1 ); handle_retrans_timer(pos); } else log (ERROR, "Unknown timer type %d" , pos->type); } pthread_mutex_unlock(&timer_lock); }
接下来是处理重传定时器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 static bool handle_retrans_timeout (struct tcp_sock *tsk) { log (DEBUG, "retrans timeout" ); pthread_mutex_lock(&tsk->send_lock); log (DEBUG, "current snd_una: %d" , tsk->snd_una); while (!list_empty(&tsk->send_buf)) { struct pending_packet *pos = list_entry(tsk->send_buf.next, struct pending_packet, list ); if (less_or_equal_32b(pos->seq_end, tsk->snd_una)) { list_delete_entry(&pos->list ); free (pos->packet); free (pos); } else break ; } if (list_empty(&tsk->send_buf)) { pthread_mutex_unlock(&tsk->send_lock); return true ; } struct pending_packet *pos = list_entry(tsk->send_buf.next, struct pending_packet, list ); char *packet = pos->packet; if (pos->retrans_times > TCP_MAX_RETRANS) { log (DEBUG, "max retrans times reached, terminate connection" ); tcp_terminate(tsk); pthread_mutex_unlock(&tsk->send_lock); return true ; } pos->retrans_times++; tsk->retrans_timer.timeout = TCP_RETRANS_INTERVAL_INITIAL << pos->retrans_times; int packet_len = pos->len + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; assert(less_than_32b(tsk->snd_una, pos->seq_end)); log (DEBUG, "retrans packet, seq: %d, seq_end: %d" , pos->seq, pos->seq_end); log (DEBUG, "retrans timeout is now set to %d" , tsk->retrans_timer.timeout); pos->packet = move_and_ip_send_packet(packet, packet_len); pthread_mutex_unlock(&tsk->send_lock); return false ; }
首先为了保险起见,依然是把ack
掉的包从发送缓存中删除。然后是重传的逻辑,如果重传次数太多就直接断开连接,否则就重传。
1 2 3 4 5 6 7 static char * move_and_ip_send_packet (char * packet, int packet_len) { char * moved_packet = (char *)malloc (packet_len); assert(moved_packet != NULL ); memcpy (moved_packet, packet, packet_len); ip_send_packet(packet, packet_len); return moved_packet; }
因为ip_send_packet会释放掉packet,所以我们还是需要先拷贝一份,在此之后原来的packet就不可再使用,这很像std::move
,即所谓的移动语义,因此函数名是move_and_ip_send_packet
。
4. Pits and Falls \text{4. Pits and Falls} 4. Pits and Falls
一如既往的,这次实验踩的坑也不少。
同样的,还是要比较次数时的比较带不带等号,我因为多了一次比较导致一遇到丢包就会多等一次重传,严重损害了性能。
删除定时器的时候一定要明确自己删除的目标,不要误删。
本次实验涉及到的锁很多,注意锁的顺序,不要死锁。
然而,即便如此,本次实验我的实现依然还是有些问题,首先是对于连接建立和断开时的丢包,我没有处理好,由于自己之前建立起来的框架并未考虑丢包的问题,要新加入这方面的逻辑变得不容易。其次是如果输出调试信息到控制台,则会导致整个连接运行一段时间后彻底停下。这里真的非常感谢助教,愿意不辞辛苦也不计安全性地让我直接上服务器调试(当然没调出问题)最后还亲自帮我看脚本,帮助我定位了问题所在。
后来老师点出来,这还是由于整个程序没有做到并发安全导致的,因为输出到控制台是一个非常耗时的操作,有可能导致线程之间的冲突,相比之下,无论是不输出调试信息还是输出到文件都不会这么耗时,所以这个问题也就不会出现。
不过,我最后还是没有发现问题在哪儿。。。
5. Summary \text{5. Summary} 5. Summary
本次实验给之前的基础上加上了丢包处理,使得整个协议栈更加完善,通过三个部件,即重传定时器,发送包队列和乱序包缓冲的配合,实现了发送方对于丢包重传的处理,以及接收方对于到达数据包乱序的处理,从而实现了可靠的传输。
但是现在的实现只能说在大部分情况下不会出现问题,在一些特殊情况下还是会出现问题,比如连接建立和断开时的丢包,比如输出到控制台的问题。只能说,希望后续还会有时间重新审视这份代码,找出问题所在。
Lab-3. 拥塞控制 \text{Lab-3. 拥塞控制} Lab-3. 拥塞控制
在实际网络中,我们并不知道整个网络的情况,如果一味地发送数据包,很可能会加剧网络拥塞,所以我们需要实现拥塞控制机制,评估网络状态,并且控制发送窗口,使得发送的数据包数量不会超过网络的承载能力。
1. Reno & NewReno \text{1. Reno \& NewReno} 1. Reno & NewReno
本次实验实现的是NewReno
拥塞控制算法,这是一种比较经典的拥塞控制算法,要理解NewReno
中的New
,我首先查找资料,了解了其前身Reno
算法~~(其实一部分原因是PPT实在看不懂)~~。
Reno
算法主要分为三个状态:Slow Start
,Congestion Avoidance
和Fast Recovery
。其状态转换关系如下:
NewReno
算法对Reno
算法进行了改进,在所谓的快速重传阶段,Reno
算法只会重传一个丢失的包,而NewReno
算法会重传所有连续的丢失的包。
2. Implementation \text{2. Implementation} 2. Implementation
接下来是实现。这个实验是我在TCP三个实验中耗时最长的一个,因为自己怎么也没读懂PPT,所以又去搜集了其他资料对照,但是即使是其他资料,说法也不统一。。。而且为了加入拥塞控制,对原来的代码进行了很多的改动。中间有一次以为自己做对了,写报告的时候把结果图拿来对比PPT发现还有问题,又改了好几天。
首先考虑到拥塞控制整体是一个较为独立的模块,我将其全部封装进了一个类当中,有自己的状态转移和处理。
1 2 3 4 5 6 7 8 9 10 struct tcp_congestion_controller { pthread_mutex_t lock; enum tcp_cc_state state ; u32 ssthresh; u32 cwnd; u32 rp; u8 dup_cnt; };
每个字段的含义都还算比较直观吧,接下来我对于原有的基础进行了改造。
首先是改造后的处理ack
包的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 static inline int inflight (struct tcp_sock *tsk) { return (tsk->snd_nxt - tsk->snd_una) / TCP_MSS - tsk->cc.dup_cnt; } static void ack_packet (struct tcp_sock *tsk, struct tcp_cb *cb, char *packet) { char *data = packet + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; log (DEBUG, "ack data packet, seq %d, ack: %d" , cb->seq, cb->ack); int data_len = cb->pl_len; switch (tsk->state) { case TCP_SYN_RECV: tsk->rcv_nxt = cb->seq_end; tcp_set_state(tsk, TCP_ESTABLISHED); tcp_hash(tsk); tcp_sock_accept_enqueue(tsk); wake_up(tsk->parent->wait_accept); break ; case TCP_FIN_WAIT_1: tsk->rcv_nxt = cb->seq_end; tcp_set_state(tsk, TCP_FIN_WAIT_2); break ; case TCP_SYN_SENT: tsk->rcv_nxt = cb->seq_end; break ; case TCP_CLOSING: tcp_set_state(tsk, TCP_TIME_WAIT); tcp_set_timewait_timer(tsk); break ; case TCP_LAST_ACK: tcp_set_state(tsk, TCP_CLOSED); break ; default : break ; } bool old_no_allowed = tsk->no_allowed_to_send; log (DEBUG, "ack: %d, snd_una: %d" , cb->ack, tsk->snd_una); if (cb->ack <= tsk->snd_una) { log (DEBUG, "received duplicate ack, handle it" ); tcp_cc_handle_dup_ack(tsk); } else { log (DEBUG, "received new ack, handle it" ); tcp_cc_handle_new_ack(tsk, cb); } tcp_update_window_safe(tsk, cb); int send_able = tsk->snd_wnd / TCP_MSS - inflight(tsk); int packet_allowed_to_send = max(send_able, 0 ); if (packet_allowed_to_send > 0 && old_no_allowed) { log (DEBUG, "allowed to send, wake up sending" ); tsk->no_allowed_to_send = false ; wake_up(tsk->wait_send); } tcp_try_update_retrans_timer(tsk); if (tsk->snd_una == tsk->snd_nxt) { log (DEBUG, "all acked, unset retrans timer" ); tcp_unset_retrans_timer(tsk); } int offset = tsk->rcv_nxt - cb->seq; data_len -= offset; if (data_len > 0 ) { pthread_mutex_lock(&tsk->rcv_buf->lock); log (DEBUG, "write data to rcv_buf, offset: %d, data_len: %d" , offset, data_len); bool old_empty = ring_buffer_empty(tsk->rcv_buf); write_ring_buffer(tsk->rcv_buf, data + offset, data_len); tsk->rcv_wnd = ring_buffer_free(tsk->rcv_buf); tsk->rcv_nxt = cb->seq_end; if (old_empty && !ring_buffer_empty(tsk->rcv_buf)) { log (DEBUG, "buffer is not empty, wake up receiving" ); wake_up(tsk->wait_recv); } log (DEBUG, "write data to rcv_buf succeeded, rcv_wnd: %d" , tsk->rcv_wnd); pthread_mutex_unlock(&tsk->rcv_buf->lock); } } static void tcp_handle_ack (struct tcp_sock *tsk, struct tcp_cb *cb, char *packet) { assert(cb->flags & TCP_ACK); update_send_buffer(tsk, cb); if (!is_tcp_seq_valid(tsk, cb)) { log (DEBUG, "received packet with invalid seq, drop it." ); return ; } log (DEBUG, "receive seq: %d, current expected rcv_nxt: %d" , cb->seq, tsk->rcv_nxt); if (less_than_32b(cb->seq, tsk->rcv_nxt) && less_or_equal_32b(cb->seq_end, tsk->rcv_nxt)) { log (DEBUG, "receive outdated packet, send ack." ); tcp_cc_handle_dup_ack(tsk); tcp_send_control_packet(tsk, TCP_ACK); return ; } if (less_or_equal_32b(cb->seq, tsk->rcv_nxt)) { ack_packet(tsk, cb, packet); ack_ofo_packets(tsk); } else { pend_ofo_packet(tsk, cb, packet); log (DEBUG, "receive out-of-order packet, send ack." ); tcp_send_control_packet(tsk, TCP_ACK); } }
基本上就是把状态转移的逻辑给抽出来了。同时,相比于最开始的基础协议栈,不能发送包的条件变多了,因此我专门加了一个no_allowed_to_send
的标志位,用于标记当前是否可以发送包,当任意一个导致不能发送包的条件被破坏时,都尝试唤醒发送方。
除此以外,我们也需要对到来的ack
包进行区分了,如果当前收到的包seq_end
不超过snd_una
,表明发送方收到了一个重复的ack
,此时就需要进入拥塞控制针对重复ack
的处理,反之则是新ack
的处理。
在确认新的包的时候,我们需要计算当前是否还能够发送包,这需要以窗口大小减去在途的包的数量,如果可以发送,但是原来不能,那么就需要唤醒发送方。
其他的基本上比较大同小异。接下来就是拥塞控制中针对ack
包的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 void tcp_cc_handle_new_ack (struct tcp_sock *tsk, struct tcp_cb *cb) { pthread_mutex_lock(&tsk->cc.lock); tsk->cc.dup_cnt = 0 ; switch (tsk->cc.state) { case TCP_CC_SLOW_START: if (tsk->cc.cwnd <= tsk->cc.ssthresh) { tsk->cc.cwnd <<= 1 ; report(tsk->cc.cwnd); log (DEBUG, "cwnd: %d" , tsk->cc.cwnd); } else { tsk->cc.state = TCP_CC_CONGESTION_AVOIDANCE; } break ; case TCP_CC_CONGESTION_AVOIDANCE: tsk->cc.cwnd += TCP_MSS * TCP_MSS / tsk->cc.cwnd; report(tsk->cc.cwnd); log (DEBUG, "cwnd: %d" , tsk->cc.cwnd); break ; case TCP_CC_FAST_RECOVERY: tsk->cc.cwnd = tsk->cc.ssthresh; tsk->cc.state = TCP_CC_CONGESTION_AVOIDANCE; report(tsk->cc.cwnd); log (DEBUG, "cwnd: %d" , tsk->cc.cwnd); break ; default : log (ERROR, "unknown cc state" ); } pthread_mutex_unlock(&tsk->cc.lock); } void tcp_cc_handle_dup_ack (struct tcp_sock *tsk) { pthread_mutex_lock(&tsk->cc.lock); tsk->cc.dup_cnt++; log (DEBUG, "handle dup ack, current dup_cnt: %d" , tsk->cc.dup_cnt); switch (tsk->cc.state) { case TCP_CC_SLOW_START: if (tsk->cc.dup_cnt == 3 ) { log (DEBUG, "dup ack == 3, fast retransmit" ); tsk->cc.rp = tsk->snd_nxt; tsk->cc.state = TCP_CC_FAST_RECOVERY; tcp_fast_retransmit(tsk); tsk->cc.cwnd = tsk->cc.ssthresh + 3 * TCP_MSS; } report(tsk->cc.cwnd); log (DEBUG, "cwnd: %d" , tsk->cc.cwnd); break ; case TCP_CC_CONGESTION_AVOIDANCE: if (tsk->cc.dup_cnt == 3 ) { log (DEBUG, "dup ack == 3, fast retransmit" ); tsk->cc.rp = tsk->snd_nxt; tcp_fast_retransmit(tsk); tsk->cc.state = TCP_CC_FAST_RECOVERY; tsk->cc.cwnd = tsk->cc.ssthresh + 3 * TCP_MSS; } report(tsk->cc.cwnd); log (DEBUG, "cwnd: %d" , tsk->cc.cwnd); break ; case TCP_CC_FAST_RECOVERY: tsk->cc.cwnd += TCP_MSS; report(tsk->cc.cwnd); log (DEBUG, "cwnd: %d" , tsk->cc.cwnd); break ; default : log (ERROR, "unknown cc state" ); } pthread_mutex_unlock(&tsk->cc.lock); }
这里我并不是完全对着那张图写的,在TCP_CC_SLOW_START
阶段我是直接翻倍的。
关于收到new ack
,我们可以简单地理解为,最终一定会停留在拥塞控制阶段,其中设计的思路也很简单,new ack
是网络环境正常的一个标识,new ack
连续到达,就表明网络环境较为通畅,可以适当增大拥塞窗口,然而这个过程一定是有节制的,不然随着窗口越来越大,发送的包也会越来越多,影响网络的稳定性。因此无论从哪个阶段开始,收new ack
最终都会进入CONGESTION_AVOIDANCE
阶段。
对于dup ack
的处理略微有些复杂,这里也涉及到了快速重传。快速重传是NewReno
和Reno
的一个重大区别,Reno
在重传时只会重新传一个包,而NewReno
则不然,它会一直重传至所谓的“恢复点”,这是在进入FAST RECOVERY
阶段之前的snd_nxt
值。我们可以简单理解为:进入FAST RECOVERY
阶段时我们已经收到了一定数量的重复包,怀疑发送窗口中的包已经丢掉了,快速重传也就是重传这些包。这么做的理由在于,如果我们重复性地收到对方发来的已经过时的ack
,那么我们就可以推测对方能收到我们的包,但是很可能由于发生了丢包,导致无法确认我们的包。这时,
为了提高性能,我们不必等到超时再重传,而是可以直接重传这些包。
快速重传的函数长这样,它只是改了一下窗口和慢启动阈值,实际的重传要自己完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static void tcp_fast_retransmit (struct tcp_sock *tsk) { tsk->cc.ssthresh = tsk->cc.cwnd >> 1 ; if (tsk->snd_una >= tsk->cc.rp) { log (DEBUG, "no packet to fast retransmit" ); return ; } pthread_mutex_lock(&tsk->send_lock); struct pending_packet *pos , *q ; list_for_each_entry_safe(pos, q, &tsk->send_buf, list ) { if (less_than_32b(pos->seq_end, tsk->cc.rp)) retrans_pending_packet(tsk, pos); else break ; } pthread_mutex_unlock(&tsk->send_lock); }
接下来是处理超时重传时的拥塞控制机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 void tcp_cc_handle_rto (struct tcp_sock *tsk) { pthread_mutex_lock(&tsk->cc.lock); tsk->cc.dup_cnt = 0 ; tsk->cc.ssthresh = tsk->cc.cwnd >> 1 ; tsk->cc.cwnd = TCP_MSS; tsk->cc.loss_cnt += (tsk->snd_nxt - tsk->snd_una) / TCP_MSS; tsk->cc.state = TCP_CC_SLOW_START; report(tsk->cc.cwnd); pthread_mutex_unlock(&tsk->cc.lock); }
在不同状态下,对于超时重传,我们都会回到SLOW START
阶段,并且重置cwnd
,可以简单理解为超时重传表明网络环境不太好,我们需要重新评估网络环境,所以回到SLOW START
阶段,重新开始。
此外,tcp_sock_write
我也做了一个比较大的修改,使之适应拥塞控制机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 int tcp_sock_write (struct tcp_sock* tsk, char * buf, int len) { int sent_len = 0 ; while (sent_len < len) { while ((tsk->snd_wnd == 0 || tsk->no_allowed_to_send) && tsk->state == TCP_ESTABLISHED) { sleep_on(tsk->wait_send); if (tsk->state == TCP_CLOSED) return -1 ; } int packets_allowed_to_send = tsk->snd_wnd / TCP_MSS - inflight(tsk); if (packets_allowed_to_send < 0 ) packets_allowed_to_send = 0 ; log (DEBUG, "sending window: %d, inflight: %d, packets allowed to send: %d" , tsk->snd_wnd, inflight(tsk), packets_allowed_to_send); if (packets_allowed_to_send == 0 ) { tsk->no_allowed_to_send = true ; continue ; } int send_len = min(tsk->snd_una + tsk->snd_wnd - tsk->snd_nxt, len - sent_len); while (!send_len && tsk->state == TCP_ESTABLISHED) { sleep_on(tsk->wait_send); if (tsk->state == TCP_CLOSED) return -1 ; send_len = min(tsk->snd_una + tsk->snd_wnd - tsk->snd_nxt, len - sent_len); if (len == sent_len) return sent_len; } send_len = min( send_len, 1514 - ETHER_HDR_SIZE - IP_BASE_HDR_SIZE - TCP_BASE_HDR_SIZE); char * packet_buf = malloc (send_len + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE); char * data = packet_buf + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE; memcpy (data, buf + sent_len, send_len); tcp_send_packet( tsk, packet_buf, send_len + ETHER_HDR_SIZE + IP_BASE_HDR_SIZE + TCP_BASE_HDR_SIZE, true ); sent_len += send_len; } return len; }
需要注意的就是如前所说,发送包的条件变苛刻了,一旦发现我们不能发送包,就要睡觉,等待制约条件被破坏后被唤醒,继续发送。
在前面的代码中,频繁出现report
函数,该函数将当前的时间,以及cwnd
的值输出到文件,便于后续数据分析。
当然也就少不了绘制图的脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import matplotlib.pyplot as pltdef plot_cwnd (file_path ): times = [] cwnd_sizes = [] with open (file_path, 'r' ) as file: for line in file: time, cwnd = map (float , line.split()) times.append(time) cwnd_sizes.append(int (cwnd)) plt.plot(times, cwnd_sizes) plt.title('CWND Sizes Over Time' ) plt.xlabel('Time' ) plt.ylabel('CWND Size' ) plt.grid(True ) plt.show() file_path = './cwnd.txt' plot_cwnd(file_path)
3. Pits and Falls \text{3. Pits and Falls} 3. Pits and Falls
这次实验踩的坑也不少了。
处理dup_ack
里的判断dup_cnt
的条件,应该是写等于号,而非大于等于号,如果是大于等于号,在接收到第三个dup_ack
后,后续的每个dup_ack
都会导致一次快速重传!这无疑会让本来就不稳定的网络雪上加霜。
由于要根据snd_una
和seq_end
来判断是否是重复的ack
,这里的判断在“是否能取等”上面需要尤其小心边界问题,多一个等号可能就会误判所有的ack
。
由于发送方睡眠的可能性更多了,也要仔细注意所有可能需要唤醒发送方的位置,考虑好发送方的睡眠是由哪些变量决定的,在所有这些变量更新的地方都需要注意判断一下是否需要唤醒发送方。这个处理不好可能发送方就一睡不醒了。
4. Result \text{4. Result} 4. Result
根据输出结果绘制如下:
感觉大体趋势是很像的,最开始网络环境较好,cwnd
迅速增大,然后在超过阈值之后变为线性增长,不过这里由于时间太短不太看得出来,同时由于我们发了太多的包,网络环境也变得有些拥塞了,所以后面就收到了几个重复的ack
,导致窗口迅速减半,此后就是一直处于波动。中间也会有丢包,导致整个窗口变为初始值并且一段时间内发不了任何包,只能等待重传,以及收到对方的ack
。
5. Summary \text{5. Summary} 5. Summary
本次实验基本上实现了NewReno
拥塞控制算法,其原理是一个简单的状态机,根据网络环境中发生的不同事件切换不同的状态,评估网络环境并且作出相应的调整。
具体而言,其分为三个状态:SLOW START
,CONGESTION AVOIDANCE
和FAST RECOVERY
。SLOW START
阶段是指在网络环境较好时,我们可以快速增大发送窗口,以便更快地发送数据。CONGESTION AVOIDANCE
阶段是指在网络环境较差时,我们需要缓慢增大发送窗口,以便更好地适应网络环境。FAST RECOVERY
阶段是指在发生丢包时,我们可以快速重传,以便更快地恢复丢失的包。这也就是“慢启动,快重传”这几个词的来源,某种程度是,也回应了之前的bufferfloat
实验。
4. Summary for TCP \text{4. Summary for TCP} 4. Summary for TCP
终于结束了整个网络实验中最让我期待的部分,然而这部分的难度也是相当之大,中间也是因为自己的一些理解上的错误,心态上的问题,实现上的粗糙,耽误了很多时间。但是再怎么粗糙,一个简陋的网络协议栈也终于就此搭起来了。
这三次实验层层递进,中间一个小细节给了我很深刻的印象,在做第一个基础实验的时候,我由于对发送方的发送毫无控制,直接导致了bufferfloat
现象,出现了丢包,让接收方无法再接收任何数据,因此不得不让发送方休眠一会儿。而在实现了可靠传输之后,即使我去掉了这个睡眠,让发送方不停发送,接收方也能正常收到数据,但是仍然需要接受非常多的乱序包之后才能做出回应。而在终于实现了拥塞控制的快重传机制之后,双方回应丢包的速度就快多了。我原本以为加入拥塞控制以后传输会变得更慢,但是没想到由于双方对于丢包现象的快速响应,传输的时间没怎么变长,反而缩短了。这也给了我很大的成就感。
这三次实验也让我对于整个网络协议栈的结构有了更深的理解,至此,我基本上也初步理解了“网络是如何构建的”这个从应用贯穿到底层的问题。从这一点上讲,虽然这三次实验消耗很大,但是收获是值得的。