Class: Tina4::WebSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/websocket.rb

Constant Summary collapse

GUID =
WEBSOCKET_GUID

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeWebSocket

Returns a new instance of WebSocket.



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/tina4/websocket.rb', line 140

def initialize
  @connections = {}
  @handlers = {
    open: [],
    message: [],
    close: [],
    error: []
  }
  @rooms = {}  # room_name => Set of conn_ids

  # ── Backplane (multi-instance scaling) ──────────────────────
  # Lazily wired on first broadcast (see ensure_backplane). Each instance
  # owns a stable id so it can ignore its own echoes coming back over the
  # shared pub/sub channel (the origin guard). The backplane listener runs
  # in its own Ruby thread, so the connections structure is guarded by a
  # mutex shared with the broadcast path.
  @backplane = nil
  @backplane_started = false
  @instance_id = SecureRandom.hex(8)
  @backplane_channel = Tina4::WEBSOCKET_BACKPLANE_CHANNEL
  @conn_mutex = Mutex.new
end

Instance Attribute Details

#backplane_channelObject (readonly)

Stable per-process id used as the backplane envelope “src” so an instance drops its own echoes. Exposed for tests / introspection.



165
166
167
# File 'lib/tina4/websocket.rb', line 165

def backplane_channel
  @backplane_channel
end

#connectionsObject (readonly)

Returns the value of attribute connections.



138
139
140
# File 'lib/tina4/websocket.rb', line 138

def connections
  @connections
end

#instance_idObject (readonly)

Stable per-process id used as the backplane envelope “src” so an instance drops its own echoes. Exposed for tests / introspection.



165
166
167
# File 'lib/tina4/websocket.rb', line 165

def instance_id
  @instance_id
end

Class Method Details

.route(path, secure: false, &block) ⇒ Object

Register a WebSocket handler for a path (class method, matching Python’s WebSocketServer.route). The block receives a WebSocketConnection and should call conn.on_message / conn.on_close to wire up event callbacks.

Registers on the Router so routes work in integrated (Rack) mode.

Tina4::WebSocket.route("/chat") do |conn|
  conn.on_message { |data| conn.send(data) }
  conn.on_close   { puts "bye" }
end

PUBLIC by default (mirrors GET). Pass secure: true to require a valid JWT on the upgrade (or chain .secure on the returned route).



529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
# File 'lib/tina4/websocket.rb', line 529

def self.route(path, secure: false, &block)
  @route_handlers ||= {}
  @route_handlers[path] = block

  # Adapt to Router's (conn, event, data) style
  adapter = proc do |conn, event, data|
    case event
    when :open
      block.call(conn)
    when :message
      conn.on_message_handler&.call(data)
    when :close
      conn.on_close_handler&.call
    end
  end

  Tina4::Router.websocket(path, secure: secure, &adapter)
end

Instance Method Details

#_join_room(conn_id, room_name) ⇒ Object

Internal: add connection ID to a room (called by WebSocketConnection#join_room). Mirrors Python’s WebSocketManager._join_room — not part of the public API.



243
244
245
246
# File 'lib/tina4/websocket.rb', line 243

def _join_room(conn_id, room_name)
  @rooms[room_name] ||= Set.new
  @rooms[room_name].add(conn_id)
end

#_leave_room(conn_id, room_name) ⇒ Object

Internal: remove connection ID from a room (called by WebSocketConnection#leave_room). Mirrors Python’s WebSocketManager._leave_room — not part of the public API.



250
251
252
# File 'lib/tina4/websocket.rb', line 250

def _leave_room(conn_id, room_name)
  @rooms[room_name]&.delete(conn_id)
end

#binary_payload?(message) ⇒ Boolean

A message is “binary” when it is an ASCII-8BIT (BINARY-encoded) string. Ruby has no separate bytes type, so a caller’s choice of the BINARY encoding is the signal to treat the payload as bytes: it goes through base64 so it survives the JSON envelope AND arrives with its binary encoding intact on the relaying instance (a JSON “text” value would come back as UTF-8, silently re-encoding binary data).

Returns:

  • (Boolean)


512
513
514
# File 'lib/tina4/websocket.rb', line 512

def binary_payload?(message)
  message.is_a?(String) && message.encoding == Encoding::ASCII_8BIT
end

#broadcast(message, exclude: nil, path: nil) ⇒ Object



206
207
208
209
210
211
212
213
214
215
# File 'lib/tina4/websocket.rb', line 206

def broadcast(message, exclude: nil, path: nil)
  ensure_backplane
  targets = @conn_mutex.synchronize do
    @connections.select do |id, conn|
      !(exclude && id == exclude) && !(path && conn.path != path)
    end.values
  end
  deliver_resilient(targets, message)
  publish_envelope(path ? "path" : "all", message, path: path, exclude: exclude)
end

#broadcast_all(message, exclude: nil) ⇒ Object

Send to ALL connections (no path filter). Resilient + backplane-fanned.



218
219
220
221
222
223
224
225
# File 'lib/tina4/websocket.rb', line 218

def broadcast_all(message, exclude: nil)
  ensure_backplane
  targets = @conn_mutex.synchronize do
    @connections.reject { |id, _| exclude && id == exclude }.values
  end
  deliver_resilient(targets, message)
  publish_envelope("all", message, exclude: exclude)
end

#broadcast_to_room(room_name, message, exclude: nil) ⇒ Object



271
272
273
274
275
276
# File 'lib/tina4/websocket.rb', line 271

def broadcast_to_room(room_name, message, exclude: nil)
  ensure_backplane
  targets = get_room_connections(room_name).reject { |conn| exclude && conn.id == exclude }
  deliver_resilient(targets, message)
  publish_envelope("room", message, room: room_name, exclude: exclude)
end

#close(conn_id, code: 1000, reason: "") ⇒ Object



234
235
236
237
# File 'lib/tina4/websocket.rb', line 234

def close(conn_id, code: 1000, reason: "")
  conn = @connections[conn_id]
  conn&.close(code: code, reason: reason)
end

#decode_envelope_message(env) ⇒ Object

Reconstruct the original str/bytes message from an envelope. JSON can’t carry bytes, so text → … and bytes → base64(…).



499
500
501
502
503
504
# File 'lib/tina4/websocket.rb', line 499

def decode_envelope_message(env)
  return env["text"] if env.key?("text")
  return Base64.strict_decode64(env["b64"]).b if env.key?("b64")

  nil
end

#deliver_resilient(targets, message) ⇒ Object

Deliver message to every connection in targets, pruning any that fail.



317
318
319
320
# File 'lib/tina4/websocket.rb', line 317

def deliver_resilient(targets, message)
  dead = targets.reject { |conn| safe_send(conn, message) }
  dead.each { |conn| prune(conn) }
end

#emit(event, *args) ⇒ Object



637
638
639
# File 'lib/tina4/websocket.rb', line 637

def emit(event, *args)
  @handlers[event]&.each { |h| h.call(*args) }
end

#ensure_backplaneObject

Lazily wire the configured backplane. Idempotent and best-effort — a failure logs and leaves the manager local-only; it must NEVER crash a broadcast.



411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
# File 'lib/tina4/websocket.rb', line 411

def ensure_backplane
  return if @backplane_started

  # Set immediately so we only ever attempt the wiring once, even on failure
  # (no retry storm on every broadcast).
  @backplane_started = true
  begin
    backplane = Tina4::WebSocketBackplane.create_backplane
    return if backplane.nil? # No backplane configured — stay local-only.

    @backplane = backplane
    @backplane.subscribe(@backplane_channel) { |raw| on_backplane_message(raw) }
    Tina4::Log.info("WebSocket backplane active (instance #{@instance_id}, channel '#{@backplane_channel}')") if defined?(Tina4::Log)
  rescue StandardError => e
    @backplane = nil
    Tina4::Log.error("WebSocket backplane wiring failed, continuing local-only: #{e.message}") if defined?(Tina4::Log)
  end
end

#get_client_rooms(client_id) ⇒ Object

Return list of room names a given client/connection id belongs to. Matches PHP Tina4WebSocket::getClientRooms($clientId).



260
261
262
263
264
# File 'lib/tina4/websocket.rb', line 260

def get_client_rooms(client_id)
  @rooms.each_with_object([]) do |(name, members), acc|
    acc << name if members.include?(client_id)
  end
end

#get_clientsObject



176
177
178
# File 'lib/tina4/websocket.rb', line 176

def get_clients
  @connections
end

#get_room_connections(room_name) ⇒ Object



266
267
268
269
# File 'lib/tina4/websocket.rb', line 266

def get_room_connections(room_name)
  ids = @rooms[room_name] || Set.new
  ids.filter_map { |id| @connections[id] }
end

#handle_upgrade(env, socket, manager: self, auth_required: false) ⇒ Object

Upgrade a raw socket to a WebSocket connection and run its frame loop.

manager is the engine that should OWN the connection (its @connections / rooms / backplane / idle reaper). It defaults to self. In integrated (Rack) mode the rack_app passes a process-wide shared engine here so that broadcasts, rooms and the backplane span every route’s connections even though each upgrade keeps its own isolated event handlers on self.



555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
# File 'lib/tina4/websocket.rb', line 555

def handle_upgrade(env, socket, manager: self, auth_required: false)
  key = env["HTTP_SEC_WEBSOCKET_KEY"]
  return unless key

  # Origin allow-list (opt-in via TINA4_WS_ALLOWED_ORIGINS). Unset = allow
  # all, so this never breaks an existing deployment. When set, an upgrade
  # from a non-listed Origin is refused with a 403 before the handshake.
  unless Tina4.websocket_origin_allowed?(env)
    socket.write("HTTP/1.1 403 Forbidden\r\n\r\n") rescue nil
    socket.close rescue nil
    return
  end

  # Per-route auth — checked AFTER the origin allow-list and BEFORE we accept
  # the handshake. A PUBLIC route (the default, mirrors GET) always passes; a
  # secured route (auth_required) needs a valid JWT via the Authorization
  # header, the "bearer" subprotocol, or ?token=. Missing/invalid → reject
  # the upgrade with a 401 (close code 1008 equivalent) and never accept.
  payload, ok = Tina4.ws_authorized(auth_required, env, env["QUERY_STRING"].to_s)
  unless ok
    socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n") rescue nil
    socket.close rescue nil
    return
  end

  accept = Tina4.compute_accept_key(key)

  # When the client offered the "bearer" subprotocol (the browser transport,
  # since new WebSocket() can't set headers), echo "bearer" back as the
  # accepted subprotocol — browsers reject a 101 that doesn't echo a
  # subprotocol they offered. Mirrors Python's accept-subprotocol behaviour.
  subproto_header = Tina4.ws_bearer_subprotocol_offered?(env) ? "Sec-WebSocket-Protocol: bearer\r\n" : ""

  response = "HTTP/1.1 101 Switching Protocols\r\n" \
             "Upgrade: websocket\r\n" \
             "Connection: Upgrade\r\n" \
             "#{subproto_header}" \
             "Sec-WebSocket-Accept: #{accept}\r\n\r\n"

  socket.write(response)

  conn_id = SecureRandom.hex(16)
  ws_path = env["REQUEST_PATH"] || env["PATH_INFO"] || "/"
  connection = WebSocketConnection.new(conn_id, socket, ws_server: manager, path: ws_path)
  # Expose the verified token payload on the connection (nil on public
  # routes). Mirrors Python's connection.auth = payload.
  connection.auth = payload
  manager.register_connection(connection)

  # Start the idle reaper lazily once we actually have a connection (opt-in
  # via TINA4_WS_IDLE_TIMEOUT; a no-op when unset/0).
  manager.start_idle_reaper

  emit(:open, connection)

  Thread.new do
    begin
      loop do
        frame = connection.read_frame
        break unless frame

        connection.touch # mark activity for the idle reaper

        case frame[:opcode]
        when 0x1 # Text
          emit(:message, connection, frame[:data])
        when 0x8 # Close
          break
        when 0x9 # Ping
          connection.send_pong(frame[:data])
        end
      end
    rescue => e
      emit(:error, connection, e)
    ensure
      manager.unregister_connection(conn_id)
      emit(:close, connection)
      socket.close rescue nil
    end
  end
end

#idle_timeoutObject

Read the configured idle timeout (seconds). 0 = disabled.



360
361
362
363
364
# File 'lib/tina4/websocket.rb', line 360

def idle_timeout
  Float(ENV["TINA4_WS_IDLE_TIMEOUT"] || "0")
rescue ArgumentError, TypeError
  0.0
end

#on(event, &block) ⇒ Object



167
168
169
# File 'lib/tina4/websocket.rb', line 167

def on(event, &block)
  @handlers[event.to_sym] << block if @handlers.key?(event.to_sym)
end

#on_backplane_message(raw) ⇒ Object

Receive a raw envelope from the backplane. Runs in the backplane’s BACKGROUND THREAD.



432
433
434
435
436
437
438
439
440
441
442
443
444
445
# File 'lib/tina4/websocket.rb', line 432

def on_backplane_message(raw)
  env = begin
    JSON.parse(raw)
  rescue JSON::ParserError, TypeError
    return
  end
  return unless env.is_a?(Hash)

  # Origin guard: ignore our own broadcasts echoed back over the channel.
  # We already delivered them locally; relaying again would double-send.
  return if env["src"] == @instance_id

  relay_local(env)
end

#prune(conn) ⇒ Object

Remove a (presumed dead) connection from the manager + all rooms.



323
324
325
326
327
328
329
# File 'lib/tina4/websocket.rb', line 323

def prune(conn)
  id = conn.respond_to?(:id) ? conn.id : nil
  return unless id

  @conn_mutex.synchronize { @connections.delete(id) }
  remove_from_all_rooms(id)
end

#publish_envelope(kind, message, room: nil, path: nil, exclude: nil) ⇒ Object

Publish a broadcast to the shared channel for sibling instances. No-op when no backplane is configured. Best-effort — a publish failure logs and is swallowed so the local broadcast that already happened is never undone by a flaky message bus.



474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/tina4/websocket.rb', line 474

def publish_envelope(kind, message, room: nil, path: nil, exclude: nil)
  return unless @backplane

  envelope = {
    "src" => @instance_id,
    "kind" => kind,
    "exclude" => exclude,
    "room" => room,
    "path" => path
  }
  # JSON can't carry raw bytes — encode binary as base64, text as text.
  if binary_payload?(message)
    envelope["b64"] = Base64.strict_encode64(message.b)
  else
    envelope["text"] = message
  end
  begin
    @backplane.publish(@backplane_channel, JSON.generate(envelope))
  rescue StandardError => e
    Tina4::Log.warning("WebSocket backplane publish failed: #{e.message}") if defined?(Tina4::Log)
  end
end

#reap_idle(timeout) ⇒ Object

Close connections whose last inbound frame is older than timeout seconds. Returns the number reaped. timeout <= 0 is a no-op. Connections without a last_activity are skipped.



341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/tina4/websocket.rb', line 341

def reap_idle(timeout)
  return 0 if timeout.to_f <= 0

  now = Time.now.to_f
  stale = @conn_mutex.synchronize do
    @connections.values.select do |conn|
      la = conn.respond_to?(:last_activity) ? conn.last_activity : nil
      la && (now - la) > timeout
    end
  end
  stale.each do |conn|
    conn.close(code: 1001, reason: "idle timeout") rescue nil
    prune(conn)
  end
  Tina4::Log.info("WebSocket idle reaper closed #{stale.size} connection(s)") if stale.any? && defined?(Tina4::Log)
  stale.size
end

#register_connection(connection) ⇒ Object

Register an open connection on this manager (thread-safe).



279
280
281
# File 'lib/tina4/websocket.rb', line 279

def register_connection(connection)
  @conn_mutex.synchronize { @connections[connection.id] = connection }
end

#relay_local(env) ⇒ Object

Deliver a remote-originated envelope to LOCAL connections only. NEVER re-publishes (that would loop the message around the cluster). Dispatches by “kind”: room / path / all.



450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'lib/tina4/websocket.rb', line 450

def relay_local(env)
  message = decode_envelope_message(env)
  return if message.nil?

  exclude = env["exclude"]
  targets =
    case env["kind"]
    when "room"
      room = env["room"]
      room ? get_room_connections(room) : []
    when "path"
      path = env["path"]
      path ? @conn_mutex.synchronize { @connections.values.select { |c| c.path == path } } : []
    else # "all" (and anything unknown) → every local connection
      @conn_mutex.synchronize { @connections.values }
    end
  targets = targets.reject { |conn| exclude && conn.id == exclude }
  deliver_resilient(targets, message)
end

#remove_from_all_rooms(conn_id) ⇒ Object



641
642
643
# File 'lib/tina4/websocket.rb', line 641

def remove_from_all_rooms(conn_id)
  @rooms.each_value { |members| members.delete(conn_id) }
end

#room_count(room_name) ⇒ Object



254
255
256
# File 'lib/tina4/websocket.rb', line 254

def room_count(room_name)
  (@rooms[room_name] || Set.new).size
end

#safe_send(conn, message) ⇒ Object

Send to ONE connection without letting a single dead client abort a broadcast loop. Returns true if delivered, false if the connection looks dead (the caller then prunes it). A failed send is logged, never silent. A connection whose own #send swallowed a write error and flipped #closed? is treated as dead too (mirrors Python’s ‘return not ws._closed`).



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/tina4/websocket.rb', line 300

def safe_send(conn, message)
  conn.send_text(message)
  # If the connection's own #send swallowed a write error it flips #closed?;
  # treat that as dead. Probe defensively so a stub/double that doesn't
  # define #closed? is simply treated as alive.
  dead = begin
    conn.respond_to?(:closed?) && conn.closed?
  rescue StandardError
    false
  end
  !dead
rescue StandardError => e
  Tina4::Log.warning("WebSocket send to #{conn.id} failed, pruning: #{e.message}") if defined?(Tina4::Log)
  false
end

#send_to(conn_id, message) ⇒ Object



227
228
229
230
231
232
# File 'lib/tina4/websocket.rb', line 227

def send_to(conn_id, message)
  conn = @conn_mutex.synchronize { @connections[conn_id] }
  return unless conn

  prune(conn) unless safe_send(conn, message)
end

#start(host: "0.0.0.0", port: 7147) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/tina4/websocket.rb', line 180

def start(host: "0.0.0.0", port: 7147)
  require "socket"
  @server_socket = TCPServer.new(host, port)
  @running = true
  @server_thread = Thread.new do
    while @running
      begin
        client = @server_socket.accept
        env = {}
        handle_upgrade(env, client)
      rescue => e
        break unless @running
      end
    end
  end
  self
end

#start_idle_reaperObject

Spin up a background reaper thread when an idle timeout is configured. Opt-in and non-breaking — unset/0 means no thread is created.



368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/tina4/websocket.rb', line 368

def start_idle_reaper
  timeout = idle_timeout
  return if timeout <= 0 || @reaper_thread&.alive?

  interval = [1.0, timeout / 2.0].max
  @reaper_running = true
  @reaper_thread = Thread.new do
    while @reaper_running
      sleep interval
      begin
        reap_idle(timeout)
      rescue StandardError => e
        Tina4::Log.error("WebSocket idle reaper sweep failed: #{e.message}") if defined?(Tina4::Log)
      end
    end
  end
end

#stopObject



198
199
200
201
202
203
204
# File 'lib/tina4/websocket.rb', line 198

def stop
  @running = false
  @server_socket&.close rescue nil
  @server_thread&.join(1)
  @connections.each_value { |conn| conn.close rescue nil }
  @connections.clear
end

#stop_idle_reaperObject



386
387
388
389
390
# File 'lib/tina4/websocket.rb', line 386

def stop_idle_reaper
  @reaper_running = false
  @reaper_thread&.kill
  @reaper_thread = nil
end

#unregister_connection(conn_id) ⇒ Object

Drop a connection on close (thread-safe) and remove it from all rooms.



284
285
286
287
# File 'lib/tina4/websocket.rb', line 284

def unregister_connection(conn_id)
  @conn_mutex.synchronize { @connections.delete(conn_id) }
  remove_from_all_rooms(conn_id)
end

#upgrade?(env) ⇒ Boolean

Returns:

  • (Boolean)


171
172
173
174
# File 'lib/tina4/websocket.rb', line 171

def upgrade?(env)
  upgrade = env["HTTP_UPGRADE"] || ""
  upgrade.downcase == "websocket"
end