Class: Hyperion::Http2Handler::StreamingInput

Inherits:
Object
Defined in:
lib/hyperion/http2_handler.rb

Overview

Per-stream subclass that captures decoded request pseudo-headers, regular headers, and any DATA frame body bytes for later dispatch. Also exposes a `window_available` notification fan-out so the response-writer fiber can sleep until WINDOW_UPDATE arrives.

2.13-D: IO-shaped streaming request body backing `rack.input` for gRPC client-streaming and bidirectional RPCs. The push side is the `RequestStream#process_data` callback (one push per inbound DATA frame); the read side is the Rack app's `env['rack.input'].read` calls.

Reads block the calling fiber via `Async::Notification` until either bytes arrive or the writer side calls `close_writer` (END_STREAM on the request).
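The blocking hand-off described above can be sketched with threads. This is an illustration only: `ThreadedInputSketch` and `next_chunk` are hypothetical names, and a `Mutex` plus `ConditionVariable` stand in for the fiber-based `Async::Notification` that the real class uses.

```ruby
# Thread-based stand-in for illustration; the real class parks fibers
# on Async::Notification rather than a ConditionVariable.
class ThreadedInputSketch
  def initialize
    @chunks = []
    @writer_closed = false
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Writer side: queue a chunk and wake a waiting reader.
  def push(bytes)
    @mutex.synchronize do
      next if @writer_closed || bytes.nil? || bytes.empty?

      @chunks << bytes.b
      @cond.signal
    end
  end

  # Writer side: mark end-of-stream and wake every waiting reader.
  def close_writer
    @mutex.synchronize do
      @writer_closed = true
      @cond.broadcast
    end
  end

  # Reader side: block until a chunk arrives or the writer closes.
  # Returns nil once the queue is drained and the writer has closed.
  def next_chunk
    @mutex.synchronize do
      @cond.wait(@mutex) while @chunks.empty? && !@writer_closed
      @chunks.shift
    end
  end
end

input = ThreadedInputSketch.new
writer = Thread.new do
  input.push('ab')
  input.push('cd')
  input.close_writer
end

received = []
while (chunk = input.next_chunk)
  received << chunk
end
writer.join
```

Chunks pushed before `close_writer` are still delivered; the reader only sees EOF after the queue is empty.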

Contract notes:

* `read(n)` returns up to n bytes, blocking until n bytes have
  arrived or the writer closes. At EOF it returns the shorter
  remainder, or nil once nothing is left (Rack 3 §11
  IO-conformance: `read(length)` returns nil on EOF).
* `read` (no argument) reads until EOF and returns the rest.
* `each` yields chunk-by-chunk (one yield per pushed chunk).
* `rewind` does nothing and returns false; streaming bodies
  aren't seekable.
* `close` only marks the read side closed; the writer drives
  end-of-stream via `close_writer`.

The class is intentionally narrow: it implements only the methods gRPC handlers and Rack apps actually call against `rack.input`. A real Rack app that calls `rack.input.size` will raise, because that method is not defined; `rack.input.gets` is only emulated naively on top of `read(1)` (Rack 3 §11 doesn't require either for streaming inputs).
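As a sketch of how a gRPC handler might consume such an input, the helper below reads one length-prefixed gRPC message (a 1-byte compressed flag followed by a 4-byte big-endian length) using only `read(n)`. `read_grpc_message` is a hypothetical helper, not part of Hyperion, and `StringIO` stands in for `rack.input` so the example runs without an HTTP/2 connection.

```ruby
require 'stringio'

# Hypothetical helper, not part of Hyperion: reads one length-prefixed
# gRPC message from an IO-like object that follows the read(n) contract.
def read_grpc_message(input)
  header = input.read(5)
  return nil if header.nil? # clean EOF between messages
  raise EOFError, 'truncated gRPC header' if header.bytesize < 5

  # 1-byte compressed flag, then 4-byte big-endian payload length.
  compressed, length = header.unpack('CN')
  # Avoid read(0): the source listing for #read returns nil for a zero length.
  payload = length.zero? ? String.new(encoding: Encoding::ASCII_8BIT) : input.read(length)
  raise EOFError, 'truncated gRPC payload' if payload.nil? || payload.bytesize < length

  [compressed == 1, payload]
end

# StringIO stands in for rack.input; its read(n) also returns nil at EOF.
frame = ->(bytes) { [0, bytes.bytesize].pack('CN') + bytes }
input = StringIO.new(frame.call('hello'.b) + frame.call('world'.b))
while (message = read_grpc_message(input))
  _compressed, payload = message
  puts payload
end
```

Because `read(n)` only returns short at EOF, a short header or payload can be treated as a truncated stream rather than retried.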

Constant Summary

EMPTY_BIN =
String.new('', encoding: Encoding::ASCII_8BIT).freeze


Constructor Details

#initialize ⇒ StreamingInput

Returns a new instance of StreamingInput.



# File 'lib/hyperion/http2_handler.rb', line 287

def initialize
  @chunks = []
  @notify = ::Async::Notification.new
  @writer_closed = false
  @reader_closed = false
  @bytes_buffered = 0
end

Instance Attribute Details

#bytes_buffered ⇒ Object (readonly)

Returns the value of attribute bytes_buffered.



# File 'lib/hyperion/http2_handler.rb', line 325

def bytes_buffered
  @bytes_buffered
end

Instance Method Details

#close ⇒ Object



# File 'lib/hyperion/http2_handler.rb', line 413

def close
  @reader_closed = true
  nil
end

#close_writer ⇒ Object

2.13-D: signal end-of-request. Wakes any reader fiber parked on `read` so it can return EOF (`nil` from `read(n)`, the accumulated buffer from `read` with no argument).



# File 'lib/hyperion/http2_handler.rb', line 316

def close_writer
  @writer_closed = true
  @notify.signal
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/http2_handler.rb', line 418

def closed?
  @reader_closed
end

#each ⇒ Object



# File 'lib/hyperion/http2_handler.rb', line 376

def each
  return enum_for(:each) unless block_given?

  loop do
    wait_for_data
    while (chunk = @chunks.shift)
      @bytes_buffered -= chunk.bytesize
      yield chunk
    end
    break if @writer_closed && @chunks.empty?
  end
end

#gets ⇒ Object



# File 'lib/hyperion/http2_handler.rb', line 389

def gets
  # Rack 3 streaming-input contract doesn't require gets; emulate
  # naively for apps that call it: read until \n or EOF.
  out = String.new(encoding: Encoding::ASCII_8BIT)
  loop do
    ch = read(1)
    return out.empty? ? nil : out if ch.nil?

    out << ch
    break if ch == "\n".b
  end
  out
end

#push(bytes) ⇒ Object

2.13-D: push a DATA frame's bytes onto the queue. Called from `RequestStream#process_data`. Nil, empty, and already-closed pushes are silently dropped (the END_STREAM path uses `close_writer` instead).



# File 'lib/hyperion/http2_handler.rb', line 298

def push(bytes)
  return if @writer_closed
  return if bytes.nil?

  s = bytes.to_s
  return if s.empty?

  # Force ASCII-8BIT so gRPC binary payloads survive; matches the
  # encoding contract on `RequestStream#@request_body`.
  s = s.b unless s.encoding == Encoding::ASCII_8BIT
  @chunks << s
  @bytes_buffered += s.bytesize
  @notify.signal
end
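The encoding step in `push` can be illustrated in isolation. In this minimal sketch, `append_chunk` is a hypothetical helper that applies the same `String#b` normalization before appending to a binary buffer; it is not a Hyperion method.

```ruby
# Hypothetical helper mirroring the normalization in #push: coerce the
# incoming bytes to ASCII-8BIT before appending to a binary buffer.
def append_chunk(buffer, bytes)
  s = bytes.to_s
  s = s.b unless s.encoding == Encoding::ASCII_8BIT
  buffer << s
end

buffer = "\xFF".b              # binary buffer that already holds a high byte
utf8 = "caf\u00E9"             # UTF-8 string: 4 characters, 5 bytes
append_chunk(buffer, utf8)

puts buffer.encoding           # ASCII-8BIT
puts buffer.bytesize           # 6 (1 existing byte + 5 appended bytes)
```

`String#b` returns a copy and leaves the bytes unchanged; only the encoding tag differs. Appending the UTF-8 string directly to this buffer would raise `Encoding::CompatibilityError`, since both sides contain non-ASCII bytes under different encodings.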

#read(length = nil) ⇒ Object

Read up to `length` bytes (or all remaining bytes when `length` is nil). Blocks the fiber while no chunk is available and the writer is still open, and keeps collecting chunks until `length` bytes are gathered or EOF is reached. When `length` is given, returns `nil` on EOF (Rack 3 §11 IO-conformance); when `length` is nil, returns the empty string on EOF.



# File 'lib/hyperion/http2_handler.rb', line 332

def read(length = nil)
  if length.nil?
    # Drain everything until EOF.
    out = String.new(encoding: Encoding::ASCII_8BIT)
    loop do
      wait_for_data
      while (chunk = @chunks.shift)
        out << chunk
        @bytes_buffered -= chunk.bytesize
      end
      break if @writer_closed && @chunks.empty?
    end
    out
  else
    return nil if length.zero?

    out = String.new(encoding: Encoding::ASCII_8BIT)
    remaining = length
    while remaining.positive?
      wait_for_data
      if @chunks.empty? && @writer_closed
        return out.empty? ? nil : out
      end

      chunk = @chunks.first
      break unless chunk # writer-closed race

      if chunk.bytesize <= remaining
        out << chunk
        remaining -= chunk.bytesize
        @bytes_buffered -= chunk.bytesize
        @chunks.shift
      else
        out << chunk.byteslice(0, remaining)
        # Mutate head chunk: take the tail.
        @chunks[0] = chunk.byteslice(remaining, chunk.bytesize - remaining)
        @bytes_buffered -= remaining
        remaining = 0
      end
    end
    out
  end
end
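The head-chunk splitting in the `length` branch can be followed more easily without the blocking logic. The sketch below is a hypothetical pure function, `take_bytes`, that applies the same `byteslice` technique to a plain array of chunks; it omits the `wait_for_data` and `@bytes_buffered` bookkeeping of the real method.

```ruby
# Hypothetical pure-function version of the chunk-splitting step:
# take up to n bytes from the front of chunks, mutating the array.
def take_bytes(chunks, n)
  out = String.new(encoding: Encoding::ASCII_8BIT)
  while n.positive? && (chunk = chunks.first)
    if chunk.bytesize <= n
      # Whole chunk fits: consume it.
      out << chunks.shift
      n -= chunk.bytesize
    else
      # Chunk is larger than the request: take the head, keep the tail.
      out << chunk.byteslice(0, n)
      chunks[0] = chunk.byteslice(n, chunk.bytesize - n)
      n = 0
    end
  end
  out
end

chunks = ['abc'.b, 'defg'.b]
puts take_bytes(chunks, 5)   # abcde
puts chunks.inspect          # ["fg"]
```

Keeping the unread tail at index 0 preserves byte order for the next read, so chunk boundaries on the wire never leak into what the caller sees.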

#rewind ⇒ Object



# File 'lib/hyperion/http2_handler.rb', line 403

def rewind
  # Streaming bodies aren't seekable; Rack 3 §11 allows read-only
  # streaming inputs. We return false instead of raising so apps
  # that defensively rewind() (Rack::Multipart::Parser, etc.) keep
  # working — they just won't get the data they expected on a gRPC
  # streaming body, which is fine because such apps aren't gRPC
  # services.
  false
end

#writer_closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/http2_handler.rb', line 321

def writer_closed?
  @writer_closed
end