Class: Tina4::WebSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/websocket.rb

Constant Summary collapse

GUID =
WEBSOCKET_GUID

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeWebSocket

Returns a new instance of WebSocket.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/tina4/websocket.rb', line 67

def initialize
  @connections = {}
  @handlers = {
    open: [],
    message: [],
    close: [],
    error: []
  }
  @rooms = {}  # room_name => Set of conn_ids

  # ── Backplane (multi-instance scaling) ──────────────────────
  # Lazily wired on first broadcast (see ensure_backplane). Each instance
  # owns a stable id so it can ignore its own echoes coming back over the
  # shared pub/sub channel (the origin guard). The backplane listener runs
  # in its own Ruby thread, so the connections structure is guarded by a
  # mutex shared with the broadcast path.
  @backplane = nil
  @backplane_started = false
  @instance_id = SecureRandom.hex(8)
  @backplane_channel = Tina4::WEBSOCKET_BACKPLANE_CHANNEL
  @conn_mutex = Mutex.new
end

Instance Attribute Details

#backplane_channelObject (readonly)

Stable per-process id used as the backplane envelope “src” so an instance drops its own echoes. Exposed for tests / introspection.



92
93
94
# File 'lib/tina4/websocket.rb', line 92

def backplane_channel
  @backplane_channel
end

#connectionsObject (readonly)

Returns the value of attribute connections.



65
66
67
# File 'lib/tina4/websocket.rb', line 65

def connections
  @connections
end

#instance_idObject (readonly)

Stable per-process id used as the backplane envelope “src” so an instance drops its own echoes. Exposed for tests / introspection.



92
93
94
# File 'lib/tina4/websocket.rb', line 92

def instance_id
  @instance_id
end

Class Method Details

.route(path, &block) ⇒ Object

Register a WebSocket handler for a path (class method, matching Python’s WebSocketServer.route). The block receives a WebSocketConnection and should call conn.on_message / conn.on_close to wire up event callbacks.

Registers on the Router so routes work in integrated (Rack) mode.

Tina4::WebSocket.route("/chat") do |conn|
  conn.on_message { |data| conn.send(data) }
  conn.on_close   { puts "bye" }
end


454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# File 'lib/tina4/websocket.rb', line 454

def self.route(path, &block)
  @route_handlers ||= {}
  @route_handlers[path] = block

  # Adapt to Router's (conn, event, data) style
  adapter = proc do |conn, event, data|
    case event
    when :open
      block.call(conn)
    when :message
      conn.on_message_handler&.call(data)
    when :close
      conn.on_close_handler&.call
    end
  end

  Tina4::Router.websocket(path, &adapter)
end

Instance Method Details

#_join_room(conn_id, room_name) ⇒ Object

Internal: add connection ID to a room (called by WebSocketConnection#join_room). Mirrors Python’s WebSocketManager._join_room — not part of the public API.



170
171
172
173
# File 'lib/tina4/websocket.rb', line 170

def _join_room(conn_id, room_name)
  @rooms[room_name] ||= Set.new
  @rooms[room_name].add(conn_id)
end

#_leave_room(conn_id, room_name) ⇒ Object

Internal: remove connection ID from a room (called by WebSocketConnection#leave_room). Mirrors Python’s WebSocketManager._leave_room — not part of the public API.



177
178
179
# File 'lib/tina4/websocket.rb', line 177

def _leave_room(conn_id, room_name)
  @rooms[room_name]&.delete(conn_id)
end

#binary_payload?(message) ⇒ Boolean

A message is “binary” when it is an ASCII-8BIT (BINARY-encoded) string. Ruby has no separate bytes type, so a caller’s choice of the BINARY encoding is the signal to treat the payload as bytes: it goes through base64 so it survives the JSON envelope AND arrives with its binary encoding intact on the relaying instance (a JSON “text” value would come back as UTF-8, silently re-encoding binary data).

Returns:

  • (Boolean)


439
440
441
# File 'lib/tina4/websocket.rb', line 439

def binary_payload?(message)
  message.is_a?(String) && message.encoding == Encoding::ASCII_8BIT
end

#broadcast(message, exclude: nil, path: nil) ⇒ Object



133
134
135
136
137
138
139
140
141
142
# File 'lib/tina4/websocket.rb', line 133

def broadcast(message, exclude: nil, path: nil)
  ensure_backplane
  targets = @conn_mutex.synchronize do
    @connections.select do |id, conn|
      !(exclude && id == exclude) && !(path && conn.path != path)
    end.values
  end
  deliver_resilient(targets, message)
  publish_envelope(path ? "path" : "all", message, path: path, exclude: exclude)
end

#broadcast_all(message, exclude: nil) ⇒ Object

Send to ALL connections (no path filter). Resilient + backplane-fanned.



145
146
147
148
149
150
151
152
# File 'lib/tina4/websocket.rb', line 145

def broadcast_all(message, exclude: nil)
  ensure_backplane
  targets = @conn_mutex.synchronize do
    @connections.reject { |id, _| exclude && id == exclude }.values
  end
  deliver_resilient(targets, message)
  publish_envelope("all", message, exclude: exclude)
end

#broadcast_to_room(room_name, message, exclude: nil) ⇒ Object



198
199
200
201
202
203
# File 'lib/tina4/websocket.rb', line 198

def broadcast_to_room(room_name, message, exclude: nil)
  ensure_backplane
  targets = get_room_connections(room_name).reject { |conn| exclude && conn.id == exclude }
  deliver_resilient(targets, message)
  publish_envelope("room", message, room: room_name, exclude: exclude)
end

#close(conn_id, code: 1000, reason: "") ⇒ Object



161
162
163
164
# File 'lib/tina4/websocket.rb', line 161

def close(conn_id, code: 1000, reason: "")
  conn = @connections[conn_id]
  conn&.close(code: code, reason: reason)
end

#decode_envelope_message(env) ⇒ Object

Reconstruct the original str/bytes message from an envelope. JSON can’t carry bytes, so text → … and bytes → base64(…).



426
427
428
429
430
431
# File 'lib/tina4/websocket.rb', line 426

def decode_envelope_message(env)
  return env["text"] if env.key?("text")
  return Base64.strict_decode64(env["b64"]).b if env.key?("b64")

  nil
end

#deliver_resilient(targets, message) ⇒ Object

Deliver message to every connection in targets, pruning any that fail.



244
245
246
247
# File 'lib/tina4/websocket.rb', line 244

def deliver_resilient(targets, message)
  dead = targets.reject { |conn| safe_send(conn, message) }
  dead.each { |conn| prune(conn) }
end

#emit(event, *args) ⇒ Object



540
541
542
# File 'lib/tina4/websocket.rb', line 540

def emit(event, *args)
  @handlers[event]&.each { |h| h.call(*args) }
end

#ensure_backplaneObject

Lazily wire the configured backplane. Idempotent and best-effort — a failure logs and leaves the manager local-only; it must NEVER crash a broadcast.



338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/tina4/websocket.rb', line 338

def ensure_backplane
  return if @backplane_started

  # Set immediately so we only ever attempt the wiring once, even on failure
  # (no retry storm on every broadcast).
  @backplane_started = true
  begin
    backplane = Tina4::WebSocketBackplane.create_backplane
    return if backplane.nil? # No backplane configured — stay local-only.

    @backplane = backplane
    @backplane.subscribe(@backplane_channel) { |raw| on_backplane_message(raw) }
    Tina4::Log.info("WebSocket backplane active (instance #{@instance_id}, channel '#{@backplane_channel}')") if defined?(Tina4::Log)
  rescue StandardError => e
    @backplane = nil
    Tina4::Log.error("WebSocket backplane wiring failed, continuing local-only: #{e.message}") if defined?(Tina4::Log)
  end
end

#get_client_rooms(client_id) ⇒ Object

Return list of room names a given client/connection id belongs to. Matches PHP Tina4WebSocket::getClientRooms($clientId).



187
188
189
190
191
# File 'lib/tina4/websocket.rb', line 187

def get_client_rooms(client_id)
  @rooms.each_with_object([]) do |(name, members), acc|
    acc << name if members.include?(client_id)
  end
end

#get_clientsObject



103
104
105
# File 'lib/tina4/websocket.rb', line 103

def get_clients
  @connections
end

#get_room_connections(room_name) ⇒ Object



193
194
195
196
# File 'lib/tina4/websocket.rb', line 193

def get_room_connections(room_name)
  ids = @rooms[room_name] || Set.new
  ids.filter_map { |id| @connections[id] }
end

#handle_upgrade(env, socket, manager: self) ⇒ Object

Upgrade a raw socket to a WebSocket connection and run its frame loop.

manager is the engine that should OWN the connection (its @connections / rooms / backplane / idle reaper). It defaults to self. In integrated (Rack) mode the rack_app passes a process-wide shared engine here so that broadcasts, rooms and the backplane span every route’s connections even though each upgrade keeps its own isolated event handlers on self.



480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
# File 'lib/tina4/websocket.rb', line 480

def handle_upgrade(env, socket, manager: self)
  key = env["HTTP_SEC_WEBSOCKET_KEY"]
  return unless key

  # Origin allow-list (opt-in via TINA4_WS_ALLOWED_ORIGINS). Unset = allow
  # all, so this never breaks an existing deployment. When set, an upgrade
  # from a non-listed Origin is refused with a 403 before the handshake.
  unless Tina4.websocket_origin_allowed?(env)
    socket.write("HTTP/1.1 403 Forbidden\r\n\r\n") rescue nil
    socket.close rescue nil
    return
  end

  accept = Tina4.compute_accept_key(key)

  response = "HTTP/1.1 101 Switching Protocols\r\n" \
             "Upgrade: websocket\r\n" \
             "Connection: Upgrade\r\n" \
             "Sec-WebSocket-Accept: #{accept}\r\n\r\n"

  socket.write(response)

  conn_id = SecureRandom.hex(16)
  ws_path = env["REQUEST_PATH"] || env["PATH_INFO"] || "/"
  connection = WebSocketConnection.new(conn_id, socket, ws_server: manager, path: ws_path)
  manager.register_connection(connection)

  # Start the idle reaper lazily once we actually have a connection (opt-in
  # via TINA4_WS_IDLE_TIMEOUT; a no-op when unset/0).
  manager.start_idle_reaper

  emit(:open, connection)

  Thread.new do
    begin
      loop do
        frame = connection.read_frame
        break unless frame

        connection.touch # mark activity for the idle reaper

        case frame[:opcode]
        when 0x1 # Text
          emit(:message, connection, frame[:data])
        when 0x8 # Close
          break
        when 0x9 # Ping
          connection.send_pong(frame[:data])
        end
      end
    rescue => e
      emit(:error, connection, e)
    ensure
      manager.unregister_connection(conn_id)
      emit(:close, connection)
      socket.close rescue nil
    end
  end
end

#idle_timeoutObject

Read the configured idle timeout (seconds). 0 = disabled.



287
288
289
290
291
# File 'lib/tina4/websocket.rb', line 287

def idle_timeout
  Float(ENV["TINA4_WS_IDLE_TIMEOUT"] || "0")
rescue ArgumentError, TypeError
  0.0
end

#on(event, &block) ⇒ Object



94
95
96
# File 'lib/tina4/websocket.rb', line 94

def on(event, &block)
  @handlers[event.to_sym] << block if @handlers.key?(event.to_sym)
end

#on_backplane_message(raw) ⇒ Object

Receive a raw envelope from the backplane. Runs in the backplane’s BACKGROUND THREAD.



359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/tina4/websocket.rb', line 359

def on_backplane_message(raw)
  env = begin
    JSON.parse(raw)
  rescue JSON::ParserError, TypeError
    return
  end
  return unless env.is_a?(Hash)

  # Origin guard: ignore our own broadcasts echoed back over the channel.
  # We already delivered them locally; relaying again would double-send.
  return if env["src"] == @instance_id

  relay_local(env)
end

#prune(conn) ⇒ Object

Remove a (presumed dead) connection from the manager + all rooms.



250
251
252
253
254
255
256
# File 'lib/tina4/websocket.rb', line 250

def prune(conn)
  id = conn.respond_to?(:id) ? conn.id : nil
  return unless id

  @conn_mutex.synchronize { @connections.delete(id) }
  remove_from_all_rooms(id)
end

#publish_envelope(kind, message, room: nil, path: nil, exclude: nil) ⇒ Object

Publish a broadcast to the shared channel for sibling instances. No-op when no backplane is configured. Best-effort — a publish failure logs and is swallowed so the local broadcast that already happened is never undone by a flaky message bus.



401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/tina4/websocket.rb', line 401

def publish_envelope(kind, message, room: nil, path: nil, exclude: nil)
  return unless @backplane

  envelope = {
    "src" => @instance_id,
    "kind" => kind,
    "exclude" => exclude,
    "room" => room,
    "path" => path
  }
  # JSON can't carry raw bytes — encode binary as base64, text as text.
  if binary_payload?(message)
    envelope["b64"] = Base64.strict_encode64(message.b)
  else
    envelope["text"] = message
  end
  begin
    @backplane.publish(@backplane_channel, JSON.generate(envelope))
  rescue StandardError => e
    Tina4::Log.warning("WebSocket backplane publish failed: #{e.message}") if defined?(Tina4::Log)
  end
end

#reap_idle(timeout) ⇒ Object

Close connections whose last inbound frame is older than timeout seconds. Returns the number reaped. timeout <= 0 is a no-op. Connections without a last_activity are skipped.



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/tina4/websocket.rb', line 268

def reap_idle(timeout)
  return 0 if timeout.to_f <= 0

  now = Time.now.to_f
  stale = @conn_mutex.synchronize do
    @connections.values.select do |conn|
      la = conn.respond_to?(:last_activity) ? conn.last_activity : nil
      la && (now - la) > timeout
    end
  end
  stale.each do |conn|
    conn.close(code: 1001, reason: "idle timeout") rescue nil
    prune(conn)
  end
  Tina4::Log.info("WebSocket idle reaper closed #{stale.size} connection(s)") if stale.any? && defined?(Tina4::Log)
  stale.size
end

#register_connection(connection) ⇒ Object

Register an open connection on this manager (thread-safe).



206
207
208
# File 'lib/tina4/websocket.rb', line 206

def register_connection(connection)
  @conn_mutex.synchronize { @connections[connection.id] = connection }
end

#relay_local(env) ⇒ Object

Deliver a remote-originated envelope to LOCAL connections only. NEVER re-publishes (that would loop the message around the cluster). Dispatches by “kind”: room / path / all.



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/tina4/websocket.rb', line 377

def relay_local(env)
  message = decode_envelope_message(env)
  return if message.nil?

  exclude = env["exclude"]
  targets =
    case env["kind"]
    when "room"
      room = env["room"]
      room ? get_room_connections(room) : []
    when "path"
      path = env["path"]
      path ? @conn_mutex.synchronize { @connections.values.select { |c| c.path == path } } : []
    else # "all" (and anything unknown) → every local connection
      @conn_mutex.synchronize { @connections.values }
    end
  targets = targets.reject { |conn| exclude && conn.id == exclude }
  deliver_resilient(targets, message)
end

#remove_from_all_rooms(conn_id) ⇒ Object



544
545
546
# File 'lib/tina4/websocket.rb', line 544

def remove_from_all_rooms(conn_id)
  @rooms.each_value { |members| members.delete(conn_id) }
end

#room_count(room_name) ⇒ Object



181
182
183
# File 'lib/tina4/websocket.rb', line 181

def room_count(room_name)
  (@rooms[room_name] || Set.new).size
end

#safe_send(conn, message) ⇒ Object

Send to ONE connection without letting a single dead client abort a broadcast loop. Returns true if delivered, false if the connection looks dead (the caller then prunes it). A failed send is logged, never silent. A connection whose own #send swallowed a write error and flipped #closed? is treated as dead too (mirrors Python’s ‘return not ws._closed`).



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/tina4/websocket.rb', line 227

def safe_send(conn, message)
  conn.send_text(message)
  # If the connection's own #send swallowed a write error it flips #closed?;
  # treat that as dead. Probe defensively so a stub/double that doesn't
  # define #closed? is simply treated as alive.
  dead = begin
    conn.respond_to?(:closed?) && conn.closed?
  rescue StandardError
    false
  end
  !dead
rescue StandardError => e
  Tina4::Log.warning("WebSocket send to #{conn.id} failed, pruning: #{e.message}") if defined?(Tina4::Log)
  false
end

#send_to(conn_id, message) ⇒ Object



154
155
156
157
158
159
# File 'lib/tina4/websocket.rb', line 154

def send_to(conn_id, message)
  conn = @conn_mutex.synchronize { @connections[conn_id] }
  return unless conn

  prune(conn) unless safe_send(conn, message)
end

#start(host: "0.0.0.0", port: 7147) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/tina4/websocket.rb', line 107

def start(host: "0.0.0.0", port: 7147)
  require "socket"
  @server_socket = TCPServer.new(host, port)
  @running = true
  @server_thread = Thread.new do
    while @running
      begin
        client = @server_socket.accept
        env = {}
        handle_upgrade(env, client)
      rescue => e
        break unless @running
      end
    end
  end
  self
end

#start_idle_reaperObject

Spin up a background reaper thread when an idle timeout is configured. Opt-in and non-breaking — unset/0 means no thread is created.



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/tina4/websocket.rb', line 295

def start_idle_reaper
  timeout = idle_timeout
  return if timeout <= 0 || @reaper_thread&.alive?

  interval = [1.0, timeout / 2.0].max
  @reaper_running = true
  @reaper_thread = Thread.new do
    while @reaper_running
      sleep interval
      begin
        reap_idle(timeout)
      rescue StandardError => e
        Tina4::Log.error("WebSocket idle reaper sweep failed: #{e.message}") if defined?(Tina4::Log)
      end
    end
  end
end

#stopObject



125
126
127
128
129
130
131
# File 'lib/tina4/websocket.rb', line 125

def stop
  @running = false
  @server_socket&.close rescue nil
  @server_thread&.join(1)
  @connections.each_value { |conn| conn.close rescue nil }
  @connections.clear
end

#stop_idle_reaperObject



313
314
315
316
317
# File 'lib/tina4/websocket.rb', line 313

def stop_idle_reaper
  @reaper_running = false
  @reaper_thread&.kill
  @reaper_thread = nil
end

#unregister_connection(conn_id) ⇒ Object

Drop a connection on close (thread-safe) and remove it from all rooms.



211
212
213
214
# File 'lib/tina4/websocket.rb', line 211

def unregister_connection(conn_id)
  @conn_mutex.synchronize { @connections.delete(conn_id) }
  remove_from_all_rooms(conn_id)
end

#upgrade?(env) ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
101
# File 'lib/tina4/websocket.rb', line 98

def upgrade?(env)
  upgrade = env["HTTP_UPGRADE"] || ""
  upgrade.downcase == "websocket"
end