Class: Raptor::Http2::FlowControl
- Inherits: Object
  - Object
  - Raptor::Http2::FlowControl
- Defined in: lib/raptor/http2.rb
Overview
Per-connection outbound flow-control accounting.
Tracks the peer’s connection-level and per-stream receive windows so outbound DATA frames respect RFC 7540 §5.2. Threads dispatching stream responses call `acquire` to reserve send capacity; threads applying inbound WINDOW_UPDATE or SETTINGS frames call the mutating methods to replenish it. The connection window and per-stream windows live in separate `Atom`s so the common fast path skips per-stream tracking.
Constant Summary
- ACQUIRE_POLL_INTERVAL = 0.001
Instance Method Summary
- #acquire(stream_id, max_bytes, end_stream: false) ⇒ Integer
  Reserves outbound capacity on the given stream, polling until at least one byte is available on both the connection and stream windows.
- #add_connection_window(increment) ⇒ void
  Increments the connection-level send window.
- #add_stream_window(stream_id, increment) ⇒ void
  Increments the per-stream send window.
- #discard_stream(stream_id) ⇒ void
  Discards any per-stream tracking for the given stream.
- #initialize ⇒ FlowControl (constructor)
  Creates a new FlowControl with the spec-default windows.
- #set_initial_stream_window(new_size) ⇒ void
  Updates the peer’s `SETTINGS_INITIAL_WINDOW_SIZE`.
Constructor Details
#initialize ⇒ FlowControl
Creates a new FlowControl with the spec-default windows.
    # File 'lib/raptor/http2.rb', line 97

    def initialize
      @connection_window = Atom.new(DEFAULT_WINDOW_SIZE)
      @stream_windows = Atom.new({})
      @initial_stream_window = Atom.new(DEFAULT_WINDOW_SIZE)
    end
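The constructor relies on an `Atom` that exposes `value` and `swap`, but that class is not documented on this page. The following is a minimal sketch of what such a container might look like, assuming a mutex-guarded cell; `SketchAtom` and `SKETCH_DEFAULT_WINDOW_SIZE` are hypothetical names, and Raptor's real `Atom` may instead use a compare-and-set retry loop, in which case the block could run more than once.

```ruby
# Hypothetical stand-in for the Atom used by FlowControl; not Raptor's class.
class SketchAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  # Returns the current value.
  def value
    @lock.synchronize { @value }
  end

  # Replaces the value with the block's result and returns the new value.
  def swap
    @lock.synchronize { @value = yield(@value) }
  end
end

# 65,535 is the initial flow-control window defined by RFC 7540.
# Assumption: Raptor's DEFAULT_WINDOW_SIZE holds the same number.
SKETCH_DEFAULT_WINDOW_SIZE = 65_535

window = SketchAtom.new(SKETCH_DEFAULT_WINDOW_SIZE)
window.swap { |w| w - 1_000 } # a sender reserves 1,000 bytes
window.swap { |w| w + 400 }   # the peer returns 400 bytes of credit
puts window.value             # prints 64935
```

Keeping each window in its own cell means a thread that only touches the connection window never contends with updates to the per-stream map.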
Instance Method Details
#acquire(stream_id, max_bytes, end_stream: false) ⇒ Integer
Reserves outbound capacity on the given stream, polling until at least one byte is available on both the connection and stream windows. The returned size is capped at `MAX_FRAME_SIZE`.
When `end_stream` is true, `max_bytes` (after capping at `MAX_FRAME_SIZE`) fits within the peer’s initial stream window, and no per-stream entry has been recorded for the stream, only the connection window is consulted. The stream closes on this frame, so its remaining send window will not be consulted again and need not be tracked.
    # File 'lib/raptor/http2.rb', line 119

    def acquire(stream_id, max_bytes, end_stream: false)
      initial = @initial_stream_window.value
      capped = max_bytes < MAX_FRAME_SIZE ? max_bytes : MAX_FRAME_SIZE

      if end_stream && capped <= initial && !@stream_windows.value.key?(stream_id)
        loop do
          granted = 0
          @connection_window.swap do |window|
            granted = window > capped ? capped : window
            granted > 0 ? window - granted : window
          end
          return granted if granted > 0

          sleep ACQUIRE_POLL_INTERVAL
        end
      end

      loop do
        stream_window = @stream_windows.value[stream_id] || initial
        capped_full = capped < stream_window ? capped : stream_window
        granted = 0
        if capped_full > 0
          @connection_window.swap do |window|
            granted = window > capped_full ? capped_full : window
            granted > 0 ? window - granted : window
          end
        end

        if granted > 0
          @stream_windows.swap do |s|
            current = s[stream_id] || initial
            s.merge(stream_id => current - granted)
          end
          return granted
        end

        sleep ACQUIRE_POLL_INTERVAL
      end
    end
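The size granted by one pass through the general path reduces to a minimum over four quantities. The sketch below restates that arithmetic as a pure function so the capping order is easier to see; `sketch_grant` and `SKETCH_MAX_FRAME_SIZE` are hypothetical names, and 16,384 is only the RFC 7540 default for `SETTINGS_MAX_FRAME_SIZE`, since the value of Raptor's `MAX_FRAME_SIZE` is not shown here.

```ruby
# Assumed value: the RFC 7540 default maximum frame payload size.
SKETCH_MAX_FRAME_SIZE = 16_384

# Returns how many bytes a single acquire attempt could grant, or 0 when the
# caller would have to sleep and poll again.
def sketch_grant(max_bytes, connection_window, stream_window)
  grant = [max_bytes, SKETCH_MAX_FRAME_SIZE, stream_window, connection_window].min
  grant.positive? ? grant : 0
end

puts sketch_grant(50_000, 65_535, 65_535) # prints 16384 (frame size cap)
puts sketch_grant(50_000, 65_535, 1_200)  # prints 1200 (stream window cap)
puts sketch_grant(50_000, 0, 65_535)      # prints 0 (connection window exhausted)
puts sketch_grant(50_000, 65_535, -300)   # prints 0 (stream window is negative)
```

The `end_stream` fast path applies the same minimum without the `stream_window` term, which is why it never has to read or write the per-stream map.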
#add_connection_window(increment) ⇒ void
This method returns an undefined value.
Increments the connection-level send window. Called when the peer sends a WINDOW_UPDATE on stream 0.
    # File 'lib/raptor/http2.rb', line 167

    def add_connection_window(increment)
      @connection_window.swap { |window| window + increment }
    end
#add_stream_window(stream_id, increment) ⇒ void
This method returns an undefined value.
Increments the per-stream send window. Called when the peer sends a WINDOW_UPDATE on a specific stream.
    # File 'lib/raptor/http2.rb', line 179

    def add_stream_window(stream_id, increment)
      initial = @initial_stream_window.value
      @stream_windows.swap do |s|
        current = s[stream_id] || initial
        s.merge(stream_id => current + increment)
      end
    end
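Two details in this method are easy to miss: a stream with no recorded entry is treated as sitting at the initial window, and `merge` builds a new hash rather than mutating the one held by the `Atom`. A minimal sketch of that update in isolation, where `credit` is a hypothetical helper and 65,535 is assumed as the initial window:

```ruby
initial = 65_535    # assumed initial stream window (the RFC 7540 default)
windows = {}.freeze # snapshot in which no stream is tracked yet

# Returns a new map with the stream's window raised by `increment`.
credit = lambda do |snapshot, stream_id, increment|
  current = snapshot[stream_id] || initial
  snapshot.merge(stream_id => current + increment)
end

first = credit.call(windows, 5, 1_000)
second = credit.call(first, 5, 500)

puts first[5]      # prints 66535
puts second[5]     # prints 67035
puts windows.empty? # prints true, the original snapshot is untouched
```

Because every update yields a fresh hash, a reader holding an older snapshot never observes a half-applied change.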
#discard_stream(stream_id) ⇒ void
This method returns an undefined value.
Discards any per-stream tracking for the given stream. Called after a stream closes so `@stream_windows` does not grow without bound across the lifetime of a connection.

    # File 'lib/raptor/http2.rb', line 213

    def discard_stream(stream_id)
      return unless @stream_windows.value.key?(stream_id)

      @stream_windows.swap do |s|
        next s unless s.key?(stream_id)

        new = s.dup
        new.delete(stream_id)
        new
      end
    end
#set_initial_stream_window(new_size) ⇒ void
This method returns an undefined value.
Updates the peer’s `SETTINGS_INITIAL_WINDOW_SIZE`. Shifts every existing stream window by the delta as required by RFC 7540 §6.9.2.

    # File 'lib/raptor/http2.rb', line 194

    def set_initial_stream_window(new_size)
      old = @initial_stream_window.value
      @initial_stream_window.swap { new_size }
      delta = new_size - old
      return if delta.zero?

      @stream_windows.swap do |s|
        s.transform_values { |size| size + delta }
      end
    end
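A worked example may help show why the shift can leave a stream window negative, which RFC 7540 §6.9.2 explicitly permits. The sketch below applies the same delta arithmetic to a plain hash; `sketch_shift` is a hypothetical helper and the window sizes are illustrative.

```ruby
# Returns a new map with every tracked window moved by the change in the
# initial window size.
def sketch_shift(windows, old_initial, new_initial)
  delta = new_initial - old_initial
  return windows if delta.zero?

  windows.transform_values { |size| size + delta }
end

# Stream 1 is untouched so far; stream 3 has already sent 55,535 bytes.
windows = { 1 => 65_535, 3 => 10_000 }

# The peer lowers SETTINGS_INITIAL_WINDOW_SIZE from 65,535 to 16,384.
shifted = sketch_shift(windows, 65_535, 16_384)

puts shifted[1] # prints 16384
puts shifted[3] # prints -39151
```

A stream left with a negative window cannot send until WINDOW_UPDATE frames bring it back above zero, which matches `acquire` sleeping while `capped_full` is not positive. Streams without an entry need no shift, because they fall back to the updated initial value on their next lookup.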