很早就听闻 CS144 Lab 的大名, 但是由于各种原因一直没有上手实践. 直到最近才开始着手实践, 本文将记录我在实践过程中的一些笔记.
由于本次实验和博客是并行进行的, 所以已经编辑公开的部分也可能会出现部分差错以及可能导致的多次修改.
我的仓库在 4ever-xxxl/CS144-Computer-Network .
环境搭建
运行环境
本次环境基于官方镜像 VirtualBox 搭建, 镜像文件以及搭建教程可以在 这里 找到. VirtualBox 启动后照常配置, 使用 Vscode 远程连接到虚拟机, Vscode 中配置插件 C/C++
GitLens
, 之后的实验都在 Vscode 中进行.
运行 setup_dev_env.sh 配置实验所需环境. 之后 git clone 仓库, 我使用的仓库是 PKUFlyingPig/CS144-Computer-Network , 再按照 README.md 配置即可.
这里有个坑就是由于缺失了 <array> <stdexcept> 两个文件头导致执行 make
时会报错, 需要在对应文件手动补上. (我也不是很理解为什么三年前的高星仓库 clone 下来不能直接用.
figure-1
error
测试环境
为了便于调试并且不影响 Release 版本测速, 我在实验目录下新建了 Debug 目录, 使用 cmake .. -DCMAKE_BUILD_TYPE=Debug
生成 Debug 版本的可执行文件. Vscode 使用的测试配置文件如下:
launch.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"version" : "0.2.0" ,
"configurations" : [
{
"name" : "CS144Lab debug" ,
"type" : "cppdbg" ,
"request" : "launch" ,
"program" : "${workspaceFolder}/CS144-Computer-Network/Debug/tests/${fileBasenameNoExtension}" , //!设置为测试程序源码相对应的目标程序路径
"args" : [],
"stopAtEntry" : false ,
"cwd" : "${workspaceFolder}" ,
"environment" : [],
"externalConsole" : false ,
"MIMode" : "gdb" ,
"setupCommands" : [
{
"description" : "为 gdb 启用整齐打印" ,
"text" : "-enable-pretty-printing" ,
"ignoreFailures" : true
}
],
"miDebuggerPath" : "/usr/bin/gdb"
}
]
}
但是这种情况下还是没办法愉快的调试, 因为 DEBUG 的编译参数是 -Og
, 还是会导致部分变量出现 OPTIMIZEOUT 的情况. 因此我们需要修改 /etc/cflags.cmake
文件, 将 -Og
改成 -O0
.
接下来就可以愉快的调试啦.
Lab_0
Writing webget
编写 get_URL 函数简略实现应用层 HTTP 请求响应的功能, 即使用 socket 向指定 IP 和 Path 发送 GET 请求并获取响应.
1
2
3
4
5
6
7
8
9
10
11
void get_URL ( const string & host , const string & path ) {
TCPSocket sock {};
sock . connect ( Address ( host , "http" ));
sock . write ( "GET " + path + " HTTP/1.1 \r\n " );
sock . write ( "Host: " + host + " \r\n " );
sock . write ( "Connection: close \r\n\r\n " );
while ( ! sock . eof ()) {
cout << sock . read ();
}
sock . close ();
}
这里同样有个坑就是如果主机开了代理并且开了 TUN 模式, 那么 sock.shutdown(SHUT_WR);
会导致异常退出. 具体原因还不是很清楚.
An in-memory reliable byte stream
编写一个循环字节流类, 需要向 _buffeer
中写入和读取数据, 其中读取部分分成了两步, 先复制再弹出. 代码如下
byte_stream.hh
1
2
3
4
5
6
7
8
9
10
11
#include <deque>
class ByteStream {
private :
size_t _capacity = 0 ;
size_t _read_count = 0 ;
size_t _write_count = 0 ;
std :: deque < char > _buffer = {};
bool _eof = false ;
bool _error = false ;
// ...
}
byte_stream.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
ByteStream :: ByteStream ( const size_t capacity ) : _capacity ( capacity ) {}
size_t ByteStream :: write ( const string & data ) {
size_t len = std :: min ( data . length (), remaining_capacity ());
for ( size_t i = 0 ; i < len ; i ++ ) {
_buffer . emplace_back ( data [ i ]);
}
_write_count += len ;
return len ;
}
string ByteStream :: peek_output ( const size_t len ) const {
size_t peekLen = std :: min ( len , buffer_size ());
return string (). assign ( _buffer . begin (), _buffer . begin () + peekLen );
}
void ByteStream :: pop_output ( const size_t len ) {
size_t popLen = std :: min ( len , buffer_size ());
for ( size_t i = 0 ; i < popLen ; i ++ ) {
_buffer . pop_front ();
}
_read_count += popLen ;
}
std :: string ByteStream :: read ( const size_t len ) {
std :: string readStream = ByteStream :: peek_output ( len );
ByteStream :: pop_output ( len );
return readStream ;
}
void ByteStream :: end_input () { _eof = true ; }
bool ByteStream :: input_ended () const { return _eof ; }
size_t ByteStream :: buffer_size () const { return _buffer . size (); }
bool ByteStream :: buffer_empty () const { return _buffer . size () == 0 ; }
bool ByteStream :: eof () const { return buffer_empty () && input_ended (); }
size_t ByteStream :: bytes_written () const { return _write_count ; }
size_t ByteStream :: bytes_read () const { return _read_count ; }
size_t ByteStream :: remaining_capacity () const { return _capacity - buffer_size (); }
Lab_1
stream_reassembler
编写一个流重组器, 用于将乱序的数据流重组成有序的数据流.
这个 lab 的难点在于乱序数据流的合并以及数据流结尾的边界问题.
对于乱序数据流, 我采用 deque 来进行存储, 用 _buffer
存储数据, _bitmap
存储数据是否已经被写入. 当数据流到来时, 先根据前后边界进行裁剪. 前边界为第一个未按序的字节序号, 后边界由缓冲区大小限制. 然后顺序扫一遍, 将数据写入 _buffer
未存储的位置, 即 _bitmap
为 false
的位置, 并将其置真. 最后顺序检查 _bitmap
, 将已经按序的头部数据弹出并写入 _output
中.
这里的实现并没有采用 set 等树型数据结构, 而是在双端队列中顺序存储. 虽然实现简单, 但是时间复杂度是 $O(n)$ 的, 理论上部分环节是可以达到 $O(\log n)$ 的, 留个坑等后面再改进吧.
然后是数据流结尾的边界问题, 只有当数据流结尾到来并且都能被写入时, 才能记录下 _eof
信号, 否则会导致数据流结尾的数据丢失. 这时结尾前可能仍然是乱序状态, 因此需要等待所有乱序数据排列完并写入后才能将 _output
关闭.
check_lab1
的运行时间在 0.75~0.85s 左右.
stream_reassembler.hh
1
2
3
4
5
6
7
8
9
10
11
12
class StreamReassembler {
private :
// Your code here -- add private members as necessary.
std :: deque < char > _buffer = {};
std :: deque < bool > _bitmap = {};
size_t _first_unassembled_idx = 0 ;
size_t _unassembled_bytes_num = 0 ;
bool _eof = false ;
ByteStream _output ; //!< The reassembled in-order byte stream
size_t _capacity ; //!< The maximum number of bytes
// ...
}
stream_reassembler.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
StreamReassembler :: StreamReassembler ( const size_t capacity )
: _buffer ( capacity , '\0' ), _bitmap ( capacity , false ), _output ( capacity ), _capacity ( capacity ) {}
void StreamReassembler :: push_substring ( const string & data , const size_t index , const bool eof ) {
if ( eof && _first_unassembled_idx + _capacity - _output . buffer_size () >= index + data . length ()) {
_eof = true ;
}
size_t front_boundary = std :: max ( index , _first_unassembled_idx );
size_t back_boundary = std :: min ( index + data . length (), _first_unassembled_idx + _capacity - _output . buffer_size ());
for ( size_t i = front_boundary ; i < back_boundary ; i ++ ) {
if ( _bitmap [ i - _first_unassembled_idx ]) {
continue ;
}
_buffer [ i - _first_unassembled_idx ] = data [ i - index ];
_bitmap [ i - _first_unassembled_idx ] = true ;
_unassembled_bytes_num ++ ;
}
std :: string _str = "" ;
while ( _bitmap . front ()) {
_str += _buffer . front ();
_buffer . pop_front ();
_bitmap . pop_front ();
_buffer . push_back ( '\0' );
_bitmap . push_back ( false );
}
_output . write ( _str );
_first_unassembled_idx += _str . length ();
_unassembled_bytes_num -= _str . length ();
if ( _eof && empty ()) {
_output . end_input ();
}
}
size_t StreamReassembler :: unassembled_bytes () const { return _unassembled_bytes_num ; }
bool StreamReassembler :: empty () const { return unassembled_bytes () == 0 ; }
size_t StreamReassembler :: ack_idx () const { return _first_unassembled_idx ; }
bool StreamReassembler :: input_ended () const { return _eof && empty (); }
Lab_2
wrapping_integers
编写一个包装整数类, 用于实现序列号的加减法. 实现 WrappingInt32
和 uint64_t
两个类之间的转换函数.
这里的实现很简单, 对于绝对序列号转化成序列号, 只需要加上基准偏移量 isn
即可, 同时由于序列号自动取模了, 因此不需要其他操作.
对于序列号转化成绝对序列号, 想要找到离 checkpoint
最近的绝对序列号. 首先注意到有以下式子成立:
$$
\begin{aligned}
\text{checkpoint} &= \text{n} - \text{isn} + k * 2^{32} + \text{remainder}\
\end{aligned}
$$
其中 k
为任意整数, remainder
为余数. 通过比较 2 * remainder
和 1 << 32
的大小, 就可以决定应该靠近哪一边. 计算绝对序列号时应该取 k
还是 k+1
.
但是这里有个坑, 就是当 checkpoint < remainder
时, 会导致算出的值小于 0, 但是我们绝对序列号的类型是 uint64_t
, 因此会导致溢出. 所以这种情况应该单独处理.
wrapping_integers.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
WrappingInt32 wrap ( uint64_t n , WrappingInt32 isn ) { return isn + uint32_t ( n ); }
uint64_t unwrap ( WrappingInt32 n , WrappingInt32 isn , uint64_t checkpoint ) {
uint64_t res = uint64_t ( n - isn );
uint64_t RING = 1ul << 32 ;
if ( res >= checkpoint ) {
uint64_t k = ( res - checkpoint ) / RING ;
uint64_t Mod = ( res - checkpoint ) % RING ;
if ( 2ul * Mod > RING && checkpoint >= Mod ) {
res -= ( k + 1ul ) * RING ;
} else {
res -= k * RING ;
}
} else {
uint64_t k = ( checkpoint - res ) / RING ;
uint64_t Mod = ( checkpoint - res ) % RING ;
if ( 2ul * Mod > RING ) {
res += ( k + 1ul ) * RING ;
} else {
res += k * RING ;
}
}
return res ;
}
tcp_receiver
实现 TCP 接收端, 用于接收 TCP 数据包并将其重组成有序的数据流. 需要处理 syn
fin
两种特殊数据包, 以及乱序数据包.
首先, 为了便于对 stream_reassembler
的数据调用, 我们新建了两个接口, ack_idx
和 input_ended
, 用于获取绝对 ack 序列号和判断是否结束.
stream_reassembler.hh
1
2
3
4
5
6
//! The expected number of syn in the next segment
size_t ack_idx () const ;
//! \brief Is the stream_reassembler ending?
//! \returns `true` if stream_reassembler has ended
bool input_ended () const ;
stream_reassembler.cc
1
2
3
size_t StreamReassembler :: ack_idx () const { return _first_unassembled_idx ; }
bool StreamReassembler :: input_ended () const { return _eof && empty (); }
接下来是 tcp_receiver
的实现, 首先是 segment_received
函数, 用于接收数据包并处理. 处理的逻辑如下:
如果在 syn
之前收到数据包, 则直接丢弃.
_syn_flag
_fin_flag
或等于对应的标志位, 记录是否已经受到过对应的数据包.
如果收到 syn
数据包, 则记录 isn
序列号, 并将 syn
数据包的序列号加一作为下一个期望的序列号.
绝对序列号通过 unwrap
函数转化成序列号, 并调用 stream_reassembler
的 push_substring
函数进行处理. 这里的参数 index
是数据包内容的序列号, 所以需要减一.
然后是 ackno
函数的实现, 略有小坑, 需要判断是否收到过 syn
数据包. 如果收到过, 那么通过绝对序列号计算序列号返回即可; 如果没有收到过, 那么 ackno
应该返回 std::nullopt
而不是 0. ( 为啥不可以返回 0 啊喂 ).
PS: 哦由于 syn
数据包有初始偏移量 isn
所以在未收到 syn
数据包时, 绝对序列号应该是 0, 序列号是无法预测的. 所以这里的 ackno
函数应该返回 std::nullopt
而不是 0.
window_size
则较为简单, 直接返回 stream_reassembler
的剩余容量即可.
check_lab2
的运行时间在 0.90~1.10s 左右.
tcp_receiver.hh
1
2
3
4
5
6
7
8
9
10
11
12
class TCPReceiver {
//! Our data structure for re-assembling bytes.
StreamReassembler _reassembler ;
bool _syn_flag = false ;
bool _fin_flag = false ;
size_t _abs_seqno = 0 ;
WrappingInt32 _seqno { 0 };
WrappingInt32 _isn { 0 };
//! The maximum number of bytes we'll store.
size_t _capacity ;
// ...
}
tcp_receiver.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void TCPReceiver :: segment_received ( const TCPSegment & seg ) {
if ( ! seg . header (). syn && ! _syn_flag ) {
return ;
}
_syn_flag |= seg . header (). syn ;
_fin_flag |= seg . header (). fin ;
if ( seg . header (). syn ) {
_isn = seg . header (). seqno ;
}
_seqno = seg . header (). seqno + seg . header (). syn ;
_abs_seqno = unwrap ( _seqno , _isn , _reassembler . ack_idx ());
_reassembler . push_substring ( seg . payload (). copy (), _abs_seqno - 1 , seg . header (). fin );
}
optional < WrappingInt32 > TCPReceiver :: ackno () const {
size_t _abs_ackno = _reassembler . ack_idx () + _syn_flag + _reassembler . input_ended ();
if ( _abs_ackno > 0 ) {
return wrap ( _abs_ackno , _isn );
} else {
return std :: nullopt ;
}
}
size_t TCPReceiver :: window_size () const { return _capacity - _reassembler . stream_out (). buffer_size (); }
Lab_3
tcp_sender
实现 TCP 发送端, 用于发送 TCP 数据包并接收 ACK 数据包. 主要函数有 fill_window
ack_received
tick
三个. 下面介绍各个函数的实现逻辑.
Function fill_window()
figure-2
sending_space
计算接收端当前窗口剩余大小 sending_space
, 默认为 1. 计算公式如上图 figure-2
所示.
当 sending_space
大于 0 且未发送 fin
数据包时, 持续发送数据包以填满窗口.
序列号 seqno
为 next_seqno
, 下一个待发送数据包的序列号.
如果未发送 syn
数据包, 则设置 header().syn = true
, sending_space
减一.
计算能发送的数据包大小并填入 payload()
, sending_space
减去数据包大小.
如果 sending_space
大于 0 且数据流结束, 则设置 header().fin = true
, sending_space
减一.
检测是否为空数据报, 是则退出函数.
将数据包加入发送队列和超时队列, 并更新 next_seqno
和 bytes_in_flight
.
发送数据包, 并检测是否开始计时.
重复 2~9 步骤直到窗口填满或者数据流结束.
Function ack_received()
如果收到的 ACK 数据包不在发送队列中, 则退出函数.
更新存储的接收端窗口大小 window_size
.
如果收到的 ACK 数据包序列号大于等于超时队列中的序列号, 则将超时队列中的数据包弹出, 并更新 bytes_in_flight
和计时.
重复第 3 步骤直到收到的 ACK 数据包序列号小于超时队列中的序列号或者超时队列为空.
如果有更新队列, 那么重新计时. 如果队列为空, 则停止计时.
Function tick()
如果计时器已经停止, 则退出函数.
计算是否超时, 如果没有超时, 则退出函数.
如果超时, 则重传第一个未确认的数据包, 并重新计时. 如果接收方有空余, 那么重传次数 +1, 超时时间翻倍.
tcp_sender.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class TCPSender {
private :
bool _fin_sent = false ;
uint64_t _abs_ackno = 0 ;
uint64_t _bytes_in_flight = 0 ;
uint16_t _receiver_window_size = 1 ;
bool _time_running = false ;
unsigned int _rto = 0 ;
unsigned int _time_elapsed = 0 ;
unsigned int _consecutive_retransmissions = 0 ;
std :: queue < TCPSegment > _segments_outstanding {};
// ...
}
tcp_sender.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender :: TCPSender ( const size_t capacity , const uint16_t retx_timeout , const std :: optional < WrappingInt32 > fixed_isn )
: _rto ( retx_timeout )
, _isn ( fixed_isn . value_or ( WrappingInt32 { random_device ()()}))
, _initial_retransmission_timeout { retx_timeout }
, _stream ( capacity ) {}
uint64_t TCPSender :: bytes_in_flight () const { return _bytes_in_flight ; }
void TCPSender :: fill_window () {
size_t sending_space = _abs_ackno + ( _receiver_window_size != 0 ? _receiver_window_size : 1 ) - _next_seqno ;
while ( sending_space > 0 && ! _fin_sent ) {
TCPSegment seg ;
seg . header (). seqno = next_seqno ();
if ( _next_seqno == 0 ) {
seg . header (). syn = true ;
sending_space -- ;
}
size_t read_size = min ( sending_space , TCPConfig :: MAX_PAYLOAD_SIZE );
seg . payload () = stream_in (). read ( read_size );
sending_space -= seg . payload (). size ();
if ( stream_in (). eof () && sending_space > 0 ) {
seg . header (). fin = true ;
_fin_sent = true ;
sending_space -- ;
}
if ( seg . length_in_sequence_space () == 0 ) {
return ;
}
segments_out (). emplace ( seg );
if ( ! _time_running ) {
_time_running = true ;
_time_elapsed = 0 ;
}
_segments_outstanding . push ( seg );
_next_seqno += seg . length_in_sequence_space ();
_bytes_in_flight += seg . length_in_sequence_space ();
}
}
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender :: ack_received ( const WrappingInt32 ackno , const uint16_t window_size ) {
_abs_ackno = unwrap ( ackno , _isn , _next_seqno );
if ( _abs_ackno > _next_seqno ) {
return ;
}
_receiver_window_size = window_size ;
bool new_ack = false ;
while ( ! _segments_outstanding . empty ()) {
TCPSegment seg = _segments_outstanding . front ();
size_t len = seg . length_in_sequence_space ();
uint64_t seqno = unwrap ( seg . header (). seqno , _isn , _next_seqno );
if ( seqno + len > _abs_ackno ) {
break ;
}
_segments_outstanding . pop ();
_bytes_in_flight -= len ;
new_ack = true ;
}
if ( new_ack ) {
_rto = _initial_retransmission_timeout ;
_time_elapsed = 0 ;
_time_running = ! _segments_outstanding . empty ();
_consecutive_retransmissions = 0 ;
}
}
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender :: tick ( const size_t ms_since_last_tick ) {
if ( ! _time_running ) {
return ;
}
_time_elapsed += ms_since_last_tick ;
if ( _time_elapsed >= _rto ) {
_segments_out . push ( _segments_outstanding . front ());
if ( _receiver_window_size > 0 ) {
_consecutive_retransmissions ++ ;
_rto <<= 1 ;
}
_time_elapsed = 0 ;
}
}
unsigned int TCPSender :: consecutive_retransmissions () const { return _consecutive_retransmissions ; }
void TCPSender :: send_empty_segment () {
TCPSegment seg ;
seg . header (). seqno = next_seqno ();
_segments_out . push ( seg );
}
注意
目前可能打算暂时做到这里, 之后有时间再继续更新. 接下来时间会做一些 CNSS Dev 的题目, 之后可能会更新一些个人题解.