Class: Async::HTTP::Protocol::HTTP2::Stream
- Inherits:
- Protocol::HTTP2::Stream
  - Object
  - Protocol::HTTP2::Stream
  - Async::HTTP::Protocol::HTTP2::Stream
- Defined in:
- lib/async/http/protocol/http2/stream.rb
Overview
An HTTP/2 stream that manages headers, input data, and output data for a single request/response exchange.
Direct Known Subclasses
Instance Attribute Summary
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#input ⇒ Object
readonly
Returns the value of attribute input.
-
#pool ⇒ Object
Returns the value of attribute pool.
Instance Method Summary
-
#add_header(key, value, trailer: false) ⇒ Object
Add a header to the stream, validating against HTTP/2 constraints.
-
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called.
-
#finish_output(error = nil) ⇒ Object
Called when the output terminates, either normally or with an error.
-
#initialize ⇒ Stream
constructor
Initialize the stream state.
-
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
-
#process_data(frame) ⇒ Object
Process an incoming DATA frame and write it to the input body.
-
#process_headers(frame) ⇒ Object
Process an incoming HEADERS frame, dispatching to initial or trailing header handling.
-
#receive_trailing_headers(headers, end_stream) ⇒ Object
Process trailing headers received after the body.
-
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
-
#update_local_window(frame) ⇒ Object
Update the local flow control window after receiving data.
-
#wait_for_input ⇒ Object
-
#window_updated(size) ⇒ Object
Called when the flow control window is updated.
Constructor Details
#initialize ⇒ Stream
Initialize the stream state.
# File 'lib/async/http/protocol/http2/stream.rb', line 20
def initialize(*)
	super

	@headers = nil

	@pool = nil

	# Input buffer, reading request body, or response body (receive_data):
	@length = nil
	@input = nil

	# Output buffer, writing request body or response body (window_updated):
	@output = nil
end
Instance Attribute Details
#headers ⇒ Object
Returns the value of attribute headers.
# File 'lib/async/http/protocol/http2/stream.rb', line 35
def headers
	@headers
end
#input ⇒ Object (readonly)
Returns the value of attribute input.
# File 'lib/async/http/protocol/http2/stream.rb', line 39
def input
	@input
end
#pool ⇒ Object
Returns the value of attribute pool.
# File 'lib/async/http/protocol/http2/stream.rb', line 37
def pool
	@pool
end
Instance Method Details
#add_header(key, value, trailer: false) ⇒ Object
Add a header to the stream, validating against HTTP/2 constraints.
# File 'lib/async/http/protocol/http2/stream.rb', line 44
def add_header(key, value, trailer: false)
	if key == CONNECTION
		raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
	elsif key.start_with? ":"
		raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
	elsif key =~ /[A-Z]/
		raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!"
	else
		@headers.add(key, value, trailer: trailer)
	end
end
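The three checks above can be tried in isolation. The following is only a minimal sketch, not the library's implementation: `HeaderError`, `validate_header_name!`, `outcome_for` and the value given to `CONNECTION` are stand-ins assumed for illustration so that the example runs without the `protocol-http2` gem.

```ruby
# Stand-in for ::Protocol::HTTP2::HeaderError (assumption, keeps the sketch self-contained).
class HeaderError < StandardError
end

# Assumed value of the CONNECTION constant referenced by add_header.
CONNECTION = "connection"

# Hypothetical helper applying the same three checks as add_header.
# Returns the key unchanged when it is acceptable.
def validate_header_name!(key)
	if key == CONNECTION
		raise HeaderError, "Connection header is not allowed!"
	elsif key.start_with?(":")
		raise HeaderError, "Invalid pseudo-header #{key}!"
	elsif key =~ /[A-Z]/
		raise HeaderError, "Invalid upper-case characters in header #{key}!"
	end
	
	return key
end

# Hypothetical helper: returns :ok, or the error message for a rejected name.
def outcome_for(key)
	validate_header_name!(key)
	return :ok
rescue HeaderError => error
	return error.message
end

puts outcome_for("content-type")
puts outcome_for("Content-Type")
```

Under these assumptions, a lower-case name such as `content-type` is accepted, while `connection`, any name starting with `:` and any name containing an upper-case letter are rejected.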
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:
- A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
- A frame is sent which causes this stream to enter the closed state. This method will be invoked from the task that sent the frame.
While the input stream is relatively straightforward, the output stream can trigger the second case above.
# File 'lib/async/http/protocol/http2/stream.rb', line 176
def closed(error)
	super

	if input = @input
		@input = nil
		input.close_write(error)
	end

	if output = @output
		@output = nil
		output.stop(error)
	end

	if pool = @pool and @connection
		pool.release(@connection)
	end

	return self
end
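The method copies each resource into a local variable and clears the instance variable before closing it. One plausible reading is that this keeps the cleanup safe to run more than once, whichever task reaches it first. The sketch below illustrates that idiom only; `FakeBody`, `SketchStream` and `close_twice` are hypothetical names, not part of the library.

```ruby
# Hypothetical stand-in for an input body; records every close_write call.
class FakeBody
	attr_reader :closed_with
	
	def initialize
		@closed_with = []
	end
	
	def close_write(error = nil)
		@closed_with << error
	end
end

# Minimal stand-in for the stream, showing only the cleanup of @input.
class SketchStream
	def initialize(input)
		@input = input
	end
	
	def closed(error)
		# Detach the body first, so a later call finds nothing left to close.
		if input = @input
			@input = nil
			input.close_write(error)
		end
		
		return self
	end
end

# Calls closed twice and returns what the body observed.
def close_twice
	body = FakeBody.new
	stream = SketchStream.new(body)
	
	stream.closed(:reset)
	stream.closed(nil)
	
	return body.closed_with
end

p close_twice
```

In this sketch the body sees a single `close_write` call, carrying the error from the first invocation.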
#finish_output(error = nil) ⇒ Object
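Called when the output terminates. If an error is given, the stream is reset with INTERNAL_ERROR; otherwise the trailer (if any) is sent as a HEADERS frame, or else an empty DATA frame is sent, in both cases with END_STREAM set. Does nothing if the stream is already closed.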
# File 'lib/async/http/protocol/http2/stream.rb', line 142
def finish_output(error = nil)
	return if self.closed?

	trailer = @output&.trailer

	@output = nil

	if error
		send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
	else
		# Write trailer?
		if trailer&.any?
			send_headers(trailer, ::Protocol::HTTP2::END_STREAM)
		else
			send_data(nil, ::Protocol::HTTP2::END_STREAM)
		end
	end
end
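The three outcomes of this method can be seen with a small recorder. This is a sketch under stated assumptions: `RecordingStream` and `frames_for` are hypothetical, and the recorder appends symbols to an array instead of writing real frames. The numeric values for the END_STREAM flag and the INTERNAL_ERROR code are the ones defined by the HTTP/2 specification.

```ruby
# Hypothetical recorder standing in for the stream's frame-sending methods.
class RecordingStream
	END_STREAM = 0x1     # END_STREAM flag value in HTTP/2.
	INTERNAL_ERROR = 0x2 # INTERNAL_ERROR error code in HTTP/2.
	
	attr_reader :sent
	
	def initialize(trailer = nil)
		@trailer = trailer
		@sent = []
	end
	
	# Mirrors the branching of finish_output, recording what would be sent.
	def finish_output(error = nil)
		trailer = @trailer
		@trailer = nil
		
		if error
			@sent << [:reset_stream, INTERNAL_ERROR]
		elsif trailer&.any?
			@sent << [:headers, trailer, END_STREAM]
		else
			@sent << [:data, nil, END_STREAM]
		end
	end
end

# Hypothetical helper: finishes a fresh stream and returns the recorded frames.
def frames_for(trailer: nil, error: nil)
	stream = RecordingStream.new(trailer)
	stream.finish_output(error)
	return stream.sent
end

p frames_for
p frames_for(trailer: [["etag", "abc"]])
p frames_for(error: RuntimeError.new("failed"))
```

An empty trailer is treated the same as no trailer, because `trailer&.any?` is false in both cases.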
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
# File 'lib/async/http/protocol/http2/stream.rb', line 94
def prepare_input(length)
	if @input.nil?
		@input = Input.new(self, length)
	else
		raise ArgumentError, "Input body already prepared!"
	end
end
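The guard means the input body can be prepared only once per stream. A minimal sketch of that behaviour follows; `InputHolder` and `prepare_twice` are hypothetical, and a plain hash stands in for the real `Input` object.

```ruby
# Hypothetical holder reproducing only the one-time guard of prepare_input.
class InputHolder
	attr_reader :input
	
	def initialize
		@input = nil
	end
	
	def prepare_input(length)
		if @input.nil?
			# A hash stands in for Input.new(self, length).
			@input = {length: length}
		else
			raise ArgumentError, "Input body already prepared!"
		end
	end
end

# Prepares the input twice and returns the first result with the second error.
def prepare_twice
	holder = InputHolder.new
	first = holder.prepare_input(1024)
	
	begin
		holder.prepare_input(2048)
	rescue ArgumentError => error
		return [first, error.message]
	end
end

p prepare_twice
```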
#process_data(frame) ⇒ Object
Process an incoming DATA frame and write it to the input body.
# File 'lib/async/http/protocol/http2/stream.rb', line 114
def process_data(frame)
	data = frame.unpack

	if @input
		unless data.empty?
			@input.write(data)
		end

		if frame.end_stream?
			@input.close_write
		end
	end

	return data
rescue ::Protocol::HTTP2::ProtocolError
	raise
rescue # Anything else...
	send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
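The write-then-close behaviour can be followed with fake frames. This is an illustrative sketch only: `FakeFrame`, `BufferInput`, `deliver` and `deliver_demo` are hypothetical stand-ins, and the error handling of the real method is left out.

```ruby
# Hypothetical DATA frame: a payload plus an END_STREAM flag.
FakeFrame = Struct.new(:payload, :end_stream) do
	def unpack
		payload
	end
	
	def end_stream?
		end_stream
	end
end

# Hypothetical input body that collects chunks and remembers whether it was closed.
class BufferInput
	attr_reader :chunks
	
	def initialize
		@chunks = []
		@closed = false
	end
	
	def write(data)
		@chunks << data
	end
	
	def close_write
		@closed = true
	end
	
	def closed?
		@closed
	end
end

# Same steps as process_data: skip empty payloads, close on END_STREAM.
def deliver(input, frame)
	data = frame.unpack
	
	if input
		input.write(data) unless data.empty?
		input.close_write if frame.end_stream?
	end
	
	return data
end

# Delivers one data-carrying frame, then an empty frame that ends the stream.
def deliver_demo
	input = BufferInput.new
	deliver(input, FakeFrame.new("hello", false))
	deliver(input, FakeFrame.new("", true))
	return [input.chunks, input.closed?]
end

p deliver_demo
```

The empty final frame closes the body without adding an empty chunk, and when no input has been prepared the payload is simply returned.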
#process_headers(frame) ⇒ Object
Process an incoming HEADERS frame, dispatching to initial or trailing header handling.
# File 'lib/async/http/protocol/http2/stream.rb', line 67
def process_headers(frame)
	if @headers and frame.end_stream?
		self.receive_trailing_headers(super, frame.end_stream?)
	else
		self.receive_initial_headers(super, frame.end_stream?)
	end

	if @input and frame.end_stream?
		@input.close_write
	end
rescue ::Protocol::HTTP::InvalidTrailerError => error
	Console.warn(self, error)

	send_reset_stream(::Protocol::HTTP2::Error::PROTOCOL_ERROR)
rescue ::Protocol::HTTP2::HeaderError => error
	Console.debug(self, "Error while processing headers!", error)

	send_reset_stream(error.code)
end
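The dispatch condition depends on two things: whether headers have already been received on the stream, and whether the frame carries END_STREAM. The sketch below tabulates the four combinations; `classify_headers` is a hypothetical helper written for this illustration.

```ruby
# Hypothetical helper mirroring the condition `@headers and frame.end_stream?`.
def classify_headers(headers_received, end_stream)
	if headers_received and end_stream
		:trailing
	else
		:initial
	end
end

# Print the outcome for every combination of the two inputs.
[false, true].each do |headers_received|
	[false, true].each do |end_stream|
		kind = classify_headers(headers_received, end_stream)
		puts "headers_received=#{headers_received} end_stream=#{end_stream} => #{kind}"
	end
end
```

Only a HEADERS frame that arrives after earlier headers and also ends the stream is treated as trailers; every other combination goes to the initial header handling.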
#receive_trailing_headers(headers, end_stream) ⇒ Object
Process trailing headers received after the body.
# File 'lib/async/http/protocol/http2/stream.rb', line 59
def receive_trailing_headers(headers, end_stream)
	headers.each do |key, value|
		add_header(key, value, trailer: true)
	end
end
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
# File 'lib/async/http/protocol/http2/stream.rb', line 135
def send_body(body, trailer = nil)
	@output = Output.new(self, body, trailer)

	@output.start
end
#update_local_window(frame) ⇒ Object
Update the local flow control window after receiving data.
# File 'lib/async/http/protocol/http2/stream.rb', line 104
def update_local_window(frame)
	consume_local_window(frame)

	# This is done on demand in `Input#read`:
	# request_window_update
end
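The comment in the method indicates that the window is consumed when data arrives but replenished later, when the body is read. The arithmetic can be sketched as follows. `SketchWindow` and `window_demo` are hypothetical, and the replenish policy shown (restore to full capacity) is an assumption made for illustration, not necessarily what `request_window_update` does. The capacity of 65,535 bytes is the default initial window size in HTTP/2.

```ruby
# Hypothetical flow-control window tracking only the available byte count.
class SketchWindow
	attr_reader :available
	
	def initialize(capacity)
		@capacity = capacity
		@available = capacity
	end
	
	# Receiving a DATA frame uses up part of the window.
	def consume(size)
		@available -= size
	end
	
	# Assumed policy: return the increment a WINDOW_UPDATE frame would carry
	# and restore the window to full capacity.
	def replenish
		increment = @capacity - @available
		@available = @capacity
		return increment
	end
end

# Consumes one 16 KiB frame, then replenishes as a reader would on demand.
def window_demo
	window = SketchWindow.new(65_535)
	window.consume(16_384)
	after_receive = window.available
	increment = window.replenish
	return [after_receive, increment, window.available]
end

p window_demo
```

Deferring the update until the body is read means a slow reader naturally limits how much data the peer may send.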
#wait_for_input ⇒ Object
# File 'lib/async/http/protocol/http2/stream.rb', line 88
def wait_for_input
	return @input
end
#window_updated(size) ⇒ Object
Called when the flow control window is updated.
# File 'lib/async/http/protocol/http2/stream.rb', line 164
def window_updated(size)
	super

	@output&.window_updated(size)

	return true
end