Class: Hyperion::Http2Handler::StreamingInput
- Inherits:
-
Object
- Object
- Hyperion::Http2Handler::StreamingInput
- Defined in:
- lib/hyperion/http2_handler.rb
Overview
Per-stream subclass that captures decoded request pseudo-headers, regular headers, and any DATA frame body bytes for later dispatch. Also exposes a ‘window_available` notification fan-out so the response-writer fiber can sleep until WINDOW_UPDATE arrives. 2.13-D — IO-shaped streaming request body backing `rack.input` for gRPC client-streaming and bidirectional RPCs. Push side is the `RequestStream#process_data` callback (one push per inbound DATA frame); read side is the Rack app’s ‘env.read` calls.
Reads block the calling fiber via ‘Async::Notification` until either bytes arrive or the writer side calls `close_writer` (END_STREAM on the request).
Contract notes:
* `read(n)` returns up to n bytes, or fewer at EOF (Rack 3 §11
IO-conformance: `read(length)` returns nil on EOF).
* `read` (no argument) reads until EOF and returns the rest.
* `each` yields chunk-by-chunk (one yield per pushed chunk).
* `rewind` is a no-op — streaming bodies aren't seekable.
* `close` is a no-op on the read side; the writer drives
end-of-stream via `close_writer`.
The class is intentionally narrow — it implements only the methods gRPC handlers and Rack apps actually call against ‘rack.input`. A real Rack app that does `rack.input.size` or `rack.input.gets` will raise (Rack 3 §11 doesn’t require those for streaming inputs).
Constant Summary collapse
- EMPTY_BIN =
String.new('', encoding: Encoding::ASCII_8BIT).freeze
Instance Attribute Summary collapse
-
#bytes_buffered ⇒ Object
readonly
Returns the value of attribute bytes_buffered.
Instance Method Summary collapse
- #close ⇒ Object
-
#close_writer ⇒ Object
2.13-D — signal end-of-request.
- #closed? ⇒ Boolean
- #each ⇒ Object
- #gets ⇒ Object
-
#initialize ⇒ StreamingInput
constructor
A new instance of StreamingInput.
-
#push(bytes) ⇒ Object
2.13-D — push a DATA-frame’s bytes onto the queue.
-
#read(length = nil) ⇒ Object
Read up to ‘length` bytes (or all remaining bytes when `length` is nil).
- #rewind ⇒ Object
- #writer_closed? ⇒ Boolean
Constructor Details
#initialize ⇒ StreamingInput
Returns a new instance of StreamingInput.
287 288 289 290 291 292 293 |
# File 'lib/hyperion/http2_handler.rb', line 287 def initialize @chunks = [] @notify = ::Async::Notification.new @writer_closed = false @reader_closed = false @bytes_buffered = 0 end |
Instance Attribute Details
#bytes_buffered ⇒ Object (readonly)
Returns the value of attribute bytes_buffered.
325 326 327 |
# File 'lib/hyperion/http2_handler.rb', line 325 def bytes_buffered @bytes_buffered end |
Instance Method Details
#close ⇒ Object
413 414 415 416 |
# File 'lib/hyperion/http2_handler.rb', line 413 def close @reader_closed = true nil end |
#close_writer ⇒ Object
2.13-D — signal end-of-request. Wakes any reader fiber parked on ‘read` so it can return EOF (`nil` from `read(n)`, accumulated buffer from `read` with no arg).
316 317 318 319 |
# File 'lib/hyperion/http2_handler.rb', line 316 def close_writer @writer_closed = true @notify.signal end |
#closed? ⇒ Boolean
418 419 420 |
# File 'lib/hyperion/http2_handler.rb', line 418 def closed? @reader_closed end |
#each ⇒ Object
376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'lib/hyperion/http2_handler.rb', line 376 def each return enum_for(:each) unless block_given? loop do wait_for_data while (chunk = @chunks.shift) @bytes_buffered -= chunk.bytesize yield chunk end break if @writer_closed && @chunks.empty? end end |
#gets ⇒ Object
389 390 391 392 393 394 395 396 397 398 399 400 401 |
# File 'lib/hyperion/http2_handler.rb', line 389 def gets # Rack 3 streaming-input contract doesn't require gets; emulate # naively for apps that call it: read until \n or EOF. out = String.new(encoding: Encoding::ASCII_8BIT) loop do ch = read(1) return out.empty? ? nil : out if ch.nil? out << ch break if ch == "\n".b end out end |
#push(bytes) ⇒ Object
2.13-D — push a DATA-frame’s bytes onto the queue. Called from ‘RequestStream#process_data`. Empty / already-closed pushes are silently dropped (the END_STREAM path uses `close_writer` instead).
298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/hyperion/http2_handler.rb', line 298 def push(bytes) return if @writer_closed return if bytes.nil? s = bytes.to_s return if s.empty? # Force ASCII-8BIT so gRPC binary payloads survive; matches the # encoding contract on `RequestStream#@request_body`. s = s.b unless s.encoding == Encoding::ASCII_8BIT @chunks << s @bytes_buffered += s.bytesize @notify.signal end |
#read(length = nil) ⇒ Object
Read up to ‘length` bytes (or all remaining bytes when `length` is nil). Blocks the fiber until at least one chunk is available OR the writer has closed. Returns `nil` on EOF when `length` is given (Rack 3 §11 IO-conformance), otherwise the empty string on EOF when `length` is nil.
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/hyperion/http2_handler.rb', line 332 def read(length = nil) if length.nil? # Drain everything until EOF. out = String.new(encoding: Encoding::ASCII_8BIT) loop do wait_for_data while (chunk = @chunks.shift) out << chunk @bytes_buffered -= chunk.bytesize end break if @writer_closed && @chunks.empty? end out else return nil if length.zero? out = String.new(encoding: Encoding::ASCII_8BIT) remaining = length while remaining.positive? wait_for_data if @chunks.empty? && @writer_closed return out.empty? ? nil : out end chunk = @chunks.first break unless chunk # writer-closed race if chunk.bytesize <= remaining out << chunk remaining -= chunk.bytesize @bytes_buffered -= chunk.bytesize @chunks.shift else out << chunk.byteslice(0, remaining) # Mutate head chunk: take the tail. @chunks[0] = chunk.byteslice(remaining, chunk.bytesize - remaining) @bytes_buffered -= remaining remaining = 0 end end out end end |
#rewind ⇒ Object
403 404 405 406 407 408 409 410 411 |
# File 'lib/hyperion/http2_handler.rb', line 403 def rewind # Streaming bodies aren't seekable; Rack 3 §11 allows read-only # streaming inputs. We return false instead of raising so apps # that defensively rewind() (Rack::Multipart::Parser, etc.) keep # working — they just won't get the data they expected on a gRPC # streaming body, which is fine because such apps aren't gRPC # services. false end |
#writer_closed? ⇒ Boolean
321 322 323 |
# File 'lib/hyperion/http2_handler.rb', line 321 def writer_closed? @writer_closed end |