Thrift系列 | Thrift发送过程源码分析

mac2022-06-30  24

这边梳理一下整个RPC通信以及数据包的过程(从看源代码角度出发的),首先是客户端调用ping这个函数,

Test test; test.num1 = 1000; test.num2 = 1000; test.str = "000000"; test.bs = "111111"; client.ping(test);

那么调用这个函数其实是分为两步的,一部是send_ping,另一部分是recv_ping,下面我们专注于send_ping部分。

void CalculatorClient::ping(const Test& test) { send_ping(test); recv_ping(); }

send_ping部分主要如下所示,首先是写入message开始的相关信息,之后写入相关参数的信息,最后写入message结束标识符。最后transport层再写入end,这个函数返回的就是整个buffer区的大小,当然还需要flush掉。

void CalculatorClient::send_ping(const Test& test) { int32_t cseqid = 0; oprot_->writeMessageBegin("ping", ::apache::thrift::protocol::T_CALL, cseqid); Calculator_ping_pargs args; args.test = &test; args.write(oprot_); oprot_->writeMessageEnd(); oprot_->getTransport()->writeEnd(); oprot_->getTransport()->flush(); }

最后来贴一下server端和client端的配置信息

// client stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090)); stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket)); stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); CalculatorClient client(protocol); // server stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090)); stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket)); stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); CalculatorClient client(protocol);

1. 写入message开始的相关信息

如下所示,是TBinaryProtocol的写入的函数,首先是4 bytes的version,再是4 bytes(string length)+函数名长度+4 bytes的seqid。

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageBegin(const std::string& name, const TMessageType messageType, const int32_t seqid) { if (this->strict_write_) { int32_t version = (VERSION_1) | ((int32_t)messageType); uint32_t wsize = 0; wsize += writeI32(version); wsize += writeString(name); wsize += writeI32(seqid); return wsize; } else { uint32_t wsize = 0; wsize += writeString(name); wsize += writeByte((int8_t)messageType); wsize += writeI32(seqid); return wsize; } }

2. 传递参数部分

接下来传递的是函数参数的相关信息了,下面分以下几种方式进行讨论

2.1. 当参数类型是int_32等

参数部分的字节数是1 byte的参数Type和2 bytes的参数ID,通过这两个量可以来定位参数的位置,从而把相应参数传进去。当参数传递完成之后,最后写入field stop标志符。

1.为什么fieldend为0?个人觉得因为前面已经知道了参数type,也就没有必要在写入end标识符了。

2.bool、int_8、int_16、int_32、int_64、double是同理的。

uint32_t Calculator_add_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { uint32_t xfer = 0; ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("Calculator_add_pargs"); // 0 byte // 3 bytes(参数ID和Type) xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1); xfer += oprot->writeI32((*(this->num1))); // int type is 4 bytes xfer += oprot->writeFieldEnd(); // 0 byte xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_I32, 2); xfer += oprot->writeI32((*(this->num2))); xfer += oprot->writeFieldEnd(); xfer += oprot->writeFieldStop(); // 1 byte xfer += oprot->writeStructEnd(); // 0 byte return xfer; }

2.1.1. writeStructBegin是写入0 byte

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructBegin(const char* name) { (void)name; return 0; }

2.1.2. writeFieldBegin是写入3 bytes

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId) { (void)name; uint32_t wsize = 0; wsize += writeByte((int8_t)fieldType); wsize += writeI16(fieldId); return wsize; }

2.1.3. writeI32函数是写入4 bytes

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeI32(const int32_t i32) { int32_t net = (int32_t)ByteOrder_::toWire32(i32); this->trans_->write((uint8_t*)&net, 4); return 4; }

2.1.4. writeFieldEnd函数是写入0字节

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldEnd() { return 0; }

2.1.5. writeFieldStop是写入1字节

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldStop() { return writeByte((int8_t)T_STOP); }

2.1.6. writeStructEnd函数是写入0字节

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructEnd() { return 0; }

2.2. 当参数类型是字符串

当参数类型是字符串的话,同样需要写入1 byte的参数Type和2 bytes的参数ID,但是除写入字符串外,还要写入4字节的字符串长度。通过字符串长度字段,可以完整的将参数中的字符串传进去。

uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { uint32_t xfer = 0; ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("Calculator_ping_pargs"); // 0 byte // 3 bytes(参数ID和Type) xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRING, 1); // string length + 4 bytes(store the string length) xfer += oprot->writeString((*(this->test))); xfer += oprot->writeFieldEnd(); // 0 byte xfer += oprot->writeFieldStop(); // 1 byte xfer += oprot->writeStructEnd(); // 0 byte return xfer; }

2.2.1. writeString函数4字节的字符串长度+字符串本身

uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeString(const StrType& str) { if (str.size() > static_cast<size_t>((std::numeric_limits<int32_t>::max)())) throw TProtocolException(TProtocolException::SIZE_LIMIT); uint32_t size = static_cast<uint32_t>(str.size()); uint32_t result = writeI32((int32_t)size); if (size > 0) { this->trans_->write((uint8_t*)str.data(), size); } return result + size; }

2.3. 当参数类型是bytes数组的时候

它还是会当做string来写入,同传字符串一样

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeBinary(const std::string& str) { return TBinaryProtocolT<Transport_, ByteOrder_>::writeString(str); }

2.4. 当参数类型是struct时

参数类型为struct时,那么同其他参数类型一样,也需要拿出3个字节存储struct的类型和ID,之后是依次存储struct中的内容,最后写入1 byte的field stop。

uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { uint32_t xfer = 0; ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("Calculator_ping_pargs"); // 0 byte xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRUCT, 1); // 3 bytes xfer += (*(this->test)).write(oprot); // 具体bytes number看下面这段代码 xfer += oprot->writeFieldEnd(); // 0 byte xfer += oprot->writeFieldStop(); // 1 byte xfer += oprot->writeStructEnd(); // 0 byte return xfer; }

下面这段代码是依次存储struct 中的内容。

uint32_t Test::write(::apache::thrift::protocol::TProtocol* oprot) const { uint32_t xfer = 0; ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("Test"); // 0 byte xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1); // 3 bytes xfer += oprot->writeI32(this->num1); // int type is 4 bytes xfer += oprot->writeFieldEnd(); // 0 byte xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_DOUBLE, 2); // 3 bytes xfer += oprot->writeDouble(this->num2); // double type is 8 bytes xfer += oprot->writeFieldEnd(); // 0 byte xfer += oprot->writeFieldBegin("str", ::apache::thrift::protocol::T_STRING, 3); // 3 bytes xfer += oprot->writeString(this->str); // string length + 4 bytes(store the string length) xfer += oprot->writeFieldEnd(); // 0 byte xfer += oprot->writeFieldBegin("bs", ::apache::thrift::protocol::T_STRING, 4); // 3 bytes xfer += oprot->writeBinary(this->bs); // string length + 4 bytes(store the string length) xfer += oprot->writeFieldEnd(); // 0 byte xfer += oprot->writeFieldStop(); // 1 byte xfer += oprot->writeStructEnd(); // 0 byte return xfer; }

3. 写入message end的部分

对于写入message end来说,写入的是0 byte。

template <class Transport_, class ByteOrder_> uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageEnd() { return 0; }

4. 最后是transport层的操作

oprot_->getTransport()->writeEnd(); oprot_->getTransport()->flush();

由于我们TFramedTransport,所以要比TBufferedTransport多传4 bytes,先来看一下这两个函数,其中writeEnd函数并没有写入任何东西,而是直接返回缓冲区的大小。

uint32_t TFramedTransport::writeEnd() { return static_cast<uint32_t>(wBase_ - wBuf_.get()); }

在这边transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);可以看到flush的时候是把字节串的内容和4 bytes的字节串的长度写进去了。wBase_ = wBuf_.get() + sizeof(sz_nbo);这一部分的操作是重置wBase_,从wBuf_后面4 bytes的地方开始。

void TFramedTransport::flush() { int32_t sz_hbo, sz_nbo; assert(wBufSize_ > sizeof(sz_nbo)); // Slip the frame size into the start of the buffer. sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo))); sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo)); memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo)); if (sz_hbo > 0) { // Note that we reset wBase_ (with a pad for the frame size) // prior to the underlying write to ensure we're in a sane state // (i.e. internal buffer cleaned) if the underlying write throws // up an exception wBase_ = wBuf_.get() + sizeof(sz_nbo); // Write size and frame body. transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo); } // Flush the underlying transport. transport_->flush(); // reclaim write buffer if (wBufSize_ > bufReclaimThresh_) { wBufSize_ = DEFAULT_BUFFER_SIZE; wBuf_.reset(new uint8_t[wBufSize_]); setWriteBuffer(wBuf_.get(), wBufSize_); // reset wBase_ with a pad for the frame size int32_t pad = 0; wBase_ = wBuf_.get() + sizeof(pad); } }

TBufferedTransport和TFramedTransport都是有缓存的,均继承TBufferBase,调用下一层TTransport类进行读写操作,结构极为相似。只是TFramedTransport以帧为传输单位,帧结构为:4个字节(int32_t)+传输字节串,头4个字节是存储后面字节串的长度,该字节串才是正确需要传输的数据,因此TFramedTransport每传一帧要比TBufferedTransport和TSocket多传4个字节;

本段参考 xiazemin 的泽民博客 https://xiazemin.github.io/MyBlog/web/2019/06/12/transport.html#

5. 总结整理

假如client端和server端采用的都是TFramedTransport和TBinaryProtocol配置,RPC过程封包如下

首先是message begin的写入:4 bytes的version字段+4 bytes用来存储函数名长度的字段+函数名长度字段+4bytes的sequence ID字段;

接下去是函数参数部分的写入,针对不同参数的类型来说,都会写入1 byte参数的类型和2 bytes的参数ID(用于定位),之后再写入参数的值,而对于binary数组或者string,那么在写入参数值之前需要先写入binary数组的长度或者string的长度(为了得到正确的参数值),当函数参数部分的内容写完之后会最后写入1 byte的field stop(表示函数参数部分结束)。针对struct类型也是如此,struct也是一个参数类型,所以首先需要用1 byte参数的类型和2 bytes的参数ID来表示传入的struct参数,之后还需要对struct的内容进行写入,而struct的内容的写入同写函数部分内容类似,为表示struct 的内容写入完全,最后也需要写入field stop。当struct的内容写入完全之后,在传递参数这块最后依旧还得写入一个file stop表示函数参数部分结束。

之后是message end的写入,只是针对上述提到的配置,不会写入任何内容;

最后由于是TFramedTransport,那么会多加一个4 bytes的字段,用来表示上述字节串的长度;

本文参考及推荐博客

1.浅谈Thrift内部实现原理

欢迎关注微信公众号【一口程序锅】,一口想煮点技术的锅。

最新回复(0)