Class: HTTP2::Stream
- Inherits: Object
  - Object
  - HTTP2::Stream
- Includes: Emitter, Error, FlowBuffer
- Defined in: lib/http/2/stream.rb
Overview
A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.
This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.
                          +--------+
                     PP   |        |   PP
                 ,--------|  idle  |--------.
                /         |        |         \
               v          +--------+          v
        +----------+          |           +----------+
        |          |          | H         |          |
    ,---|:reserved |          |           |:reserved |---.
    |   | (local)  |          v           | (remote) |   |
    |   +----------+      +--------+      +----------+   |
    |      | :active      |        |      :active |      |
    |      |      ,-------|:active |-------.      |      |
    |      | H   /   ES   |        |   ES   \   H |      |
    |      v    v         +--------+         v    v      |
    |   +-----------+          |          +-----------+  |
    |   |:half_close|          |          |:half_close|  |
    |   |  (remote) |          |          |  (local)  |  |
    |   +-----------+          |          +-----------+  |
    |        |                 v                |        |
    |        |    ES/R    +--------+    ES/R    |        |
    |        `----------->|        |<-----------'        |
    | R                   | :close |                   R |
    `-------------------->|        |<--------------------'
                          +--------+
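The edges of the diagram can be written down as a lookup table. The following is a minimal sketch, not the library's implementation: the TRANSITIONS constant, the next_state helper, the trigger names (send_h, recv_es, rst and so on) and the state labels are all hypothetical, chosen to mirror the PP, H, ES and R labels above. STREAM_OPEN_STATES suggests the library itself tracks additional intermediate states.

```ruby
# Hypothetical transition table mirroring the diagram's edges.
# PP = PUSH_PROMISE, H = HEADERS, ES = END_STREAM flag, R = RST_STREAM.
TRANSITIONS = {
  idle:               { send_pp: :reserved_local, recv_pp: :reserved_remote,
                        send_h: :open, recv_h: :open },
  reserved_local:     { send_h: :half_closed_remote, rst: :closed },
  reserved_remote:    { recv_h: :half_closed_local, rst: :closed },
  open:               { send_es: :half_closed_local, recv_es: :half_closed_remote,
                        rst: :closed },
  half_closed_local:  { recv_es: :closed, rst: :closed },
  half_closed_remote: { send_es: :closed, rst: :closed },
  closed:             {}
}.freeze

# Returns the state reached from +state+ via +trigger+, or raises if the
# diagram has no such edge.
def next_state(state, trigger)
  TRANSITIONS.fetch(state).fetch(trigger) do
    raise ArgumentError, "no #{trigger} edge out of #{state}"
  end
end

# A simple client exchange: send HEADERS, send END_STREAM, receive END_STREAM.
path = %i[send_h send_es recv_es].reduce([:idle]) do |seen, trigger|
  seen << next_state(seen.last, trigger)
end
puts path.inspect
```

A table like this keeps illegal transitions explicit: any trigger that is missing from a state's row is rejected rather than silently ignored.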
Constant Summary
- STREAM_OPEN_STATES = %i[open half_closed_local half_closing closing].freeze
Constants included from FlowBuffer
Instance Attribute Summary
- #closed ⇒ Object (readonly)
  Reason why the stream was closed.
- #dependency ⇒ Object (readonly)
  Returns the value of attribute dependency.
- #id ⇒ Object (readonly)
  Stream ID (odd for client initiated streams, even otherwise).
- #local_window ⇒ Object (readonly, also: #window)
  Size of current stream flow control window.
- #parent ⇒ Object (readonly)
  Parent request stream of a push stream.
- #remote_window ⇒ Object (readonly)
  Returns the value of attribute remote_window.
- #state ⇒ Object (readonly)
  Stream state as defined by HTTP 2.0.
- #weight ⇒ Object (readonly)
  Stream priority as set by initiator.
Attributes included from FlowBuffer
Instance Method Summary
- #calculate_content_length(data_length) ⇒ Object
- #cancel ⇒ Object
  Sends a RST_STREAM indicating that the stream is no longer needed.
- #chunk_data(payload, max_size) ⇒ Object
  Splits payload into chunks of at most max_size bytes, yields every chunk except the last, then returns the final chunk.
- #close(error = :stream_closed) ⇒ Object
  Sends a RST_STREAM frame which closes current stream - this does not close the underlying connection.
- #closed? ⇒ Boolean
- #data(payload, end_stream: true) ⇒ Object
  Sends DATA frame containing response payload.
- #headers(headers, end_headers: true, end_stream: false) ⇒ Object
  Sends a HEADERS frame containing HTTP response headers.
- #initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) ⇒ Stream (constructor)
  Initializes new stream.
- #promise(headers, end_headers: true, &block) ⇒ Object
- #receive(frame) ⇒ Object (also: #<<)
  Processes incoming HTTP 2.0 frames.
- #refuse ⇒ Object
  Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.
- #reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object
  Sends a PRIORITY frame with new stream priority value (can only be performed by the client).
- #send(frame) ⇒ Object
  Processes outgoing HTTP 2.0 frames.
- #verify_trailers(frame) ⇒ Object
- #window_update(increment) ⇒ Object
  Sends a WINDOW_UPDATE frame to the peer.
Methods included from Emitter
Methods included from FlowBuffer
Constructor Details
#initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) ⇒ Stream
Initializes new stream.
Note that you should never have to call this directly. To create a new client-initiated stream, use Connection#new_stream. Similarly, Connection emits new stream objects when frames for a new stream are received.
# File 'lib/http/2/stream.rb', line 78

def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle)
  stream_error(:protocol_error, msg: "stream can't depend on itself") if id == dependency

  @connection = connection
  @id = id
  @weight = weight
  @dependency = dependency

  # from mixins
  @listeners = Hash.new { |hash, key| hash[key] = [] }
  @send_buffer = FrameBuffer.new

  process_priority(weight: weight, dependency: dependency, exclusive: exclusive)
  @local_window_max_size = connection.local_settings[:settings_initial_window_size]
  @local_window = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state = state
  @error = false
  @closed = false
  @_method = @_content_length = @_status_code = @_trailers = nil
  @_waiting_on_trailers = false
  @received_data = false
  @activated = false

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window_max_size = @local_window = v }
end
Instance Attribute Details
#closed ⇒ Object (readonly)
Reason why the stream was closed.

# File 'lib/http/2/stream.rb', line 63

def closed
  @closed
end
#dependency ⇒ Object (readonly)
Returns the value of attribute dependency.
# File 'lib/http/2/stream.rb', line 56

def dependency
  @dependency
end
#id ⇒ Object (readonly)
Stream ID (odd for client initiated streams, even otherwise).
# File 'lib/http/2/stream.rb', line 46

def id
  @id
end
#local_window ⇒ Object (readonly) Also known as: window
Size of current stream flow control window.
# File 'lib/http/2/stream.rb', line 59

def local_window
  @local_window
end
#parent ⇒ Object (readonly)
Parent request stream of a push stream.

# File 'lib/http/2/stream.rb', line 52

def parent
  @parent
end
#remote_window ⇒ Object (readonly)
Returns the value of attribute remote_window.
# File 'lib/http/2/stream.rb', line 56

def remote_window
  @remote_window
end
#state ⇒ Object (readonly)
Stream state as defined by HTTP 2.0.
# File 'lib/http/2/stream.rb', line 49

def state
  @state
end
#weight ⇒ Object (readonly)
Stream priority as set by initiator.
# File 'lib/http/2/stream.rb', line 55

def weight
  @weight
end
Instance Method Details
#calculate_content_length(data_length) ⇒ Object
# File 'lib/http/2/stream.rb', line 189

def calculate_content_length(data_length)
  return unless @_content_length && data_length

  @_content_length -= data_length
  return if @_content_length >= 0

  stream_error(:protocol_error, msg: "received more data than what was defined in content-length")
end
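The countdown performed by calculate_content_length can be illustrated outside the library. This is a minimal sketch under stated assumptions: ContentLengthTracker and ProtocolError are hypothetical names, and a plain exception stands in for the library's stream_error call.

```ruby
# Hypothetical stand-in for the error the library raises via stream_error.
ProtocolError = Class.new(StandardError)

# Tracks how many body bytes are still expected for one message.
class ContentLengthTracker
  # +content_length+ may be nil when no content-length header was sent.
  def initialize(content_length)
    @remaining = content_length
  end

  # Subtracts the size of one DATA payload and returns the bytes still
  # expected. Returns nil when there is nothing to check.
  def consume(data_length)
    return unless @remaining && data_length

    @remaining -= data_length
    if @remaining.negative?
      raise ProtocolError, "received more data than what was defined in content-length"
    end

    @remaining
  end
end

tracker = ContentLengthTracker.new(10)
puts tracker.consume(4) # 6 bytes still expected
puts tracker.consume(6) # 0 bytes still expected
```

Any further non-empty payload on this tracker would raise ProtocolError, which corresponds to the :protocol_error branch in the listing.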
#cancel ⇒ Object
Sends a RST_STREAM indicating that the stream is no longer needed.
# File 'lib/http/2/stream.rb', line 293

def cancel
  send(type: :rst_stream, error: :cancel)
end
#chunk_data(payload, max_size) ⇒ Object
Splits payload into chunks of at most max_size bytes, yields every chunk except the last, then returns the final chunk.

# File 'lib/http/2/stream.rb', line 274

def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  payload.byteslice(cursor, total - cursor)
end
#close(error = :stream_closed) ⇒ Object
Sends a RST_STREAM frame which closes current stream - this does not close the underlying connection.
# File 'lib/http/2/stream.rb', line 288

def close(error = :stream_closed)
  send(type: :rst_stream, error: error)
end
#closed? ⇒ Boolean
# File 'lib/http/2/stream.rb', line 107

def closed?
  @state == :closed
end
#data(payload, end_stream: true) ⇒ Object
Sends DATA frame containing response payload.
# File 'lib/http/2/stream.rb', line 256

def data(payload, end_stream: true)
  # Split data so that each frame fits within the peer's maximum frame size
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]

  if payload.bytesize > max_size
    payload = chunk_data(payload, max_size) do |chunk|
      send(type: :data, flags: 0, payload: chunk)
    end
  end

  flags = 0
  flags |= END_STREAM if end_stream
  send(type: :data, flags: flags, payload: payload)
end
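The splitting behaviour of #data and #chunk_data can be tried in isolation. The sketch below copies chunk_data from its listing and wraps it in a hypothetical data_frames helper that collects frames in an array instead of calling send. END_STREAM = 0x1 is the flag bit defined for DATA frames by the HTTP/2 specification; the frame hash layout is only an illustration.

```ruby
END_STREAM = 0x1 # END_STREAM flag bit for DATA frames

# Same logic as Stream#chunk_data: yield every full chunk except the
# last one, then return the final (possibly shorter) chunk.
def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  payload.byteslice(cursor, total - cursor)
end

# Hypothetical helper: returns the frames #data would hand to #send.
def data_frames(payload, max_size:, end_stream: true)
  frames = []
  if payload.bytesize > max_size
    payload = chunk_data(payload, max_size) do |chunk|
      frames << { type: :data, flags: 0, payload: chunk }
    end
  end
  flags = end_stream ? END_STREAM : 0
  frames << { type: :data, flags: flags, payload: payload }
end

frames = data_frames("abcdefghij", max_size: 4)
frames.each { |f| puts "#{f[:payload].inspect} flags=#{f[:flags]}" }
# "abcd" flags=0
# "efgh" flags=0
# "ij" flags=1
```

Only the final frame carries END_STREAM, so the peer sees the stream half-close exactly once, after the whole payload has been delivered.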
#headers(headers, end_headers: true, end_stream: false) ⇒ Object
Sends a HEADERS frame containing HTTP response headers. All pseudo-header fields MUST appear in the header block before regular header fields.
# File 'lib/http/2/stream.rb', line 228

def headers(headers, end_headers: true, end_stream: false)
  flags = end_headers ? END_HEADERS : 0
  flags |= END_STREAM if end_stream || @_method == "HEAD"

  send(type: :headers, flags: flags, payload: headers)
end
#promise(headers, end_headers: true, &block) ⇒ Object
# File 'lib/http/2/stream.rb', line 235

def promise(headers, end_headers: true, &block)
  raise ArgumentError, "must provide callback" unless block

  flags = end_headers ? END_HEADERS : 0
  emit(:promise, self, headers, flags, &block)
end
#receive(frame) ⇒ Object Also known as: <<
Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.
# File 'lib/http/2/stream.rb', line 114

def receive(frame)
  transition(frame, false)

  frame_type = frame[:type] #: Symbol

  case frame_type
  when :data
    # @type var frame: data_frame
    # 6.1. DATA
    # If a DATA frame is received whose stream is not in "open" or
    # "half closed (local)" state, the recipient MUST respond with a
    # stream error (Section 5.4.2) of type STREAM_CLOSED.
    stream_error(:stream_closed) unless STREAM_OPEN_STATES.include?(@state) ||
                                        (@state == :closed && @closed == :local_rst)
    @received_data = true
    calculate_content_length(frame[:length])
    update_local_window(frame)
    # Emit DATA frame
    emit(:data, frame[:payload]) unless frame[:ignore]
    calculate_window_update(@local_window_max_size)
  when :headers
    # @type var frame: headers_frame
    stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) ||
                                    @state == :remote_closed
    @_method ||= frame[:method]
    @_status_code ||= frame[:status]
    @_content_length ||= frame[:content_length]
    @_trailers ||= frame[:trailer]
    if @_waiting_on_trailers ||
       (@received_data &&
           (!@_status_code || @_status_code >= 200))

      # An endpoint that receives a HEADERS frame without the END_STREAM flag set after receiving a final
      # (non-informational) status code MUST treat the corresponding request or response as malformed.
      verify_trailers(frame)
    end
    emit(:headers, frame[:payload]) unless frame[:ignore]
    @_waiting_on_trailers = !@_trailers.nil?
  when :push_promise
    emit(:promise_headers, frame[:payload]) unless frame[:ignore]
  when :continuation
    stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) || @state == :remote_closed
    stream_error(:protocol_error) if @received_data
  when :priority
    # @type var frame: priority_frame
    process_priority(frame)
  when :window_update
    # @type var frame: window_update_frame
    process_window_update(frame: frame)
  when :altsvc
    # @type var frame: origin_frame
    # 4. The ALTSVC HTTP/2 Frame
    # An ALTSVC frame on a
    # stream other than stream 0 containing non-empty "Origin" information
    # is invalid and MUST be ignored.
    emit(frame_type, frame) if !frame[:origin] || frame[:origin].empty?
  end

  complete_transition(frame)
end
#refuse ⇒ Object
Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.
# File 'lib/http/2/stream.rb', line 299

def refuse
  send(type: :rst_stream, error: :refused_stream)
end
#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object
Sends a PRIORITY frame with new stream priority value (can only be performed by the client).
# File 'lib/http/2/stream.rb', line 247

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send(type: :priority, weight: weight, dependency: dependency, exclusive: exclusive)
end
#send(frame) ⇒ Object
Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.
# File 'lib/http/2/stream.rb', line 203

def send(frame)
  case frame[:type]
  when :data
    # @type var frame: data_frame
    # stream state management is maintained in send_data
    return send_data(frame)
  when :window_update
    # @type var frame: window_update_frame
    @local_window += frame[:increment]
  when :priority
    # @type var frame: priority_frame
    process_priority(frame)
  end

  manage_state(frame) do
    emit(:frame, frame)
  end
end
#verify_trailers(frame) ⇒ Object
# File 'lib/http/2/stream.rb', line 175

def verify_trailers(frame)
  stream_error(:protocol_error, msg: "trailer headers frame must close the stream") unless end_stream?(frame)
  return unless @_trailers

  trailers = frame[:payload]
  return unless trailers.respond_to?(:each)

  trailers.each do |field, _| # rubocop:disable Style/HashEachMethods
    @_trailers.delete(field)
    break if @_trailers.empty?
  end
  stream_error(:protocol_error, msg: "didn't receive all expected trailer headers") unless @_trailers.empty?
end
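The matching step in verify_trailers amounts to crossing received field names off a list of announced ones. A minimal sketch follows; missing_trailers is a hypothetical helper, and it assumes the announced trailers are an array of field names and the received payload is an array of [name, value] pairs, which this page does not confirm.

```ruby
# Returns the announced trailer names that were not present in the
# received trailer block. An empty result means every announced
# trailer arrived. The inputs are left unmodified.
def missing_trailers(announced, received)
  pending = announced.dup
  received.each do |field, _value|
    pending.delete(field)
    break if pending.empty?
  end
  pending
end

announced = %w[x-checksum x-signature]
puts missing_trailers(announced, [["x-checksum", "abc123"]]).inspect
# ["x-signature"]
puts missing_trailers(announced, [["x-signature", "s"], ["x-checksum", "c"]]).inspect
# []
```

In the listing, a non-empty remainder is what triggers the "didn't receive all expected trailer headers" protocol error.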
#window_update(increment) ⇒ Object
Sends a WINDOW_UPDATE frame to the peer.
# File 'lib/http/2/stream.rb', line 306

def window_update(increment)
  # emit stream-level WINDOW_UPDATE unless stream is closed
  return if @state == :closed || @state == :remote_closed

  send(type: :window_update, increment: increment)
end
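To show when a receiver might call window_update, here is a sketch of receive-side window accounting. ReceiveWindow is a hypothetical class, and the replenishment policy (top the window back up once it drops to half of its maximum or below) is an assumption chosen for illustration; the FlowBuffer helpers that implement the library's actual policy are not shown on this page.

```ruby
# Hypothetical receive-side flow-control window for a single stream.
class ReceiveWindow
  attr_reader :size

  def initialize(max_size)
    @max_size = max_size
    @size = max_size
  end

  # Accounts for one received DATA payload of +bytes+ bytes. Returns the
  # increment to advertise in a WINDOW_UPDATE frame, or nil if the window
  # is still more than half full.
  def consume(bytes)
    @size -= bytes
    return if @size > @max_size / 2

    increment = @max_size - @size
    @size += increment # mirrors `@local_window += frame[:increment]` in #send
    increment
  end
end

window = ReceiveWindow.new(100)
puts window.consume(30).inspect # nil, 70 bytes of window remain
puts window.consume(30).inspect # 60, window restored to 100
```

Batching updates this way trades a slightly smaller effective window for fewer WINDOW_UPDATE frames on the wire.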