Class: Hyperion::WebSocket::Connection

Inherits:
Object
Defined in:
lib/hyperion/websocket/connection.rb

Constructor Details

#initialize(socket, buffered: '', subprotocol: nil, max_message_bytes: 1_048_576, ping_interval: 30, idle_timeout: 60, extensions: {}, env: nil, route: nil, path_templater: nil) ⇒ Connection

Parameters:

  • socket: IO returned by env.call. The connection assumes ownership; closing the Hyperion::WebSocket::Connection closes the underlying socket.

  • buffered: bytes already pulled off the socket by the HTTP parser past the request boundary (env['hyperion.hijack_buffered']). Prepended to the read buffer before the first syscall.

  • subprotocol: the negotiated subprotocol from the handshake (slot 2 of the [:ok, accept, sub] tuple) or nil.

  • max_message_bytes: cap on a single reassembled message. Default 1 MiB. For permessage-deflate the cap is applied to the DECOMPRESSED size: a tiny compressed payload that inflates beyond the cap closes 1009 (compression bomb defense, RFC 7692 §8.1).

  • ping_interval: seconds between proactive server pings. nil = off. Defaults to 30s.

  • idle_timeout: seconds of no traffic before we send a close. nil = off. Defaults to 60s; set higher for long-lived idle clients (chat presence, etc.).

  • extensions: Hash from `Handshake.validate`'s 4th slot. When `permessage_deflate:` is present the connection instantiates a per-conn Zlib::Deflate / Inflate pair sized to the negotiated window bits, and sets RSV1 on outbound text/binary frames. `{}` (default) means no compression.

  • env: the Rack env at handshake time. Used by 2.9-C to derive a low-cardinality `route` label for the permessage-deflate ratio histogram. Resolution order: explicit `env['hyperion.websocket.route']` (operator-named channel), then templated PATH_INFO via `Hyperion::Metrics.default_path_templater`. Resolved exactly once at construction; cached as a frozen one-element labels Array so the per-message observation is allocation-free.

  • route: explicit route override, mostly for unit tests that don't have an `env`. Wins over `env`.

  • path_templater: per-conn templater override (specs); falls back to `Hyperion::Metrics.default_path_templater`.
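
The decompressed-size cap described for max_message_bytes can be illustrated with Ruby's standard zlib bindings. This is a minimal sketch, not Hyperion's implementation: `inflate_capped`, `raw_deflate`, and `MessageTooBig` are hypothetical names introduced here, and the sketch assumes a raw DEFLATE stream (negative window bits) as permessage-deflate uses.

```ruby
require 'zlib'

# Hypothetical error type for this sketch; not a Hyperion class.
class MessageTooBig < StandardError; end

# Inflate a raw DEFLATE payload chunk by chunk, aborting as soon as
# the accumulated output would exceed max_bytes.
def inflate_capped(compressed, max_bytes)
  inflater = Zlib::Inflate.new(-Zlib::MAX_WBITS) # negative bits = raw DEFLATE
  out = String.new(encoding: Encoding::ASCII_8BIT)
  inflater.inflate(compressed) do |chunk|
    if out.bytesize + chunk.bytesize > max_bytes
      raise MessageTooBig, "inflated size exceeds #{max_bytes} bytes"
    end
    out << chunk
  end
  out
ensure
  inflater&.close
end

# Helper used only to produce test input for the sketch.
def raw_deflate(data)
  deflater = Zlib::Deflate.new(Zlib::DEFAULT_COMPRESSION, -Zlib::MAX_WBITS)
  deflater.deflate(data, Zlib::FINISH)
ensure
  deflater&.close
end
```

Checking the size inside the block means a few kilobytes of compressed zeros are rejected after roughly max_bytes of output, rather than after the whole payload has been expanded in memory. A caller would presumably translate the error into a 1009 close.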


# File 'lib/hyperion/websocket/connection.rb', line 142

def initialize(socket, buffered: '', subprotocol: nil,
               max_message_bytes: 1_048_576,
               ping_interval: 30, idle_timeout: 60,
               extensions: {}, env: nil, route: nil,
               path_templater: nil)
  @socket = socket
  @subprotocol = subprotocol
  @max_message_bytes = max_message_bytes
  @ping_interval = ping_interval
  @idle_timeout = idle_timeout

  # 2.9-C — resolve the route label exactly once, here on the
  # cold path; cache the frozen labels tuple so every subsequent
  # `observe_deflate_ratio` reuses it. Reading PATH_INFO + running
  # the templater per outbound message would add a Hash lookup, a
  # mutex acquire, and a regex chain to a path that fires once per
  # frame on chat-shape workloads.
  @route_resolutions = 0
  @deflate_ratio_labels = resolve_route_labels(env: env, route: route,
                                               path_templater: path_templater)

  configure_permessage_deflate(extensions[:permessage_deflate])

  @inbuf = String.new(capacity: READ_CHUNK_BYTES, encoding: Encoding::ASCII_8BIT)
  @inbuf << buffered.to_s.b unless buffered.nil? || buffered.empty?
  @offset = 0

  # Reassembly state. @msg_opcode is the first frame's opcode (text
  # or binary); @msg_buffer accumulates payload bytes across
  # continuation frames until FIN=1.
  @msg_opcode = nil
  @msg_buffer = nil

  @state = :open
  @close_code = nil
  @close_reason = nil

  @on_ping = nil
  @on_pong = nil
  @on_close = nil

  @last_traffic_at = monotonic_now
end
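
The constructor stores ping_interval, idle_timeout, and a monotonic @last_traffic_at, but the listing does not show how they are evaluated. One plausible way to combine them is sketched below; `keepalive_action` is a hypothetical helper, and checking the idle timeout before the ping interval is an assumption of this sketch, not documented behaviour.

```ruby
# Monotonic seconds; unaffected by wall-clock adjustments.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Decide what a periodic keepalive tick should do.
# Returns :close, :ping, or :none. A nil timer is treated as disabled.
def keepalive_action(now:, last_traffic_at:, ping_interval:, idle_timeout:)
  idle = now - last_traffic_at
  return :close if idle_timeout && idle >= idle_timeout
  return :ping  if ping_interval && idle >= ping_interval

  :none
end
```

With the defaults (ping_interval: 30, idle_timeout: 60), a connection that has been silent for 35 seconds would get a ping, and one silent for 65 seconds would be closed.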

Instance Attribute Details

#close_code ⇒ Object (readonly)

Returns the value of attribute close_code.



# File 'lib/hyperion/websocket/connection.rb', line 98

def close_code
  @close_code
end

#close_reason ⇒ Object (readonly)

Returns the value of attribute close_reason.



# File 'lib/hyperion/websocket/connection.rb', line 98

def close_reason
  @close_reason
end

#max_message_bytes ⇒ Object (readonly)

Returns the value of attribute max_message_bytes.



# File 'lib/hyperion/websocket/connection.rb', line 98

def max_message_bytes
  @max_message_bytes
end

#route_resolutions ⇒ Object (readonly)

2.9-C — count of route-label resolutions for this Connection. Bumped once at construction; per-message observation paths must never touch this. Specs use it to guard against per-msg leaks.



# File 'lib/hyperion/websocket/connection.rb', line 102

def route_resolutions
  @route_resolutions
end
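
The resolve-once pattern that route_resolutions guards can be sketched as follows. `LabelledConnection`, `observe`, and the default digit-collapsing templater are hypothetical stand-ins introduced for illustration; only the resolution order (explicit route, then `env['hyperion.websocket.route']`, then templated PATH_INFO) is taken from the constructor documentation.

```ruby
class LabelledConnection
  attr_reader :route_resolutions

  # templater: any callable mapping a raw path to a low-cardinality
  # template. The default here is an assumption made for this sketch.
  def initialize(env: nil, route: nil,
                 templater: ->(path) { path.gsub(%r{/\d+}, '/:id') })
    @route_resolutions = 0
    @labels = resolve_labels(env, route, templater)
  end

  # Hot path: hands out the cached frozen Array, never re-resolves.
  def observe(ratio)
    yield @labels, ratio
  end

  private

  # Cold path: runs exactly once, from the constructor.
  def resolve_labels(env, route, templater)
    @route_resolutions += 1
    env ||= {}
    label = route ||
            env['hyperion.websocket.route'] ||
            templater.call(env['PATH_INFO'].to_s)
    [label.dup.freeze].freeze
  end
end
```

A spec in the spirit of the one described above would call `observe` many times and assert that `route_resolutions` is still 1 and that every call received the same Array object.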

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/hyperion/websocket/connection.rb', line 98

def state
  @state
end

#subprotocol ⇒ Object (readonly)

Returns the value of attribute subprotocol.



# File 'lib/hyperion/websocket/connection.rb', line 98

def subprotocol
  @subprotocol
end

Instance Method Details

#close(code: CLOSE_NORMAL, reason: '', drain_timeout: 5) ⇒ Object

Initiate a graceful close. Sends a close frame with the given code (default 1000) and reason, then drains until either the peer's close arrives or `drain_timeout` seconds pass. Closes the socket either way. Idempotent: a second call to close is a no-op.



# File 'lib/hyperion/websocket/connection.rb', line 282

def close(code: CLOSE_NORMAL, reason: '', drain_timeout: 5)
  return if @state == :closed

  if @state == :open
    send_close_frame(code, reason)
    @state = :closing
  end

  # Drain inbound until we see the peer's close (or timeout).
  drain_for_close(drain_timeout)
  mark_closed
end
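
The idempotent open → closing → closed progression can be reproduced in isolation. The sketch below is a toy model, not the real class: `ClosableSketch` and its `sent` array (standing in for the socket) are hypothetical, and the drain step is reduced to a comment. The close payload layout (2-byte big-endian status code followed by the reason) follows RFC 6455 §5.5.1.

```ruby
class ClosableSketch
  CLOSE_NORMAL = 1000

  attr_reader :state, :sent

  def initialize
    @state = :open
    @sent = [] # stands in for the socket: records close payloads "written"
  end

  def close(code: CLOSE_NORMAL, reason: '')
    return if @state == :closed # second call is a no-op

    if @state == :open
      # Close payload: 2-byte big-endian status code, then the UTF-8 reason.
      @sent << ([code].pack('n') + reason.encode(Encoding::UTF_8).b)
      @state = :closing
    end

    # The real method drains inbound frames here until the peer's
    # close arrives or the timeout expires.
    @state = :closed
  end
end
```

Calling `close` twice on the same object records a single payload, which is the property the documentation calls idempotent.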

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/websocket/connection.rb', line 297

def closed? = @state == :closed

#closing? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/websocket/connection.rb', line 296

def closing? = @state == :closing

#on_close(&block) ⇒ Object



# File 'lib/hyperion/websocket/connection.rb', line 275

def on_close(&block) = @on_close = block

#on_ping(&block) ⇒ Object

Hooks fired AFTER the built-in protocol behaviour. Auto-pong still happens regardless of whether on_ping is registered; close-frame echo still happens regardless of on_close. The hooks are observation points, not behaviour overrides.



# File 'lib/hyperion/websocket/connection.rb', line 273

def on_ping(&block) = @on_ping = block

#on_pong(&block) ⇒ Object



# File 'lib/hyperion/websocket/connection.rb', line 274

def on_pong(&block) = @on_pong = block
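
The "observation point, not override" semantics described under #on_ping can be shown with a toy class. `HookSketch`, `handle_ping`, and its `log` are hypothetical; the sketch only demonstrates the ordering (built-in reply first, hook second) and that the reply happens with or without a registered hook.

```ruby
class HookSketch
  attr_reader :log

  def initialize
    @log = []
    @on_ping = nil
  end

  # Same registration shape as the methods documented here.
  def on_ping(&block) = @on_ping = block

  def handle_ping(payload)
    @log << [:pong_sent, payload] # built-in behaviour: always runs
    @on_ping&.call(payload)       # observer: runs afterwards, if registered
  end
end
```

Because the hook runs after the pong is recorded, a slow or raising hook in this model could delay later frames but could not suppress the protocol reply.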

#open? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/websocket/connection.rb', line 295

def open? = @state == :open

#recv ⇒ Object

Block until the next complete application message arrives. Returns:

[:text, String]                — opcode 0x1, UTF-8 validated
[:binary, String]              — opcode 0x2, binary
[:close, Integer|nil, String|nil] — peer initiated close
nil                            — socket EOF before a frame

Raises StateError if called after a close has already been observed (the connection is single-shot for close-detection).

Raises:

  • (StateError)



# File 'lib/hyperion/websocket/connection.rb', line 195

def recv
  raise StateError, 'connection is closed' if @state == :closed
  # If we've already observed a close frame, the next recv must
  # raise — callers that want to clean up should check the
  # previous return value.
  raise StateError, 'close already received' if @state == :closing && @close_observed_by_caller

  loop do
    frame = next_frame
    if frame.nil?
      # Socket EOF without a clean close — treat as best-effort
      # disconnect. The caller sees nil and stops looping.
      mark_closed
      return nil
    end

    # RFC 7692 §6.1: control frames MUST NOT have RSV1 set. The
    # parser already errored on this case, but defense-in-depth
    # — keeps us safe if someone hands us a custom frame source.
    if frame.rsv1 && %i[ping pong close].include?(frame.opcode)
      fail_close(CLOSE_PROTOCOL_ERROR, 'RSV1 set on control frame')
      raise StateError, 'RSV1 set on control frame'
    end

    # RFC 7692 §6: RSV1 only allowed on data frames when the
    # extension was negotiated. Without negotiation, any RSV1 is
    # a protocol error — close 1002 and bail.
    if frame.rsv1 && @inflater.nil?
      fail_close(CLOSE_PROTOCOL_ERROR, 'RSV1 set without negotiated extension')
      raise StateError, 'RSV1 set without negotiated extension'
    end

    case frame.opcode
    when :ping
      handle_ping(frame)
      next
    when :pong
      handle_pong(frame)
      next
    when :close
      return handle_close_frame(frame)
    when :text, :binary, :continuation
      # A nil result means the message is not complete yet; keep
      # looping until collect_data_frame hands back a full message.
      msg = collect_data_frame(frame)
      return msg unless msg.nil?
    end
  end
end
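
A caller typically loops on recv and branches on the shape of the return value. The sketch below shows one way to do that with pattern matching; `drain` and `ScriptedConn` are hypothetical, and the scripted object merely mimics the four documented return shapes so the loop can run without a socket.

```ruby
# Drain a connection-like object and summarise what was received.
def drain(conn)
  seen = []
  loop do
    case conn.recv
    in [:text, body]   then seen << "text:#{body}"
    in [:binary, body] then seen << "binary:#{body.bytesize}B"
    in [:close, code, _reason]
      seen << "close:#{code.inspect}"
      break # a further recv would raise StateError
    in nil
      seen << 'eof'
      break # socket EOF without a clean close
    end
  end
  seen
end

# Scripted stand-in for a Connection: returns queued values in order.
ScriptedConn = Struct.new(:script) do
  def recv = script.shift
end
```

Stopping on both `[:close, ...]` and `nil` matters: per the documentation above, recv raises StateError once a close has been observed, so the loop must not call it again.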

#send(payload, opcode: :text) ⇒ Object

Send an application message. opcode: :text (default) or :binary. Single-frame, FIN=1, server-side (unmasked). When permessage-deflate is active the payload is DEFLATE-compressed inline and the RSV1 bit is set on the frame; control frames (close/ping/pong) are NEVER compressed per RFC 7692 §6.1, even when the extension is active.

Raises:

  • (StateError)
  • (ArgumentError)



# File 'lib/hyperion/websocket/connection.rb', line 250

def send(payload, opcode: :text)
  raise StateError, 'connection is closed' if @state == :closed
  raise StateError, "cannot send while #{@state}" if @state != :open
  unless %i[text binary].include?(opcode)
    raise ArgumentError, "send opcode must be :text or :binary (got #{opcode.inspect})"
  end

  bin = opcode == :text ? payload.to_s.encode(Encoding::UTF_8).b : payload.to_s.b
  rsv1 = false
  if @deflater
    bin = deflate_message(bin)
    rsv1 = true
  end
  wire = Hyperion::WebSocket::Builder.build(opcode: opcode, payload: bin, rsv1: rsv1)
  write_wire(wire)
  @last_traffic_at = monotonic_now
  true
end
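
The wire format that a single-frame, FIN=1, unmasked send produces is defined by RFC 6455 §5.2. The sketch below is not `Hyperion::WebSocket::Builder.build`, whose internals are not shown here; `build_frame` and `SKETCH_OPCODES` are hypothetical names, and the code only illustrates where the FIN bit, RSV1 bit, opcode, and length land in the header.

```ruby
SKETCH_OPCODES = { text: 0x1, binary: 0x2 }.freeze

# Build one unmasked server-to-client data frame with FIN=1.
def build_frame(opcode:, payload:, rsv1: false)
  bin = payload.b
  # Byte 0: FIN (0x80) | RSV1 (0x40, set when compressed) | opcode.
  byte0 = 0x80 | (rsv1 ? 0x40 : 0) | SKETCH_OPCODES.fetch(opcode)
  header =
    if bin.bytesize <= 125
      [byte0, bin.bytesize].pack('CC')        # 7-bit length
    elsif bin.bytesize <= 0xFFFF
      [byte0, 126, bin.bytesize].pack('CCn')  # 16-bit big-endian length
    else
      [byte0, 127, bin.bytesize].pack('CCQ>') # 64-bit big-endian length
    end
  header + bin
end
```

The mask bit (the high bit of the second byte) stays clear because servers do not mask frames; a text frame carrying "hi" is therefore the four bytes 0x81 0x02 'h' 'i'.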