Class: Thrift::HeaderTransport
- Inherits:
-
BaseTransport
- Object
- BaseTransport
- Thrift::HeaderTransport
- Defined in:
- lib/thrift/transport/header_transport.rb
Overview
HeaderTransport implements the THeader framing protocol.
THeader is a transport that adds headers and supports multiple protocols and transforms. It can auto-detect and communicate with legacy protocols (framed/unframed binary/compact) for backward compatibility.
Wire format:
+----------------------------------------------------------------+
| LENGTH (4 bytes, big-endian, excludes itself) |
+----------------------------------------------------------------+
| HEADER MAGIC (2 bytes: 0x0FFF) | FLAGS (2 bytes) |
+----------------------------------------------------------------+
| SEQUENCE NUMBER (4 bytes) |
+----------------------------------------------------------------+
| HEADER SIZE/4 (2 bytes) | HEADER DATA (variable)... |
+----------------------------------------------------------------+
| PAYLOAD (variable) |
+----------------------------------------------------------------+
Constant Summary collapse
- HEADER_MAGIC =
Header magic value (first 2 bytes of header)
0x0FFF- MAX_FRAME_SIZE =
Maximum frame size (~1GB)
0x3FFFFFFF- BINARY_VERSION_MASK =
Binary protocol version mask and version 1
0xffff0000- BINARY_VERSION_1 =
0x80010000- COMPACT_PROTOCOL_ID =
Compact protocol ID
0x82- COMPACT_VERSION_MASK =
0x1f- COMPACT_VERSION =
0x01
Instance Attribute Summary collapse
-
#flags ⇒ Object
readonly
Returns the value of attribute flags.
-
#protocol_id ⇒ Object
readonly
Returns the value of attribute protocol_id.
-
#sequence_id ⇒ Object
Returns the value of attribute sequence_id.
Instance Method Summary collapse
-
#add_transform(transform_id) ⇒ Object
Adds a transform to apply when writing.
-
#clear_headers ⇒ Object
Clears all write headers.
- #close ⇒ Object
- #flush ⇒ Object
-
#get_headers ⇒ Object
Returns the headers read from the last frame.
-
#initialize(transport, allowed_client_types = nil, default_protocol = HeaderSubprotocolID::COMPACT) ⇒ HeaderTransport
constructor
Creates a new HeaderTransport wrapping the given transport.
- #open ⇒ Object
- #open? ⇒ Boolean
- #read(sz) ⇒ Object
-
#reset_protocol ⇒ Object
Reads the next frame to detect protocol/client type before decoding.
-
#set_header(key, value) ⇒ Object
Sets a header to be written with the next flush.
-
#set_max_frame_size(size) ⇒ Object
Sets the maximum allowed frame size.
- #to_s ⇒ Object
- #write(buf) ⇒ Object
Methods inherited from BaseTransport
#read_all, #read_byte, #read_into_buffer
Constructor Details
#initialize(transport, allowed_client_types = nil, default_protocol = HeaderSubprotocolID::COMPACT) ⇒ HeaderTransport
Creates a new HeaderTransport wrapping the given transport.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/thrift/transport/header_transport.rb', line 93 def initialize(transport, allowed_client_types = nil, default_protocol = HeaderSubprotocolID::COMPACT) @transport = transport @client_type = HeaderClientType::HEADERS @protocol_id = default_protocol @allowed_client_types = allowed_client_types || [ HeaderClientType::HEADERS, HeaderClientType::FRAMED_BINARY, HeaderClientType::UNFRAMED_BINARY, HeaderClientType::FRAMED_COMPACT, HeaderClientType::UNFRAMED_COMPACT ] @read_buffer = StringIO.new(Bytes.empty_byte_buffer) @write_buffer = StringIO.new(Bytes.empty_byte_buffer) @read_headers = {} @write_headers = {} @write_transforms = [] @sequence_id = 0 @flags = 0 @max_frame_size = MAX_FRAME_SIZE end |
Instance Attribute Details
#flags ⇒ Object (readonly)
Returns the value of attribute flags.
85 86 87 |
# File 'lib/thrift/transport/header_transport.rb', line 85 def flags @flags end |
#protocol_id ⇒ Object (readonly)
Returns the value of attribute protocol_id.
85 86 87 |
# File 'lib/thrift/transport/header_transport.rb', line 85 def protocol_id @protocol_id end |
#sequence_id ⇒ Object
Returns the value of attribute sequence_id.
85 86 87 |
# File 'lib/thrift/transport/header_transport.rb', line 85 def sequence_id @sequence_id end |
Instance Method Details
#add_transform(transform_id) ⇒ Object
Adds a transform to apply when writing
160 161 162 163 164 165 |
# File 'lib/thrift/transport/header_transport.rb', line 160 def add_transform(transform_id) unless transform_id == HeaderTransformID::ZLIB raise TransportException.new(TransportException::UNKNOWN, "Unknown transform: #{transform_id}") end @write_transforms << transform_id unless @write_transforms.include?(transform_id) end |
#clear_headers ⇒ Object
Clears all write headers
153 154 155 |
# File 'lib/thrift/transport/header_transport.rb', line 153 def clear_headers @write_headers.clear end |
#close ⇒ Object
133 134 135 |
# File 'lib/thrift/transport/header_transport.rb', line 133 def close @transport.close end |
#flush ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/thrift/transport/header_transport.rb', line 199 def flush payload = @write_buffer.string @write_buffer = StringIO.new(Bytes.empty_byte_buffer) return if payload.empty? if payload.bytesize > @max_frame_size raise TransportException.new(TransportException::UNKNOWN, "Attempting to send frame that is too large") end case @client_type when HeaderClientType::HEADERS flush_header_format(payload) when HeaderClientType::FRAMED_BINARY, HeaderClientType::FRAMED_COMPACT flush_framed(payload) when HeaderClientType::UNFRAMED_BINARY, HeaderClientType::UNFRAMED_COMPACT @transport.write(payload) @transport.flush else flush_header_format(payload) end end |
#get_headers ⇒ Object
Returns the headers read from the last frame
138 139 140 |
# File 'lib/thrift/transport/header_transport.rb', line 138 def get_headers @read_headers end |
#open ⇒ Object
129 130 131 |
# File 'lib/thrift/transport/header_transport.rb', line 129 def open @transport.open end |
#open? ⇒ Boolean
125 126 127 |
# File 'lib/thrift/transport/header_transport.rb', line 125 def open? @transport.open? end |
#read(sz) ⇒ Object
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/thrift/transport/header_transport.rb', line 175 def read(sz) # Try reading from existing buffer data = @read_buffer.read(sz) data = Bytes.empty_byte_buffer if data.nil? bytes_left = sz - data.bytesize return data if bytes_left == 0 # Handle unframed passthrough - read directly from underlying transport if @client_type == HeaderClientType::UNFRAMED_BINARY || @client_type == HeaderClientType::UNFRAMED_COMPACT return data + @transport.read(bytes_left) end # Need to read the next frame read_frame(bytes_left) additional = @read_buffer.read(bytes_left) data + (additional || Bytes.empty_byte_buffer) end |
#reset_protocol ⇒ Object
Reads the next frame to detect protocol/client type before decoding.
226 227 228 229 230 |
# File 'lib/thrift/transport/header_transport.rb', line 226 def reset_protocol return unless @read_buffer.nil? || @read_buffer.eof? read_frame(0) end |
#set_header(key, value) ⇒ Object
Sets a header to be written with the next flush
146 147 148 149 150 |
# File 'lib/thrift/transport/header_transport.rb', line 146 def set_header(key, value) key = Bytes.force_binary_encoding(key.to_s) value = Bytes.force_binary_encoding(value.to_s) @write_headers[key] = value end |
#set_max_frame_size(size) ⇒ Object
Sets the maximum allowed frame size
168 169 170 171 172 173 |
# File 'lib/thrift/transport/header_transport.rb', line 168 def set_max_frame_size(size) if size <= 0 || size > MAX_FRAME_SIZE raise ArgumentError, "max_frame_size must be > 0 and <= #{MAX_FRAME_SIZE}" end @max_frame_size = size end |
#to_s ⇒ Object
221 222 223 |
# File 'lib/thrift/transport/header_transport.rb', line 221 def to_s "header(#{@transport.to_s})" end |
#write(buf) ⇒ Object
195 196 197 |
# File 'lib/thrift/transport/header_transport.rb', line 195 def write(buf) @write_buffer.write(Bytes.force_binary_encoding(buf)) end |