Class: Thrift::HeaderTransport

Inherits:
BaseTransport show all
Defined in:
lib/thrift/transport/header_transport.rb

Overview

HeaderTransport implements the THeader framing protocol.

THeader is a transport that adds headers and supports multiple protocols and transforms. It can auto-detect and communicate with legacy protocols (framed/unframed binary/compact) for backward compatibility.

Wire format:

+----------------------------------------------------------------+
| LENGTH (4 bytes, big-endian, excludes itself)                  |
+----------------------------------------------------------------+
| HEADER MAGIC (2 bytes: 0x0FFF) | FLAGS (2 bytes)               |
+----------------------------------------------------------------+
| SEQUENCE NUMBER (4 bytes)                                      |
+----------------------------------------------------------------+
| HEADER SIZE/4 (2 bytes)        | HEADER DATA (variable)...     |
+----------------------------------------------------------------+
| PAYLOAD (variable)                                             |
+----------------------------------------------------------------+

Constant Summary collapse

HEADER_MAGIC =

Header magic value (first 2 bytes of header)

0x0FFF
MAX_FRAME_SIZE =

Maximum frame size (~1GB)

0x3FFFFFFF
BINARY_VERSION_MASK =

Binary protocol version mask and version 1

0xffff0000
BINARY_VERSION_1 =
0x80010000
COMPACT_PROTOCOL_ID =

Compact protocol ID

0x82
COMPACT_VERSION_MASK =
0x1f
COMPACT_VERSION =
0x01

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from BaseTransport

#read_all, #read_byte, #read_into_buffer

Constructor Details

#initialize(transport, allowed_client_types = nil, default_protocol = HeaderSubprotocolID::COMPACT) ⇒ HeaderTransport

Creates a new HeaderTransport wrapping the given transport.

Parameters:

  • transport (BaseTransport)

    The underlying transport to wrap

  • allowed_client_types (Array<Integer>) (defaults to: nil)

    Allowed client types for auto-detection. When nil, all five client types (HEADERS, FRAMED_BINARY, UNFRAMED_BINARY, FRAMED_COMPACT, UNFRAMED_COMPACT) are allowed for backward compatibility.

  • default_protocol (Integer) (defaults to: HeaderSubprotocolID::COMPACT)

    Default protocol ID (BINARY or COMPACT)



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/thrift/transport/header_transport.rb', line 93

# Builds a HeaderTransport around +transport+.
#
# @param transport [BaseTransport] the underlying transport; stored as-is
# @param allowed_client_types [Array<Integer>, nil] client types accepted
#   during auto-detection; a nil (or false) value selects the five
#   HeaderClientType values listed in the body
# @param default_protocol [Integer] value stored as the initial protocol id
def initialize(transport, allowed_client_types = nil, default_protocol = HeaderSubprotocolID::COMPACT)
  @transport = transport

  # Fall back to the full list of client types when the caller gave none.
  allowed_client_types ||= [
    HeaderClientType::HEADERS,
    HeaderClientType::FRAMED_BINARY,
    HeaderClientType::UNFRAMED_BINARY,
    HeaderClientType::FRAMED_COMPACT,
    HeaderClientType::UNFRAMED_COMPACT
  ]
  @allowed_client_types = allowed_client_types

  # Initial mode: header client type with the caller's default protocol.
  @client_type = HeaderClientType::HEADERS
  @protocol_id = default_protocol

  # Separate, initially empty byte buffers for each direction.
  @read_buffer = StringIO.new(Bytes.empty_byte_buffer)
  @write_buffer = StringIO.new(Bytes.empty_byte_buffer)

  # No headers read or queued yet, and no write transforms registered.
  @read_headers = {}
  @write_headers = {}
  @write_transforms = []

  @sequence_id = 0
  @flags = 0
  @max_frame_size = MAX_FRAME_SIZE
end

Instance Attribute Details

#flags ⇒ Object (readonly)

Returns the value of attribute flags.



85
86
87
# File 'lib/thrift/transport/header_transport.rb', line 85

# Reader for the @flags instance variable.
#
# @return [Object] the value currently stored in @flags
def flags
  @flags
end

#protocol_id ⇒ Object (readonly)

Returns the value of attribute protocol_id.



85
86
87
# File 'lib/thrift/transport/header_transport.rb', line 85

# Reader for the @protocol_id instance variable.
#
# @return [Object] the value currently stored in @protocol_id
def protocol_id
  @protocol_id
end

#sequence_id ⇒ Object

Returns the value of attribute sequence_id.



85
86
87
# File 'lib/thrift/transport/header_transport.rb', line 85

# Reader for the @sequence_id instance variable.
#
# @return [Object] the value currently stored in @sequence_id
def sequence_id
  @sequence_id
end

Instance Method Details

#add_transform(transform_id) ⇒ Object

Adds a transform to apply when writing

Parameters:

  • transform_id (Integer)

    Transform ID (e.g., HeaderTransformID::ZLIB)



160
161
162
163
164
165
# File 'lib/thrift/transport/header_transport.rb', line 160

# Registers a transform to be applied when writing.
#
# Only HeaderTransformID::ZLIB is accepted. Registering the same id twice
# leaves the list unchanged.
#
# @param transform_id [Integer] transform id to register
# @return [Array, nil] the updated transform list, or nil when the id was
#   already registered
# @raise [TransportException] (UNKNOWN) for any id other than ZLIB
def add_transform(transform_id)
  if transform_id != HeaderTransformID::ZLIB
    raise TransportException.new(TransportException::UNKNOWN, "Unknown transform: #{transform_id}")
  end

  # Keep the list free of duplicates.
  return if @write_transforms.include?(transform_id)

  @write_transforms << transform_id
end

#clear_headers ⇒ Object

Clears all write headers



153
154
155
# File 'lib/thrift/transport/header_transport.rb', line 153

# Empties @write_headers in place by calling #clear on it.
#
# @return [Object] whatever @write_headers.clear returns
def clear_headers
  @write_headers.clear
end

#close ⇒ Object



133
134
135
# File 'lib/thrift/transport/header_transport.rb', line 133

# Closes the wrapped transport by delegating to @transport.close.
#
# @return [Object] whatever @transport.close returns
def close
  @transport.close
end

#flush ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/thrift/transport/header_transport.rb', line 199

# Sends everything accumulated in the write buffer as one message.
#
# The write buffer is swapped for a fresh empty one before any check runs,
# so the buffered bytes are dropped even when this method raises. When
# nothing was buffered the method returns nil without touching @transport.
#
# Dispatch on @client_type:
# * FRAMED_BINARY / FRAMED_COMPACT     -> flush_framed(payload)
# * UNFRAMED_BINARY / UNFRAMED_COMPACT -> @transport.write + @transport.flush
# * HEADERS or any other value         -> flush_header_format(payload)
#
# @raise [TransportException] (UNKNOWN) when the payload is larger than
#   @max_frame_size
def flush
  pending = @write_buffer.string
  @write_buffer = StringIO.new(Bytes.empty_byte_buffer)
  return if pending.empty?

  oversized = pending.bytesize > @max_frame_size
  raise TransportException.new(TransportException::UNKNOWN, "Attempting to send frame that is too large") if oversized

  case @client_type
  when HeaderClientType::FRAMED_BINARY, HeaderClientType::FRAMED_COMPACT
    flush_framed(pending)
  when HeaderClientType::UNFRAMED_BINARY, HeaderClientType::UNFRAMED_COMPACT
    # Unframed clients get the raw bytes with no extra framing added here.
    @transport.write(pending)
    @transport.flush
  else
    # HEADERS and every unrecognised client type share this path.
    flush_header_format(pending)
  end
end

#get_headers ⇒ Object

Returns the headers read from the last frame



138
139
140
# File 'lib/thrift/transport/header_transport.rb', line 138

# Returns the @read_headers object itself (not a copy), so changes made by
# the caller to the returned object are visible to this transport.
#
# @return [Object] the value currently stored in @read_headers
def get_headers
  @read_headers
end

#open ⇒ Object



129
130
131
# File 'lib/thrift/transport/header_transport.rb', line 129

# Opens the wrapped transport by delegating to @transport.open.
#
# @return [Object] whatever @transport.open returns
def open
  @transport.open
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/thrift/transport/header_transport.rb', line 125

# Reports whether the wrapped transport is open by delegating to
# @transport.open?.
#
# @return [Boolean] the result of @transport.open?
def open?
  @transport.open?
end

#read(sz) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/thrift/transport/header_transport.rb', line 175

# Reads up to +sz+ bytes, serving from the read buffer first.
#
# When the buffer cannot satisfy the request:
# * for UNFRAMED_BINARY / UNFRAMED_COMPACT clients the remainder is read
#   straight from @transport;
# * otherwise read_frame is called with the number of missing bytes and the
#   remainder is then taken from @read_buffer.
#
# @param sz [Integer] number of bytes requested
# @return [String] the buffered bytes followed by whatever the second source
#   supplied (which may be fewer than the missing count)
def read(sz)
  # StringIO#read yields nil at EOF; treat that as "nothing buffered".
  chunk = @read_buffer.read(sz) || Bytes.empty_byte_buffer

  missing = sz - chunk.bytesize
  return chunk if missing == 0

  unframed = @client_type == HeaderClientType::UNFRAMED_BINARY ||
             @client_type == HeaderClientType::UNFRAMED_COMPACT
  # Unframed passthrough: no frame to parse, go to the wrapped transport.
  return chunk + @transport.read(missing) if unframed

  # Buffer exhausted: load the next frame, then take the rest from it.
  read_frame(missing)
  rest = @read_buffer.read(missing)
  chunk + (rest || Bytes.empty_byte_buffer)
end

#reset_protocol ⇒ Object

Reads the next frame to detect the protocol/client type before decoding. Does nothing if the read buffer still holds unread data.



226
227
228
229
230
# File 'lib/thrift/transport/header_transport.rb', line 226

# Loads the next frame, but only when there is no unread buffered data
# (the read buffer is nil or at EOF). read_frame is called with 0.
#
# @return [Object, nil] the result of read_frame(0), or nil when buffered
#   data is still pending and nothing was done
def reset_protocol
  buffer_exhausted = @read_buffer.nil? || @read_buffer.eof?
  read_frame(0) if buffer_exhausted
end

#set_header(key, value) ⇒ Object

Sets a header to be written with the next flush

Parameters:

  • key (String)

    Header key (converted with to_s and forced to binary encoding)

  • value (String)

    Header value (converted with to_s and forced to binary encoding)



146
147
148
149
150
# File 'lib/thrift/transport/header_transport.rb', line 146

# Stores a header in @write_headers.
#
# Both arguments are converted with #to_s and passed through
# Bytes.force_binary_encoding before being stored, so non-string values
# are accepted. An existing entry under the same key is replaced.
#
# @param key [#to_s] header key
# @param value [#to_s] header value
# @return [Object] the result of Bytes.force_binary_encoding for the value
def set_header(key, value)
  binary_key = Bytes.force_binary_encoding(key.to_s)
  @write_headers[binary_key] = Bytes.force_binary_encoding(value.to_s)
end

#set_max_frame_size(size) ⇒ Object

Sets the maximum allowed frame size



168
169
170
171
172
173
# File 'lib/thrift/transport/header_transport.rb', line 168

# Updates the frame-size limit stored in @max_frame_size.
#
# @param size [Integer] new limit; must be greater than 0 and no larger
#   than MAX_FRAME_SIZE
# @return [Integer] the accepted size
# @raise [ArgumentError] when size is <= 0 or exceeds MAX_FRAME_SIZE; the
#   stored limit is left unchanged in that case
def set_max_frame_size(size)
  out_of_range = size <= 0 || size > MAX_FRAME_SIZE
  raise ArgumentError, "max_frame_size must be > 0 and <= #{MAX_FRAME_SIZE}" if out_of_range

  @max_frame_size = size
end

#to_s ⇒ Object



221
222
223
# File 'lib/thrift/transport/header_transport.rb', line 221

# Describes this transport as "header(<wrapped transport>)".
#
# @return [String] the wrapped transport's string form inside "header(...)"
def to_s
  # Interpolation already converts @transport with #to_s.
  "header(#{@transport})"
end

#write(buf) ⇒ Object



195
196
197
# File 'lib/thrift/transport/header_transport.rb', line 195

# Appends +buf+ to the write buffer; nothing is sent to @transport here.
#
# @param buf [String] data to queue; passed through
#   Bytes.force_binary_encoding before being buffered
# @return [Object] whatever @write_buffer.write returns
def write(buf)
  bytes = Bytes.force_binary_encoding(buf)
  @write_buffer.write(bytes)
end