前言
ByteStream(由发送方上层应用程序创建并写入数据),并将字节流转换为报文段发送给接收方。
代码实现
- 跟踪 TCP Receiver 的窗口,处理确认应答号和窗口大小
- 通过从
ByteStream
中读取内容来填充发送窗口,创建新的报文段(可以包含 SYN 和 FIN 标志),并发送它们 - 跟踪哪些分段已发送但尚未被接收方确认——我们称之为未完成报文段(outstanding segment)
- 如果发送报文段后经过足够长的时间仍未得到确认,则重新发送未完成的报文段
由于涉及到超时处理,我们可以先实现一个简单的定时器 Timer
,类声明如下所示:
class Timer {
private:
uint32_t _rto; // 超时时间
uint32_t _remain_time; // 剩余时间
bool _is_running; // 是否在运行
public:
Timer(uint32_t rto;
// 启动计时器
void start(;
// 停止计时器
void stop(;
// 是否超时
bool is_time_out(;
// 设置过去了多少时间
void elapse(size_t eplased;
// 设置超时时间
void set_time_out(uint32_t duration;
};
根据实验指导书的要求,定时器不能通过调用系统时间函数来知道过了多长时间,而是由外部传入的时长参数告知,这一点可以从 send_retx.cc
测试用例得到印证:
TCPSenderTestHarness test{"Retx SYN twice at the right times, then ack", cfg};
test.execute(ExpectSegment{}.with_no_flags(.with_syn(true.with_payload_size(0.with_seqno(isn;
test.execute(ExpectNoSegment{};
test.execute(ExpectState{TCPSenderStateSummary::SYN_SENT};
// 外部指定逝去的时间
test.execute(Tick{retx_timeout - 1u};
所以这个定时器的实现就很简单,外部通过调用 Timer::elapse(
告知定时器多久过去了,定时器只要更新一下剩余时长就好了:
Timer::Timer(uint32_t rto : _rto(rto, _remain_time(rto, _is_running(false {}
void Timer::start( {
_is_running = true;
_remain_time = _rto;
}
void Timer::stop( { _is_running = false; }
bool Timer::is_time_out( { return _remain_time == 0; }
void Timer::elapse(size_t elapsed {
if (elapsed > _remain_time {
_remain_time = 0;
} else {
_remain_time -= elapsed;
}
}
void Timer::set_time_out(uint32_t duration {
_rto = duration;
_remain_time = duration;
}
完成定时器之后,来看看 TCPSender
类有哪些成员:
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
// 未被确认的报文段
std::queue<std::pair<TCPSegment, uint64_t>> _outstand_segments{};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;
//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;
//! the (absolute sequence number for the next byte to be sent
uint64_t _next_seqno{0};
// ackno checkpoint
uint64_t _ack_seq{0};
// 连续重传次数
uint32_t _consecutive_retxs{0};
// 未被确认的序号长度
uint64_t _outstand_bytes{0};
// 接收方窗口长度
uint16_t _window_size{1};
// 是否同步
bool _is_syned{false};
// 是否结束
bool _is_fin{false};
// 计时器
Timer _timer;
public:
//! Initialize a TCPSender
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {};
//! \name "Input" interface for the writer
ByteStream &stream_in( { return _stream; }
const ByteStream &stream_in( const { return _stream; }
//! \brief A new acknowledgment was received
bool ack_received(const WrappingInt32 ackno, const uint16_t window_size;
//! \brief Generate an empty-payload segment (useful for creating empty ACK segments
void send_empty_segment(;
// 发送报文段
void send_segment(std::string &&data, bool syn = false, bool fin = false;
//! \brief create and send segments to fill as much of the window as possible
void fill_window(;
//! \brief Notifies the TCPSender of the passage of time
void tick(const size_t ms_since_last_tick;
//! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
size_t bytes_in_flight( const;
//! \brief Number of consecutive retransmissions that have occurred in a row
unsigned int consecutive_retransmissions( const;
//! \brief TCPSegments that the TCPSender has enqueued for transmission.
std::queue<TCPSegment> &segments_out( { return _segments_out; }
//! \brief absolute seqno for the next byte to be sent
uint64_t next_seqno_absolute( const { return _next_seqno; }
//! \brief relative seqno for the next byte to be sent
WrappingInt32 next_seqno( const { return wrap(_next_seqno, _isn; }
};
可以看到,我们 TCPSender
有以下主要成员:
-
queue<pair<TCPSegment, uint64_t>> _outstand_segments:存放未被确认的报文段和它对应的绝对序列号的队列
-
uint64_t _ack_seq:上一次收到的绝对确认应答号
-
uint32_t _consecutive_retxs:最早发送的但是未被确认的报文段的重传次数,用于更新超时时间
-
uint64_t _outstand_bytes:所有未被确认的报文段所占序列号空间长度,SYN 和 FIN 也要占用一个序号
-
uint16_t _window_size:接收方窗口大小,初始值为 1,由于没有实现加性递增乘性递减(AIMD)拥塞控制机制,所以不用维护发送方的拥塞窗口大小,直接维护接收方窗口大小
-
bool _is_syned:是否成功同步
-
bool _is_fin:是否关闭连接
-
Timer _timer:定时器
queue<TCPSegment> _segments_out:待发送的报文段队列,外部程序会从这个队列里面取出报文段并发送出去
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn
: _isn(fixed_isn.value_or(WrappingInt32{random_device((}
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity
, _timer(retx_timeout {}
uint64_t TCPSender::bytes_in_flight( const { return _outstand_bytes; }
unsigned int TCPSender::consecutive_retransmissions( const { return _consecutive_retxs; }
丢包处理
根据实验指导书中的描述:
外部会定期调用 TCPSender::tick(
函数来告知它过了多长时间,TCPSender
要根据传入的时间判断最早发送的包是不是超时未被确认,如果是(定时器溢出),就说明这个包丢掉了,需要重传。
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick {
// 更新定时器
_timer.elapse(ms_since_last_tick;
if (!_timer.is_time_out(
return;
// 超时需要重发第一个报文段,同时将超时时间翻倍
_segments_out.push(_outstand_segments.front(.first;
_consecutive_retxs += 1;
_timer.set_time_out(_initial_retransmission_timeout * (1 << _consecutive_retxs;
_timer.start(;
}
这里只重传了一个报文段,而不是像回退 N 步(GBN)协议那样重传整个窗口内的报文段,这是因为 Lab2 中实现的接收方会缓存所有乱序到达的报文段,而 GBN 是直接将其丢弃掉了。如果我们重传的包被成功接收了,并且使接收方成功重组了整个发送窗口内的数据,就不需要重传后续的报文段了。如果没有成功重组,仍有部分数据缺失,接收方会回复一个它想要的报文段的序号,到时候重传这个报文段就行了。
发送报文段
TCPSender 的各个成员的值就是图中所标注的那样,这时候调用 TCPSender::fill_window(
发送的应该是 _next_seq
~ _ack_seq + _window_size
之间的数据。不过在发送数据之前需要完成三次握手,所以需要先判断 _is_syned
是否为 true
,如果为 false
就需要发送 SYN
包与接收端进行连接。所有数据都发送完成之后需要发送一个 FIN
报文段(可以携带最后一批数据或者不懈携带任何数据)说明 TCPSender
已经没有新数据要发送了,可以断开连接了。
void TCPSender::fill_window( {
if (!_is_syned {
// 等待 SYN 超时
if (!_outstand_segments.empty(
return;
// 发送一个 SYN 包
send_segment("", true;
} else {
size_t remain_size = max(_window_size, static_cast<uint16_t>(1 + _ack_seq - _next_seqno;
// 当缓冲区中有待发送数据时就发送数据报文段
while (remain_size > 0 && !_stream.buffer_empty( {
auto ws = min(min(remain_size, TCPConfig::MAX_PAYLOAD_SIZE, _stream.buffer_size(;
remain_size -= ws;
string &&data = _stream.peek_output(ws;
_stream.pop_output(ws;
// 置位 FIN
_is_fin |= (_stream.eof( && !_is_fin && remain_size > 0;
send_segment(std::move(data, false, _is_fin;
}
// 缓冲区输入结束时发送 FIN(缓冲区为空时不会进入循环体,需要再次发送)
if (_stream.eof( && !_is_fin && remain_size > 0 {
_is_fin = true;
send_segment("", false, true;
}
}
}
void TCPSender::send_segment(string &&data, bool syn, bool fin {
// 创建报文段
TCPSegment segment;
segment.header(.syn = syn;
segment.header(.fin = fin;
segment.header(.seqno = next_seqno(;
segment.payload( = std::move(data;
// 将报文段放到发送队列中
_segments_out.push(segment;
_outstand_segments.push({segment, _next_seqno};
// 更新序号
auto len = segment.length_in_sequence_space(;
_outstand_bytes += len;
_next_seqno += len;
}
void TCPSender::send_empty_segment( {
TCPSegment seg;
seg.header(.seqno = next_seqno(;
_segments_out.push(seg;
}
这里有一个地方值得思考的问题是:把同一个报文段保存到两个队列中不会导致数据的拷贝吗?实际上不会,因为 TCPSegment::_payload
的数据类型是 Buffer
,它的声明如下所示:
//! \brief A reference-counted read-only string that can discard bytes from the front
class Buffer {
private:
std::shared_ptr<std::string> _storage{};
size_t _starting_offset{};
public:
Buffer( = default;
//! \brief Construct by taking ownership of a string
Buffer(std::string &&str noexcept : _storage(std::make_shared<std::string>(std::move(str {}
//! \name Expose contents as a std::string_view
std::string_view str( const {
if (not _storage {
return {};
}
return {_storage->data( + _starting_offset, _storage->size( - _starting_offset};
}
operator std::string_view( const { return str(; }
//! \brief Get character at location `n`
uint8_t at(const size_t n const { return str(.at(n; }
//! \brief Size of the string
size_t size( const { return str(.size(; }
//! \brief Make a copy to a new std::string
std::string copy( const { return std::string(str(; }
//! \brief Discard the first `n` bytes of the string (does not require a copy or move
//! \note Doesn't free any memory until the whole string has been discarded in all copies of the Buffer.
void remove_prefix(const size_t n;
};
可以看到 Buffer
内部使用智能指针 shared_ptr<string> _storage
共享了同一份字符串,当 queue.push(buffer
的时候调用了 Buffer(const Buffer &
拷贝构造函数,只对 _storage
指针进行赋值而不涉及字符串复制操作。同时 Buffer(string &&str
构造函数接受右值,可以直接把传入的字符串偷取过来,无需拷贝,效率是很高的。
确认应答号处理
_outstand_segments 队列中绝对序列号小于绝对确认应答号的报文段。如果不存在未确认的报文段了就关闭定时器,否则得再次启动定时器,为重传下一个报文段做准备。
//! \param ackno The remote receiver's ackno (acknowledgment number
//! \param window_size The remote receiver's advertised window size
//! \returns `false` if the ackno appears invalid (acknowledges something the TCPSender hasn't sent yet
bool TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size {
auto ack_seq = unwrap(ackno, _isn, _ack_seq;
if (ack_seq == 0
return true;
// absolute ackno 不能落在窗口外
if (_is_syned && ack_seq > _next_seqno
return false;
_is_syned = true;
_window_size = window_size;
_ack_seq = ack_seq;
// 重置超时时间为初始值
_timer.set_time_out(_initial_retransmission_timeout;
_consecutive_retxs = 0;
// 移除已被确认的报文段
while (!_outstand_segments.empty( {
auto &[segment, seqno] = _outstand_segments.front(;
if (seqno >= ack_seq
break;
_outstand_bytes -= segment.length_in_sequence_space(;
_outstand_segments.pop(;
}
// 再次填满发送窗口
fill_window(;
// 如果还有没被确认的报文段就重启计时器
if (!_outstand_segments.empty(
_timer.start(;
else
_timer.stop(;
return true;
}
测试
在命令行中输入下述代码就能编译并测试所有与发送方有关的测试用例:
cd build
make -j8
ctest -R send_
测试结果如下,发现全部成功通过了:
总结
相比于 Lab2,Lab3 的难度更高,因为实验指导书的说明并不是很充分,很多东西还是靠复习课本和调试测试用例搞明白的,不过有一说一,CS144 的测试用例写的是真的好,代码很整洁,也用了建造者模式等设计模式,还是很值得学习的。期待最后一个实验 Lab4,以上~~