Class: Quicsilver::Protocol::StreamInput

Inherits:
Protocol::HTTP::Body::Writable
  • Object
show all
Defined in:
lib/quicsilver/protocol/stream_input.rb

Overview

A streaming request body backed by Protocol::HTTP::Body::Writable.

QUIC RECEIVE events push chunks via #write, while the application reads them via #read. This enables concurrent streaming — the app can start processing before the full body arrives.

Follows the protocol-http Body::Writable contract:

  • write(chunk) — called by the QUIC transport on RECEIVE events

  • close_write — called on RECEIVE_FIN to signal end of body

  • read — called by the application (blocks until data available)

  • close — called to abort (e.g., stream reset)

Optional features:

  • Back-pressure via Thread::SizedQueue (bounded buffer)

  • Read timeout for slow client protection

Defined Under Namespace

Classes: ReadTimeout

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(length = nil, queue_size: nil, read_timeout: nil) ⇒ StreamInput

Returns a new instance of StreamInput.

Parameters:

  • length (Integer, nil) (defaults to: nil)

    The content-length if known from headers.

  • queue_size (Integer, nil) (defaults to: nil)

    Maximum buffered chunks for back-pressure. nil (default) = unbounded. When bounded, write blocks if queue is full, which naturally maps to QUIC flow control.

  • read_timeout (Numeric, nil) (defaults to: nil)

    Seconds to wait for data before raising ReadTimeout. nil (default) = wait forever.



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/quicsilver/protocol/stream_input.rb', line 32

def initialize(length = nil, queue_size: nil, read_timeout: nil)
  queue = if queue_size
    Thread::SizedQueue.new(queue_size)
  else
    Thread::Queue.new
  end

  super(length, queue: queue)
  @read_timeout = read_timeout
  @bytes_written = 0
end

Instance Attribute Details

#Read timeout in seconds.(timeout) ⇒ Object (readonly)



45
# File 'lib/quicsilver/protocol/stream_input.rb', line 45

attr_reader :read_timeout

#read_timeoutObject (readonly)

Returns the value of attribute read_timeout.



45
46
47
# File 'lib/quicsilver/protocol/stream_input.rb', line 45

def read_timeout
  @read_timeout
end

Instance Method Details

#close_write(error = nil) ⇒ Object

Signal that no more data will be written. Validates content-length if declared (RFC 9114 §4.1.2) — raises MessageError if total bytes written don’t match.



56
57
58
59
60
61
# File 'lib/quicsilver/protocol/stream_input.rb', line 56

def close_write(error = nil)
  if @length && @bytes_written != @length
    raise Protocol::MessageError, "Content-length mismatch: header=#{@length}, body=#{@bytes_written}"
  end
  super
end

#readObject

Read the next available chunk, with optional timeout.



68
69
70
71
72
73
74
# File 'lib/quicsilver/protocol/stream_input.rb', line 68

def read
  if @read_timeout
    read_with_timeout
  else
    super
  end
end

#write(chunk) ⇒ Object

Track bytes written for content-length validation.



48
49
50
51
# File 'lib/quicsilver/protocol/stream_input.rb', line 48

def write(chunk)
  @bytes_written += chunk.bytesize
  super
end