Class: Async::HTTP::Protocol::HTTP2::Stream
- Inherits:
-
Protocol::HTTP2::Stream
- Object
- Protocol::HTTP2::Stream
- Async::HTTP::Protocol::HTTP2::Stream
- Defined in:
- lib/async/http/protocol/http2/stream.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#input ⇒ Object
readonly
Returns the value of attribute input.
-
#pool ⇒ Object
Returns the value of attribute pool.
Instance Method Summary collapse
- #add_header(key, value) ⇒ Object
-
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called.
-
#finish_output(error = nil) ⇒ Object
Called when the output terminates normally.
-
#initialize ⇒ Stream
constructor
A new instance of Stream.
-
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
- #process_data(frame) ⇒ Object
- #process_headers(frame) ⇒ Object
- #receive_trailing_headers(headers, end_stream) ⇒ Object
-
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
- #update_local_window(frame) ⇒ Object
- #wait_for_input ⇒ Object
- #window_updated(size) ⇒ Object
Constructor Details
#initialize ⇒ Stream
Returns a new instance of Stream.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/async/http/protocol/http2/stream.rb', line 18
# Build the stream. All arguments are forwarded untouched to the parent
# initializer; afterwards every piece of per-stream state kept by this
# class starts out as nil.
def initialize(*)
  super

  # No headers recorded and no pool associated yet:
  @headers = @pool = nil

  # Receiving side: buffer used while reading a request or response body (receive_data):
  @length = @input = nil

  # Sending side: buffer used while writing a request or response body (window_updated):
  @output = nil
end
Instance Attribute Details
#headers ⇒ Object
Returns the value of attribute headers.
33 34 35 |
# File 'lib/async/http/protocol/http2/stream.rb', line 33
# Reader for the headers attribute.
#
# @return [Object] whatever is currently stored in `@headers` (nil until assigned).
def headers
  return @headers
end
#input ⇒ Object (readonly)
Returns the value of attribute input.
37 38 39 |
# File 'lib/async/http/protocol/http2/stream.rb', line 37
# Read-only accessor for the input attribute.
#
# @return [Object] whatever is currently stored in `@input` (nil when no input is attached).
def input
  return @input
end
#pool ⇒ Object
Returns the value of attribute pool.
35 36 37 |
# File 'lib/async/http/protocol/http2/stream.rb', line 35
# Reader for the pool attribute.
#
# @return [Object] whatever is currently stored in `@pool` (nil until assigned).
def pool
  return @pool
end
Instance Method Details
#add_header(key, value) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/async/http/protocol/http2/stream.rb', line 39
# Validate a single header field and, if acceptable, append it to `@headers`.
#
# A key is rejected when it equals `CONNECTION`, when it starts with ":"
# (reported as an invalid pseudo-header), or when it contains an ASCII
# upper-case letter.
#
# @param key [String] the header name.
# @param value [Object] the header value, passed to `@headers.add` unchanged.
# @return [Object] the result of `@headers.add(key, value)`.
# @raise [::Protocol::HTTP2::HeaderError] if the key fails any of the checks above.
def add_header(key, value)
  if key == CONNECTION
    raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
  elsif key.start_with?(":")
    raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
  elsif key.match?(/[A-Z]/)
    # Only a yes/no answer is needed, so `match?` is used: it neither
    # allocates MatchData nor overwrites `$~`.
    raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!"
  else
    @headers.add(key, value)
  end
end
#closed(error) ⇒ Object
When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:
-
A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
-
A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task.
While the input stream is relatively straightforward, the output stream can trigger the second case above.
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/async/http/protocol/http2/stream.rb', line 154
# Hook invoked when the stream enters the closed state, either because a
# frame was received (called from the background reader task) or because a
# frame was sent (called from the sending task).
#
# After delegating to the parent implementation it tears down whatever is
# still attached: the input is closed for writing, the output is stopped,
# and the connection is released back to the pool.
#
# @param error [Object] passed through to `close_write` and `stop`.
# @return [Stream] self.
def closed(error)
  super

  # Detach each side *before* notifying it, so the instance variable is
  # already nil while the callback runs.
  reader = @input
  if reader
    @input = nil
    reader.close_write(error)
  end

  writer = @output
  if writer
    @output = nil
    writer.stop(error)
  end

  # `@connection` is not assigned in this class - presumably it is provided
  # by the parent; the release only happens when both values are set.
  owner = @pool
  owner.release(@connection) if owner && @connection

  return self
end
#finish_output(error = nil) ⇒ Object
Called when the output terminates normally.
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/async/http/protocol/http2/stream.rb', line 125
# Called once the output has finished. Does nothing when the stream is
# already closed. Otherwise `@output` is cleared and exactly one of the
# following is sent:
#
# - with an error: a stream reset carrying INTERNAL_ERROR;
# - with a non-empty trailer on the output: the trailer as headers flagged END_STREAM;
# - otherwise: `send_data(nil, END_STREAM)`.
#
# @param error [Object, nil] truthy when the output failed.
def finish_output(error = nil)
  return if self.closed?

  # Capture the trailer before dropping the reference to the output.
  trailing = @output&.trailer
  @output = nil

  if error
    send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
  elsif trailing&.any?
    send_headers(nil, trailing, ::Protocol::HTTP2::END_STREAM)
  else
    send_data(nil, ::Protocol::HTTP2::END_STREAM)
  end
end
#prepare_input(length) ⇒ Input
Prepare the input stream which will be used for incoming data frames.
81 82 83 84 85 86 87 |
# File 'lib/async/http/protocol/http2/stream.rb', line 81
# Create the input that incoming data frames will be written to and store
# it in `@input`.
#
# @param length [Object] forwarded to `Input.new` as its second argument.
# @return [Input] the newly created input.
# @raise [ArgumentError] if an input has already been prepared.
def prepare_input(length)
  raise ArgumentError, "Input body already prepared!" unless @input.nil?

  @input = Input.new(self, length)
end
#process_data(frame) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/async/http/protocol/http2/stream.rb', line 96
# Handle a received data frame: unpack it, write any non-empty payload to
# the attached input, and close/detach the input when the frame ends the
# stream.
#
# @param frame [Object] must respond to `unpack` and `end_stream?`.
# @return [Object] the unpacked payload; if an unexpected error is rescued,
#   the result of `send_reset_stream` instead.
# @raise [::Protocol::HTTP2::ProtocolError] re-raised unchanged.
def process_data(frame)
  payload = frame.unpack

  # Without an attached input there is nowhere to deliver the payload.
  return payload unless @input

  @input.write(payload) unless payload.empty?

  if frame.end_stream?
    @input.close_write
    @input = nil
  end

  return payload
rescue ::Protocol::HTTP2::ProtocolError
  # Protocol violations are left for the caller to deal with.
  raise
rescue
  # Anything else: give up on this stream only.
  send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
#process_headers(frame) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/async/http/protocol/http2/stream.rb', line 57
# Handle a received headers frame. The parent implementation (`super`)
# supplies the headers, which are treated as trailing headers when
# `@headers` is already set and the frame ends the stream, and as initial
# headers otherwise. If the frame ends the stream, any attached input is
# detached and closed for writing.
#
# A `HeaderError` raised along the way is logged at debug level and the
# stream is reset with the error's code.
#
# @param frame [Object] must respond to `end_stream?`.
def process_headers(frame)
  if @headers && frame.end_stream?
    self.receive_trailing_headers(super, frame.end_stream?)
  else
    self.receive_initial_headers(super, frame.end_stream?)
  end

  # TODO this might need to be in an ensure block:
  pending = @input
  if pending && frame.end_stream?
    @input = nil
    pending.close_write
  end
rescue ::Protocol::HTTP2::HeaderError => failure
  Console.logger.debug(self, failure)
  send_reset_stream(failure.code)
end
#receive_trailing_headers(headers, end_stream) ⇒ Object
51 52 53 54 55 |
# File 'lib/async/http/protocol/http2/stream.rb', line 51
# Feed every trailing header through `add_header`, which validates the
# field and appends it to the headers already collected.
#
# @param headers [#each] yields key/value pairs.
# @param end_stream [Object] accepted but not used by this implementation.
# @return [Object] the result of `headers.each`.
def receive_trailing_headers(headers, end_stream)
  headers.each { |name, content| add_header(name, content) }
end
#send_body(body, trailer = nil) ⇒ Object
Set the body and begin sending it.
118 119 120 121 122 |
# File 'lib/async/http/protocol/http2/stream.rb', line 118
# Set the body and begin sending it: wraps the body in an `Output`, stores
# it in `@output`, and starts it.
#
# @param body [Object] forwarded to `Output.new`.
# @param trailer [Object, nil] forwarded to `Output.new`.
# @return [Object] the result of `Output#start`.
def send_body(body, trailer = nil)
  writer = Output.new(self, body, trailer)
  @output = writer

  writer.start
end
#update_local_window(frame) ⇒ Object
89 90 91 92 93 94 |
# File 'lib/async/http/protocol/http2/stream.rb', line 89
# Account for a received frame against the local flow-control window.
#
# @param frame [Object] forwarded to `consume_local_window`.
# @return [Object] the result of `consume_local_window`.
def update_local_window(frame)
  # Deliberately no `request_window_update` here: that is done on demand
  # in `Input#read`.
  consume_local_window(frame)
end
#wait_for_input ⇒ Object
75 76 77 |
# File 'lib/async/http/protocol/http2/stream.rb', line 75
# Return the current input. Despite the name, this implementation does not
# block or wait: it hands back `@input` as-is, which may be nil.
#
# @return [Object, nil] the value of `@input`.
def wait_for_input
  @input
end
#window_updated(size) ⇒ Object
144 145 146 147 148 |
# File 'lib/async/http/protocol/http2/stream.rb', line 144
# Flow-control hook: after the parent implementation has run, pass the
# update on to the active output, if there is one.
#
# @param size [Object] forwarded to the output's `window_updated`.
# @return [Object, nil] the output's result, or nil when no output is attached.
def window_updated(size)
  super

  writer = @output
  writer.window_updated(size) unless writer.nil?
end