Class: Raptor::Http2::FlowControl
- Inherits:
-
Object
- Object
- Raptor::Http2::FlowControl
- Defined in:
- lib/raptor/http2.rb
Overview
Per-connection outbound flow-control accounting.
Tracks the peer’s connection-level and per-stream receive windows so outbound DATA frames respect RFC 7540 §5.2. Threads dispatching stream responses call ‘acquire` to reserve send capacity; threads applying inbound WINDOW_UPDATE or SETTINGS frames call the mutating methods to replenish it. State is held in a single `Atom` so updates use CAS.
Constant Summary collapse
- ACQUIRE_POLL_INTERVAL =
0.001
Instance Method Summary collapse
-
#acquire(stream_id, max_bytes, end_stream: false) ⇒ Integer
Reserves outbound capacity on the given stream, polling until at least one byte is available on both the connection and stream windows.
-
#add_connection_window(increment) ⇒ void
Increments the connection-level send window.
-
#add_stream_window(stream_id, increment) ⇒ void
Increments the per-stream send window.
-
#discard_stream(stream_id) ⇒ void
Discards any per-stream tracking for the given stream.
-
#initialize ⇒ FlowControl
constructor
Creates a new FlowControl with the spec-default windows.
-
#set_initial_stream_window(new_size) ⇒ void
Updates the peer’s ‘SETTINGS_INITIAL_WINDOW_SIZE`.
Constructor Details
#initialize ⇒ FlowControl
Creates a new FlowControl with the spec-default windows.
96 97 98 99 100 |
# File 'lib/raptor/http2.rb', line 96 def initialize @connection_window = Atom.new(DEFAULT_WINDOW_SIZE) @stream_windows = Atom.new({}) @initial_stream_window = Atom.new(DEFAULT_WINDOW_SIZE) end |
Instance Method Details
#acquire(stream_id, max_bytes, end_stream: false) ⇒ Integer
Reserves outbound capacity on the given stream, polling until at least one byte is available on both the connection and stream windows. The returned size is capped at ‘MAX_FRAME_SIZE`.
When ‘end_stream` is true, `max_bytes` fits within the peer’s initial stream window, and no per-stream override has been recorded, only the connection window is consulted. The stream closes on this frame, so its remaining send window will not be consulted again and need not be tracked.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/raptor/http2.rb', line 118 def acquire(stream_id, max_bytes, end_stream: false) initial = @initial_stream_window.value capped = max_bytes < MAX_FRAME_SIZE ? max_bytes : MAX_FRAME_SIZE if end_stream && capped <= initial && !@stream_windows.value.key?(stream_id) loop do granted = 0 @connection_window.swap do |window| granted = window > capped ? capped : window granted > 0 ? window - granted : window end return granted if granted > 0 sleep ACQUIRE_POLL_INTERVAL end end loop do stream_window = @stream_windows.value[stream_id] || initial capped_full = capped < stream_window ? capped : stream_window granted = 0 if capped_full > 0 @connection_window.swap do |window| granted = window > capped_full ? capped_full : window granted > 0 ? window - granted : window end end if granted > 0 @stream_windows.swap do |s| current = s[stream_id] || initial s.merge(stream_id => current - granted) end return granted end sleep ACQUIRE_POLL_INTERVAL end end |
#add_connection_window(increment) ⇒ void
This method returns an undefined value.
Increments the connection-level send window. Called when the peer sends a WINDOW_UPDATE on stream 0.
166 167 168 |
# File 'lib/raptor/http2.rb', line 166 def add_connection_window(increment) @connection_window.swap { |window| window + increment } end |
#add_stream_window(stream_id, increment) ⇒ void
This method returns an undefined value.
Increments the per-stream send window. Called when the peer sends a WINDOW_UPDATE on a specific stream.
178 179 180 181 182 183 184 |
# File 'lib/raptor/http2.rb', line 178 def add_stream_window(stream_id, increment) initial = @initial_stream_window.value @stream_windows.swap do |s| current = s[stream_id] || initial s.merge(stream_id => current + increment) end end |
#discard_stream(stream_id) ⇒ void
This method returns an undefined value.
Discards any per-stream tracking for the given stream. Called after a stream closes so ‘@stream_windows` does not grow without bound across the lifetime of a connection.
212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/raptor/http2.rb', line 212 def discard_stream(stream_id) return unless @stream_windows.value.key?(stream_id) @stream_windows.swap do |s| next s unless s.key?(stream_id) new = s.dup new.delete(stream_id) new end end |
#set_initial_stream_window(new_size) ⇒ void
This method returns an undefined value.
Updates the peer’s ‘SETTINGS_INITIAL_WINDOW_SIZE`. Shifts every existing stream window by the delta as required by RFC 7540 §6.9.2.
193 194 195 196 197 198 199 200 201 202 |
# File 'lib/raptor/http2.rb', line 193 def set_initial_stream_window(new_size) old = @initial_stream_window.value @initial_stream_window.swap { new_size } delta = new_size - old return if delta.zero? @stream_windows.swap do |s| s.transform_values { |size| size + delta } end end |