Class: Async::HTTP::Protocol::HTTP2::Stream
- Inherits:
-
Protocol::HTTP2::Stream
- Object
- Protocol::HTTP2::Stream
- Async::HTTP::Protocol::HTTP2::Stream
- Defined in:
- lib/async/http/protocol/http2/stream.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#input ⇒ Object
readonly
Returns the value of attribute input.
-
#pool ⇒ Object
Returns the value of attribute pool.
Instance Method Summary collapse
- #add_header(key, value) ⇒ Object
-
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called.
-
#finish_output(error = nil) ⇒ Object
Called when the output terminates, either normally or with an error.
-
#initialize ⇒ Stream
constructor
A new instance of Stream.
-
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
- #process_data(frame) ⇒ Object
- #process_headers(frame) ⇒ Object
- #receive_trailing_headers(headers, end_stream) ⇒ Object
-
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
- #update_local_window(frame) ⇒ Object
- #wait_for_input ⇒ Object
- #window_updated(size) ⇒ Object
Constructor Details
#initialize ⇒ Stream
Returns a new instance of Stream.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/async/http/protocol/http2/stream.rb', line 18
# Build a stream with no headers, pool, input or output attached yet.
# Every argument is forwarded unchanged to the superclass constructor.
def initialize(*)
  super

  @headers = @pool = nil

  # Input side: the request or response body being read (receive_data).
  @length = @input = nil

  # Output side: the request or response body being written (window_updated).
  @output = nil
end
Instance Attribute Details
#headers ⇒ Object
Returns the value of attribute headers.
33 34 35 |
# File 'lib/async/http/protocol/http2/stream.rb', line 33
# @return [Object] the current value of the +@headers+ instance variable.
def headers
  return @headers
end
#input ⇒ Object (readonly)
Returns the value of attribute input.
37 38 39 |
# File 'lib/async/http/protocol/http2/stream.rb', line 37
# @return [Object] the current value of the +@input+ instance variable.
def input
  return @input
end
#pool ⇒ Object
Returns the value of attribute pool.
35 36 37 |
# File 'lib/async/http/protocol/http2/stream.rb', line 35
# @return [Object] the current value of the +@pool+ instance variable.
def pool
  return @pool
end
Instance Method Details
#add_header(key, value) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/async/http/protocol/http2/stream.rb', line 39
# Validate one header and append it to +@headers+.
#
# A key is rejected when it equals CONNECTION, starts with ":" or contains
# an upper-case ASCII letter; the checks run in that order.
#
# @param key [String] the header name.
# @param value [Object] the header value, passed through to +@headers.add+.
# @raise [::Protocol::HTTP2::HeaderError] if the key is rejected.
def add_header(key, value)
  raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!" if key == CONNECTION
  raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!" if key.start_with?(":")
  raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!" if key.match?(/[A-Z]/)

  @headers.add(key, value)
end
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:
-
A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
-
A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task.
While the input stream is relatively straightforward, the output stream can trigger the second case above.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/async/http/protocol/http2/stream.rb', line 153
# Tear down the stream once it has entered the closed state.
#
# Calls the superclass hook first. Then closes the write side of the input
# (if any), stops the output (if any), and releases +@connection+ back to
# +@pool+ when both are set.
#
# @param error [Object] passed to +close_write+ and +stop+.
# @return [self]
def closed(error)
  super

  # Clear each reference before notifying the object it pointed to.
  input, @input = @input, nil
  input&.close_write(error)

  output, @output = @output, nil
  output&.stop(error)

  pool = @pool
  pool.release(@connection) if pool && @connection

  self
end
#finish_output(error = nil) ⇒ Object
Called when the output terminates, either normally or with an error.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/async/http/protocol/http2/stream.rb', line 122
# Finish the output side of the stream.
#
# Does nothing when the stream is already closed. Otherwise +@output+ is
# cleared and exactly one frame is sent:
# - a stream reset with INTERNAL_ERROR when +error+ is given;
# - the output's trailer as headers with END_STREAM when it has any entries;
# - a data frame with nil data and END_STREAM in every other case.
#
# @param error [Object, nil] truthy when the output failed.
def finish_output(error = nil)
  return if closed?

  # Read the trailer before dropping the reference to the output.
  trailer = @output&.trailer
  @output = nil

  return send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR) if error

  if trailer&.any?
    send_headers(trailer, ::Protocol::HTTP2::END_STREAM)
  else
    send_data(nil, ::Protocol::HTTP2::END_STREAM)
  end
end
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
79 80 81 82 83 84 85 |
# File 'lib/async/http/protocol/http2/stream.rb', line 79
# Create the input that incoming data frames will be written to.
#
# @param length [Object] passed through to +Input.new+.
# @return [Input] the newly created input, also stored in +@input+.
# @raise [ArgumentError] if an input has already been prepared.
def prepare_input(length)
  raise ArgumentError, "Input body already prepared!" unless @input.nil?

  @input = Input.new(self, length)
end
#process_data(frame) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/async/http/protocol/http2/stream.rb', line 94
# Handle a received data frame.
#
# The unpacked payload is written to +@input+ when an input exists and the
# payload is non-empty. The input's write side is closed when the frame
# ends the stream.
#
# @param frame [Object] must respond to +unpack+ and +end_stream?+.
# @return [Object] the unpacked payload, or the result of +send_reset_stream+
#   when an unexpected error was rescued.
# @raise [::Protocol::HTTP2::ProtocolError] re-raised untouched.
def process_data(frame)
  payload = frame.unpack

  if @input
    @input.write(payload) unless payload.empty?
    @input.close_write if frame.end_stream?
  end

  payload
rescue ::Protocol::HTTP2::ProtocolError
  # Protocol violations are propagated to the caller.
  raise
rescue
  # Any other failure resets this stream instead of propagating.
  send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
#process_headers(frame) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/async/http/protocol/http2/stream.rb', line 57
# Handle a received headers frame.
#
# The headers decoded by the superclass are treated as trailers when
# +@headers+ is already set and the frame ends the stream. In every other
# case they are treated as initial headers. If an input exists and the
# frame ends the stream, the input's write side is closed.
#
# A +::Protocol::HTTP2::HeaderError+ raised along the way is logged at
# debug level and the stream is reset with the error's code.
#
# @param frame [Object] must respond to +end_stream?+.
def process_headers(frame)
  trailing = @headers && frame.end_stream?

  if trailing
    receive_trailing_headers(super, frame.end_stream?)
  else
    receive_initial_headers(super, frame.end_stream?)
  end

  @input.close_write if @input && frame.end_stream?
rescue ::Protocol::HTTP2::HeaderError => error
  Console.logger.debug(self, error)
  send_reset_stream(error.code)
end
#receive_trailing_headers(headers, end_stream) ⇒ Object
51 52 53 54 55 |
# File 'lib/async/http/protocol/http2/stream.rb', line 51
# Feed every trailing header through +add_header+.
#
# @param headers [#each] yields key/value pairs.
# @param end_stream [Object] accepted but not used by this method.
def receive_trailing_headers(headers, end_stream)
  headers.each { |name, content| add_header(name, content) }
end
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
115 116 117 118 119 |
# File 'lib/async/http/protocol/http2/stream.rb', line 115
# Wrap the body in a new output, store it in +@output+ and start it.
#
# @param body [Object] passed through to +Output.new+.
# @param trailer [Object, nil] passed through to +Output.new+.
# @return [Object] whatever the output's +start+ returns.
def send_body(body, trailer = nil)
  output = Output.new(self, body, trailer)

  # Store the reference before starting, as the original ordering does.
  @output = output
  output.start
end
#update_local_window(frame) ⇒ Object
87 88 89 90 91 92 |
# File 'lib/async/http/protocol/http2/stream.rb', line 87
# Account for a received frame against the local flow-control window.
#
# @param frame [Object] passed through to +consume_local_window+.
def update_local_window(frame)
  # Only consume here. No window update is requested at this point;
  # that happens on demand in `Input#read`:
  # request_window_update
  consume_local_window(frame)
end
#wait_for_input ⇒ Object
73 74 75 |
# File 'lib/async/http/protocol/http2/stream.rb', line 73
# @return [Object, nil] the current +@input+; this implementation returns
#   it immediately without waiting.
def wait_for_input
  @input
end
#window_updated(size) ⇒ Object
141 142 143 144 145 146 147 |
# File 'lib/async/http/protocol/http2/stream.rb', line 141
# Forward a window update to the superclass and then to the active output.
#
# @param size [Object] passed through to the output's +window_updated+.
# @return [true] always.
def window_updated(size)
  super

  # There is nothing to notify when no output is attached.
  @output.window_updated(size) unless @output.nil?

  true
end