Class: Hyperion::WebSocket::Connection
- Inherits:
-
Object
- Object
- Hyperion::WebSocket::Connection
- Defined in:
- lib/hyperion/websocket/connection.rb
Instance Attribute Summary collapse
-
#close_code ⇒ Object
readonly
Returns the value of attribute close_code.
-
#close_reason ⇒ Object
readonly
Returns the value of attribute close_reason.
-
#max_message_bytes ⇒ Object
readonly
Returns the value of attribute max_message_bytes.
-
#route_resolutions ⇒ Object
readonly
2.9-C — count of route-label resolutions for this Connection.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#subprotocol ⇒ Object
readonly
Returns the value of attribute subprotocol.
Instance Method Summary collapse
-
#close(code: CLOSE_NORMAL, reason: '', drain_timeout: 5) ⇒ Object
Initiate a graceful close.
- #closed? ⇒ Boolean
- #closing? ⇒ Boolean
-
#initialize(socket, buffered: '', subprotocol: nil, max_message_bytes: 1_048_576, ping_interval: 30, idle_timeout: 60, extensions: {}, env: nil, route: nil, path_templater: nil) ⇒ Connection
constructor
socket — IO returned by env.call.
- #on_close(&block) ⇒ Object
-
#on_ping(&block) ⇒ Object
Hooks fired AFTER the built-in protocol behaviour.
- #on_pong(&block) ⇒ Object
- #open? ⇒ Boolean
-
#recv ⇒ Object
Block until the next complete application message arrives.
-
#send(payload, opcode: :text) ⇒ Object
Send an application message.
Constructor Details
#initialize(socket, buffered: '', subprotocol: nil, max_message_bytes: 1_048_576, ping_interval: 30, idle_timeout: 60, extensions: {}, env: nil, route: nil, path_templater: nil) ⇒ Connection
socket — IO returned by env.call. The
connection assumes ownership; closing the
Hyperion::WebSocket::Connection closes the
underlying socket.
buffered — bytes already pulled off the socket by the
HTTP parser past the request boundary
(env['hyperion.hijack_buffered']). Prepended
to the read buffer before the first syscall.
subprotocol — the negotiated subprotocol from the handshake
(slot 2 of the [:ok, accept, sub] tuple) or nil.
max_message_bytes — cap on a single reassembled message. Default 1 MiB.
For permessage-deflate the cap is applied to the
DECOMPRESSED size — a tiny compressed payload that
inflates beyond the cap closes 1009 (compression
bomb defense, RFC 7692 §8.1).
ping_interval — seconds between proactive server pings. nil = off. idle_timeout — seconds of no traffic before we send a close.
nil = off. Defaults to 60s; set higher for
long-lived idle clients (chat presence, etc.).
extensions — Hash from ‘Handshake.validate`’s 4th slot. When
`permessage_deflate:` is present the connection
instantiates a per-conn Zlib::Deflate / Inflate
pair sized to the negotiated window bits, and
sets RSV1 on outbound text/binary frames. `{}`
(default) means no compression.
env — the Rack env at handshake time. Used by 2.9-C to
derive a low-cardinality `route` label for the
permessage-deflate ratio histogram. Resolution
order: explicit `env['hyperion.websocket.route']`
(operator-named channel) → templated PATH_INFO
via `Hyperion::Metrics.default_path_templater`.
Resolved exactly once at construction; cached as
a frozen one-element labels Array so the per-
message observation is allocation-free.
route — explicit route override, mostly for unit tests
that don't have an `env`. Wins over `env`.
path_templater — per-conn templater override (specs); falls back
to `Hyperion::Metrics.default_path_templater`.
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/hyperion/websocket/connection.rb', line 142 def initialize(socket, buffered: '', subprotocol: nil, max_message_bytes: 1_048_576, ping_interval: 30, idle_timeout: 60, extensions: {}, env: nil, route: nil, path_templater: nil) @socket = socket @subprotocol = subprotocol @max_message_bytes = @ping_interval = ping_interval @idle_timeout = idle_timeout # 2.9-C — resolve the route label exactly once, here on the # cold path; cache the frozen labels tuple so every subsequent # `observe_deflate_ratio` reuses it. Reading PATH_INFO + running # the templater per outbound message would add a Hash lookup, a # mutex acquire, and a regex chain to a path that fires once per # frame on chat-shape workloads. @route_resolutions = 0 @deflate_ratio_labels = resolve_route_labels(env: env, route: route, path_templater: path_templater) (extensions[:permessage_deflate]) @inbuf = String.new(capacity: READ_CHUNK_BYTES, encoding: Encoding::ASCII_8BIT) @inbuf << buffered.to_s.b unless buffered.nil? || buffered.empty? @offset = 0 # Reassembly state. @msg_opcode is the first frame's opcode (text # or binary); @msg_buffer accumulates payload bytes across # continuation frames until FIN=1. @msg_opcode = nil @msg_buffer = nil @state = :open @close_code = nil @close_reason = nil @on_ping = nil @on_pong = nil @on_close = nil @last_traffic_at = monotonic_now end |
Instance Attribute Details
#close_code ⇒ Object (readonly)
Returns the value of attribute close_code.
98 99 100 |
# File 'lib/hyperion/websocket/connection.rb', line 98 def close_code @close_code end |
#close_reason ⇒ Object (readonly)
Returns the value of attribute close_reason.
98 99 100 |
# File 'lib/hyperion/websocket/connection.rb', line 98 def close_reason @close_reason end |
#max_message_bytes ⇒ Object (readonly)
Returns the value of attribute max_message_bytes.
98 99 100 |
# File 'lib/hyperion/websocket/connection.rb', line 98 def @max_message_bytes end |
#route_resolutions ⇒ Object (readonly)
2.9-C — count of route-label resolutions for this Connection. Bumped once at construction; per-message observation paths must never touch this. Specs use it to guard against per-msg leaks.
102 103 104 |
# File 'lib/hyperion/websocket/connection.rb', line 102 def route_resolutions @route_resolutions end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
98 99 100 |
# File 'lib/hyperion/websocket/connection.rb', line 98 def state @state end |
#subprotocol ⇒ Object (readonly)
Returns the value of attribute subprotocol.
98 99 100 |
# File 'lib/hyperion/websocket/connection.rb', line 98 def subprotocol @subprotocol end |
Instance Method Details
#close(code: CLOSE_NORMAL, reason: '', drain_timeout: 5) ⇒ Object
Initiate a graceful close. Sends a close frame with the given code (default 1000) and reason, then drains until either the peer’s close arrives or ‘drain_timeout` seconds pass. Closes the socket either way. Idempotent — calling close twice is a no-op on the second call.
282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/hyperion/websocket/connection.rb', line 282 def close(code: CLOSE_NORMAL, reason: '', drain_timeout: 5) return if @state == :closed if @state == :open send_close_frame(code, reason) @state = :closing end # Drain inbound until we see the peer's close (or timeout). drain_for_close(drain_timeout) mark_closed end |
#closed? ⇒ Boolean
297 |
# File 'lib/hyperion/websocket/connection.rb', line 297 def closed? = @state == :closed |
#closing? ⇒ Boolean
296 |
# File 'lib/hyperion/websocket/connection.rb', line 296 def closing? = @state == :closing |
#on_close(&block) ⇒ Object
275 |
# File 'lib/hyperion/websocket/connection.rb', line 275 def on_close(&block) = @on_close = block |
#on_ping(&block) ⇒ Object
Hooks fired AFTER the built-in protocol behaviour. Auto-pong still happens regardless of whether on_ping is registered; close-frame echo still happens regardless of on_close. The hooks are observation points, not behaviour overrides.
273 |
# File 'lib/hyperion/websocket/connection.rb', line 273 def on_ping(&block) = @on_ping = block |
#on_pong(&block) ⇒ Object
274 |
# File 'lib/hyperion/websocket/connection.rb', line 274 def on_pong(&block) = @on_pong = block |
#open? ⇒ Boolean
295 |
# File 'lib/hyperion/websocket/connection.rb', line 295 def open? = @state == :open |
#recv ⇒ Object
Block until the next complete application message arrives. Returns:
[:text, String] — opcode 0x1, UTF-8 validated
[:binary, String] — opcode 0x2, binary
[:close, Integer|nil, String|nil] — peer initiated close
nil — socket EOF before a frame
Raises StateError if called after a close has already been observed (the connection is single-shot for close-detection).
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/hyperion/websocket/connection.rb', line 195 def recv raise StateError, 'connection is closed' if @state == :closed # If we've already observed a close frame, the next recv must # raise — callers that want to clean up should check the # previous return value. raise StateError, 'close already received' if @state == :closing && @close_observed_by_caller loop do frame = next_frame if frame.nil? # Socket EOF without a clean close — treat as best-effort # disconnect. The caller sees nil and stops looping. mark_closed return nil end # RFC 7692 §6.1: control frames MUST NOT have RSV1 set. The # parser already errored on this case, but defense-in-depth # — keeps us safe if someone hands us a custom frame source. if frame.rsv1 && %i[ping pong close].include?(frame.opcode) fail_close(CLOSE_PROTOCOL_ERROR, 'RSV1 set on control frame') raise StateError, 'RSV1 set on control frame' end # RFC 7692 §6: RSV1 only allowed on data frames when the # extension was negotiated. Without negotiation, any RSV1 is # a protocol error — close 1002 and bail. if frame.rsv1 && @inflater.nil? fail_close(CLOSE_PROTOCOL_ERROR, 'RSV1 set without negotiated extension') raise StateError, 'RSV1 set without negotiated extension' end case frame.opcode when :ping handle_ping(frame) next when :pong handle_pong(frame) next when :close return handle_close_frame(frame) when :text, :binary return nil if (msg = collect_data_frame(frame))&.then { return msg } when :continuation return nil if (msg = collect_data_frame(frame))&.then { return msg } end end end |
#send(payload, opcode: :text) ⇒ Object
Send an application message. opcode: :text (default) or :binary. Single-frame, FIN=1, server-side (unmasked). When permessage- deflate is active the payload is DEFLATE-compressed inline and the RSV1 bit is set on the frame; control frames (close/ping/ pong) are NEVER compressed per RFC 7692 §6.1, even when the extension is active.
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/hyperion/websocket/connection.rb', line 250 def send(payload, opcode: :text) raise StateError, 'connection is closed' if @state == :closed raise StateError, "cannot send while #{@state}" if @state != :open unless %i[text binary].include?(opcode) raise ArgumentError, "send opcode must be :text or :binary (got #{opcode.inspect})" end bin = opcode == :text ? payload.to_s.encode(Encoding::UTF_8).b : payload.to_s.b rsv1 = false if @deflater bin = (bin) rsv1 = true end wire = Hyperion::WebSocket::Builder.build(opcode: opcode, payload: bin, rsv1: rsv1) write_wire(wire) @last_traffic_at = monotonic_now true end |