Class: Raptor::Http2::FlowControl

Inherits:
Object
Defined in:
lib/raptor/http2.rb

Overview

Per-connection outbound flow-control accounting.

Tracks the peer’s connection-level and per-stream receive windows so outbound DATA frames respect RFC 7540 §5.2. Threads dispatching stream responses call `acquire` to reserve send capacity; threads applying inbound WINDOW_UPDATE or SETTINGS frames call the mutating methods to replenish it. State is held in `Atom` instances (one each for the connection window, the per-stream windows, and the initial stream window), so updates use compare-and-swap (CAS).
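
To illustrate what a CAS-based update means here, the following is a minimal sketch of an atom with `value` and `swap`. `SketchAtom` and `compare_and_set` are hypothetical names; the `Atom` class that FlowControl actually uses is not shown on this page and may be implemented differently.

```ruby
# Hypothetical stand-in for the Atom used by FlowControl (an assumption,
# not the real class). A Mutex emulates the atomic compare-and-set step.
class SketchAtom
  def initialize(initial)
    @value = initial
    @lock = Mutex.new
  end

  # Returns the current value.
  def value
    @lock.synchronize { @value }
  end

  # Stores new_value only if the held value is still the expected object.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      if @value.equal?(expected)
        @value = new_value
        true
      else
        false
      end
    end
  end

  # Recomputes the block against the latest value until the CAS succeeds,
  # then returns the value that was stored.
  def swap
    loop do
      old = value
      new_value = yield(old)
      return new_value if compare_and_set(old, new_value)
    end
  end
end

# Four threads each debit the window 100 times without losing an update.
window = SketchAtom.new(65_535)
threads = 4.times.map do
  Thread.new { 100.times { window.swap { |w| w - 1 } } }
end
threads.each(&:join)
puts window.value # 65_535 - 400 = 65_135
```

In a design like this the block passed to `swap` can run more than once under contention, so it should be free of side effects other than computing the new value.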

Constant Summary

ACQUIRE_POLL_INTERVAL =
0.001

Constructor Details

#initialize ⇒ FlowControl

Creates a new FlowControl with the spec-default windows.



# File 'lib/raptor/http2.rb', line 96

def initialize
  @connection_window = Atom.new(DEFAULT_WINDOW_SIZE)
  @stream_windows = Atom.new({})
  @initial_stream_window = Atom.new(DEFAULT_WINDOW_SIZE)
end

Instance Method Details

#acquire(stream_id, max_bytes, end_stream: false) ⇒ Integer

Reserves outbound capacity on the given stream, polling every `ACQUIRE_POLL_INTERVAL` seconds until at least one byte is available on both the connection and stream windows. The returned size is capped at `MAX_FRAME_SIZE`.

When `end_stream` is true, `max_bytes` fits within the peer’s initial stream window, and no per-stream override has been recorded, only the connection window is consulted. The stream closes on this frame, so its remaining send window will not be consulted again and need not be tracked.

Parameters:

  • stream_id (Integer)

    the HTTP/2 stream identifier

  • max_bytes (Integer)

    the largest size the caller would like to send

  • end_stream (Boolean) (defaults to: false)

    true when this is the final frame on the stream

Returns:

  • (Integer)

    the number of bytes the caller may now send



# File 'lib/raptor/http2.rb', line 118

def acquire(stream_id, max_bytes, end_stream: false)
  initial = @initial_stream_window.value
  capped = max_bytes < MAX_FRAME_SIZE ? max_bytes : MAX_FRAME_SIZE

  # Fast path: the final frame fits in the initial stream window and no
  # per-stream entry exists, so only the connection window is debited.
  if end_stream && capped <= initial && !@stream_windows.value.key?(stream_id)
    loop do
      granted = 0
      @connection_window.swap do |window|
        granted = window > capped ? capped : window
        granted > 0 ? window - granted : window
      end
      return granted if granted > 0

      sleep ACQUIRE_POLL_INTERVAL
    end
  end

  loop do
    # Streams without a recorded entry start from the initial window.
    stream_window = @stream_windows.value[stream_id] || initial
    capped_full = capped < stream_window ? capped : stream_window

    # Debit the connection window first, limited by the stream window.
    granted = 0
    if capped_full > 0
      @connection_window.swap do |window|
        granted = window > capped_full ? capped_full : window
        granted > 0 ? window - granted : window
      end
    end

    # Then record the same debit against the stream window.
    if granted > 0
      @stream_windows.swap do |s|
        current = s[stream_id] || initial
        s.merge(stream_id => current - granted)
      end
      return granted
    end

    sleep ACQUIRE_POLL_INTERVAL
  end
end
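
The size granted by one pass of the loop above is effectively the minimum of the requested size, the frame-size cap, and the two windows. The sketch below restates that arithmetic as a pure function over a snapshot of the windows. `sketch_grant` and `SKETCH_MAX_FRAME_SIZE` are hypothetical names, and the value 16,384 is an assumption taken from the RFC 7540 default for SETTINGS_MAX_FRAME_SIZE; the constant Raptor actually uses is not shown on this page.

```ruby
# Assumed cap: RFC 7540's default SETTINGS_MAX_FRAME_SIZE. Raptor's real
# MAX_FRAME_SIZE is not shown in this documentation.
SKETCH_MAX_FRAME_SIZE = 16_384

# Returns the bytes a single pass could grant for the given snapshot.
# A result of 0 means the caller would sleep and poll again.
def sketch_grant(max_bytes, stream_window, connection_window)
  limit = [max_bytes, SKETCH_MAX_FRAME_SIZE, stream_window, connection_window].min
  limit.positive? ? limit : 0
end

puts sketch_grant(100_000, 65_535, 65_535) # capped by the frame size
puts sketch_grant(1_000, 500, 65_535)      # capped by the stream window
puts sketch_grant(1_000, 65_535, 0)        # connection window exhausted
```

The fast path for a final frame corresponds to leaving `stream_window` out of the minimum.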

#add_connection_window(increment) ⇒ void

This method returns an undefined value.

Increments the connection-level send window. Called when the peer sends a WINDOW_UPDATE on stream 0.

Parameters:

  • increment (Integer)

    the byte count to add



# File 'lib/raptor/http2.rb', line 166

def add_connection_window(increment)
  @connection_window.swap { |window| window + increment }
end

#add_stream_window(stream_id, increment) ⇒ void

This method returns an undefined value.

Increments the per-stream send window. Called when the peer sends a WINDOW_UPDATE on a specific stream.

Parameters:

  • stream_id (Integer)

    the HTTP/2 stream identifier

  • increment (Integer)

    the byte count to add



# File 'lib/raptor/http2.rb', line 178

def add_stream_window(stream_id, increment)
  initial = @initial_stream_window.value
  @stream_windows.swap do |s|
    current = s[stream_id] || initial
    s.merge(stream_id => current + increment)
  end
end
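
As a rough illustration of how a frame handler might route WINDOW_UPDATE frames to the two methods above, the sketch below dispatches on the stream identifier. `SketchFlowControl` is a hypothetical recorder standing in for FlowControl, and `sketch_apply_window_update` is an assumed helper name; neither is part of Raptor as documented here.

```ruby
# Hypothetical recorder with the same two method signatures as FlowControl.
class SketchFlowControl
  attr_reader :calls

  def initialize
    @calls = []
  end

  def add_connection_window(increment)
    @calls << [:connection, increment]
  end

  def add_stream_window(stream_id, increment)
    @calls << [:stream, stream_id, increment]
  end
end

# Stream 0 addresses the connection window; any other id addresses a stream.
def sketch_apply_window_update(flow, stream_id, increment)
  if stream_id.zero?
    flow.add_connection_window(increment)
  else
    flow.add_stream_window(stream_id, increment)
  end
end

flow = SketchFlowControl.new
sketch_apply_window_update(flow, 0, 1_000)
sketch_apply_window_update(flow, 5, 2_000)
p flow.calls
```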

#discard_stream(stream_id) ⇒ void

This method returns an undefined value.

Discards any per-stream tracking for the given stream. Called after a stream closes so `@stream_windows` does not grow without bound across the lifetime of a connection.

Parameters:

  • stream_id (Integer)

    the HTTP/2 stream identifier



# File 'lib/raptor/http2.rb', line 212

def discard_stream(stream_id)
  return unless @stream_windows.value.key?(stream_id)

  @stream_windows.swap do |s|
    next s unless s.key?(stream_id)

    new = s.dup
    new.delete(stream_id)
    new
  end
end
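
The listing builds a new hash instead of deleting from the one the atom currently holds, presumably so that a thread that has already read the old value never sees it change. A minimal sketch of the same idea with a frozen hash, using illustrative variable names:

```ruby
# The hash currently "published" to other threads; frozen to show that
# removal never mutates it.
windows = { 1 => 65_535, 3 => 1_024 }.freeze

# Hash#reject returns a new hash without the discarded stream.
after = windows.reject { |id, _size| id == 3 }

p windows # original is untouched
p after   # stream 3 is gone
```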

#set_initial_stream_window(new_size) ⇒ void

This method returns an undefined value.

Updates the peer’s `SETTINGS_INITIAL_WINDOW_SIZE`. Shifts every existing stream window by the delta between the new and old values, as required by RFC 7540 §6.9.2.

Parameters:

  • new_size (Integer)

    the peer’s new initial window size



# File 'lib/raptor/http2.rb', line 193

def set_initial_stream_window(new_size)
  old = @initial_stream_window.value
  @initial_stream_window.swap { new_size }
  delta = new_size - old
  return if delta.zero?

  @stream_windows.swap do |s|
    s.transform_values { |size| size + delta }
  end
end
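
A worked example of the delta shift may help. The sketch below applies the same arithmetic to a plain hash; `sketch_shift_windows` is a hypothetical helper, and the window values are made up for illustration.

```ruby
# Shifts every tracked stream window by (new_initial - old_initial).
def sketch_shift_windows(windows, old_initial, new_initial)
  delta = new_initial - old_initial
  return windows if delta.zero?

  windows.transform_values { |size| size + delta }
end

# Growing the initial window from 65,535 to 131,070 adds 65,535 to each.
p sketch_shift_windows({ 1 => 65_535, 3 => 100 }, 65_535, 131_070)

# Shrinking it to 16,384 subtracts 49,151 and can leave a negative window.
p sketch_shift_windows({ 1 => 1_000 }, 65_535, 16_384)
```

RFC 7540 §6.9.2 permits a window to become negative after such a change; the sender then has to wait for WINDOW_UPDATE frames to bring it back above zero before sending more data on that stream.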