Class: Supabase::Realtime::Client

Inherits:
Object
Defined in:
lib/supabase/realtime/client.rb

Overview

Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.

Bring your own Socket (e.g. a websocket-client-simple adapter or an async-websocket adapter). For unit tests, pass a TestSocket. If no transport is supplied, a default Sockets::WebsocketClientSimple adapter is constructed automatically so `Supabase.create_client(…).realtime.channel(…).subscribe` works out of the box.

client = Supabase::Realtime::Client.new(
  url: "wss://project.supabase.co/realtime/v1",
  params: { apikey: key }
)

channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe


Constructor Details

#initialize(url:, params: {}, transport: nil, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0, logger: nil) ⇒ Client

Returns a new instance of Client.

Parameters:

  • url (String)

    WebSocket endpoint (ws:// or wss://). Plain http:// or https:// URLs are accepted and upgraded to their WebSocket equivalents.

  • params (Hash) (defaults to: {})

    query-string params merged onto the URL (e.g. apikey). `access_token` is accepted here but is NOT serialized into the URL; it is carried in join payloads / access_token pushes instead.

  • transport (Socket, nil) (defaults to: nil)

    inject your own transport. If nil, the production websocket-client-simple adapter is constructed from URL+params.

  • socket (Socket, nil) (defaults to: nil)

    deprecated alias for `transport:`, kept for backward compatibility. If both are given, `transport:` takes precedence.

  • timeout (Numeric) (defaults to: Types::DEFAULT_TIMEOUT_SECONDS)

    default per-push timeout (seconds)

  • heartbeat_interval (Numeric) (defaults to: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS)

    seconds between automatic heartbeat pushes (0 disables)

  • auto_reconnect (Boolean) (defaults to: true)

    reconnect on unexpected socket close

  • max_retries (Integer) (defaults to: 5)

    maximum reconnect attempts before giving up

  • initial_backoff (Numeric) (defaults to: 1.0)

    seconds of delay before the first reconnect attempt; doubles each attempt up to a 60s cap (matches supabase-py)

  • logger (#warn, nil) (defaults to: nil)

    optional logger for non-fatal events. Used by Supabase::Realtime::CallbackSafety to record exceptions raised inside user-supplied channel/presence/push callbacks without killing the read-thread (US-002). Falls back to `Kernel#warn` ($stderr) when nil.



# File 'lib/supabase/realtime/client.rb', line 55

def initialize(url:, params: {}, transport: nil, socket: nil,
               timeout: Types::DEFAULT_TIMEOUT_SECONDS,
               heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
               auto_reconnect: true, max_retries: 5, initial_backoff: 1.0,
               logger: nil)
  unless Transformers.is_ws_url(url)
    raise ArgumentError,
          "Invalid Realtime URL #{url.inspect}: expected ws://, wss://, http://, or https://"
  end

  @url     = normalize_url(url, params)
  @params  = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = []
  @socket   = transport || socket || build_default_transport
  @timeout  = timeout
  @ref      = 0

  @heartbeat_interval = heartbeat_interval
  @auto_reconnect     = auto_reconnect
  @max_retries        = max_retries
  @initial_backoff    = initial_backoff
  @logger             = logger
  @heartbeat_thread   = nil
  @reconnect_thread   = nil
  @connecting         = false
  @intentionally_closed = false
  @send_buffer        = [] # frames queued while no socket / not connected
  @send_buffer_mutex  = Mutex.new
  @reconnect_failed_callbacks = []

  attach_socket if @socket
end
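
The constructor accepts any object as `transport:`. As a rough illustration, the sketch below is a hypothetical in-memory transport (`RecordingTransport` is not part of the library) that implements only the four methods the source on this page calls on `@socket`: `connect`, `connected?`, `send`, and `close`. The real Socket contract probably also needs the event hooks that `attach_socket` wires up, which are not shown here, so treat this as a starting point rather than a drop-in replacement for TestSocket.

```ruby
# Hypothetical test double; the class name and internals are assumptions.
# It covers only the methods the Client source above invokes on @socket.
class RecordingTransport
  attr_reader :sent

  def initialize
    @sent = []     # frames handed to #send while open
    @open = false
  end

  def connect
    @open = true
    self
  end

  def connected?
    @open
  end

  # Intentionally shadows Object#send, because the client calls
  # @socket.send(frame) to write a frame.
  def send(frame)
    raise IOError, "transport is closed" unless @open

    @sent << frame
  end

  def close
    @open = false
  end
end

transport = RecordingTransport.new
transport.connect
transport.send('{"event":"heartbeat"}')
transport.close
```

In a unit test such an object would be passed as `transport:` so that no real network connection is opened.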

Instance Attribute Details

#access_token ⇒ Object (readonly)

Returns the value of attribute access_token.



# File 'lib/supabase/realtime/client.rb', line 34

def access_token
  @access_token
end

#auto_reconnect ⇒ Object (readonly)

Returns the value of attribute auto_reconnect.



# File 'lib/supabase/realtime/client.rb', line 34

def auto_reconnect
  @auto_reconnect
end

#channels ⇒ Object (readonly)

Returns the value of attribute channels.



# File 'lib/supabase/realtime/client.rb', line 34

def channels
  @channels
end

#heartbeat_interval ⇒ Object (readonly)

Returns the value of attribute heartbeat_interval.



# File 'lib/supabase/realtime/client.rb', line 34

def heartbeat_interval
  @heartbeat_interval
end

#initial_backoff ⇒ Object (readonly)

Returns the value of attribute initial_backoff.



# File 'lib/supabase/realtime/client.rb', line 34

def initial_backoff
  @initial_backoff
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/supabase/realtime/client.rb', line 34

def logger
  @logger
end

#max_retries ⇒ Object (readonly)

Returns the value of attribute max_retries.



# File 'lib/supabase/realtime/client.rb', line 34

def max_retries
  @max_retries
end

#params ⇒ Object (readonly)

Returns the value of attribute params.



# File 'lib/supabase/realtime/client.rb', line 34

def params
  @params
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



# File 'lib/supabase/realtime/client.rb', line 34

def socket
  @socket
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.



# File 'lib/supabase/realtime/client.rb', line 34

def timeout
  @timeout
end

#url ⇒ Object (readonly)

Returns the value of attribute url.



# File 'lib/supabase/realtime/client.rb', line 34

def url
  @url
end

Instance Method Details

#_remove_channel(channel) ⇒ Object

Internal: drop a channel from the registry without unsubscribing it. Called by Supabase::Realtime::Channel#on_close when a channel reaches CLOSED on its own (leave-ack or a server phx_close), mirroring supabase-py’s `socket._remove_channel` (client.py:294-295). Distinct from the public #remove_channel, which actively unsubscribes. Removes the specific channel object (topics can repeat in the flat registry). Does not close the socket; that auto-close only happens via the explicit remove_channel/remove_all_channels paths, matching supabase-py.



# File 'lib/supabase/realtime/client.rb', line 218

def _remove_channel(channel)
  @channels.delete(channel)
end

#channel(topic, params: nil) ⇒ Object

Always returns a new Channel instance, matching supabase-py. The client-side topic registry is a flat list, so multiple channels can share a topic (each with its own join_ref / subscription lifecycle). To look up an existing channel, walk `get_channels.find { |c| c.topic == … }`.

Topic names are auto-prefixed with `"realtime:"` to match supabase-py: `client.channel("public:users")` resolves to the same topic as `client.channel("realtime:public:users")`. Pre-prefixed topics are left alone so existing code keeps working.



# File 'lib/supabase/realtime/client.rb', line 188

def channel(topic, params: nil)
  full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
  ch = Channel.new(full_topic, params: params, socket: self)
  @channels << ch
  ch
end
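
To make the prefixing and the flat registry concrete, here is a small standalone sketch. `full_topic_for` and `FakeChannel` are hypothetical names used only for illustration; the prefixing expression itself is copied from `#channel` above.

```ruby
# Hypothetical helper wrapping the prefixing expression used by #channel.
def full_topic_for(topic)
  topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
end

short  = full_topic_for("public:users")
prefix = full_topic_for("realtime:public:users")

# Stand-in for Channel: the flat registry may hold several entries per topic.
FakeChannel = Struct.new(:topic)
registry = []
2.times { registry << FakeChannel.new(full_topic_for("public:users")) }

# Lookup walks the list, as the description above suggests.
matches = registry.select { |c| c.topic == "realtime:public:users" }
```

Because every call appends a new entry, `find` returns only the first match; `select` is the safer choice when several channels may share a topic.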

#connect ⇒ Object

Establish the WebSocket connection. Mirrors supabase-py’s `connect()` (client.py:141-193): synchronous transport failures are retried with exponential backoff (`initial_backoff * 2^(n-1)` seconds, capped at 60s) for up to `max_retries` total attempts, then the last error is re-raised to the caller. With `auto_reconnect: false` the first failure raises immediately, as in supabase-py.

Only failures that `Socket#connect` raises synchronously are retried here. Transports that report failure asynchronously (on_error/on_close after connect returns) are recovered by the background reconnect loop (schedule_reconnect → on_reconnect_failed): same contract, different signal path. A concurrent `disconnect` aborts the retry loop quietly.



# File 'lib/supabase/realtime/client.rb', line 129

def connect
  unless @socket
    @socket = build_default_transport
    attach_socket
  end

  # Idempotent: if a connection is already open or in flight, don't kick
  # off a second transport.connect. A duplicate connect can produce a
  # second on_open, which would fire rejoin_channels twice and send a
  # duplicate join per channel (the server then phx_closes the extra one).
  # This matters because Channel#subscribe calls connect when the socket
  # isn't open yet, and the caller may have already called connect.
  return self if connected? || @connecting

  @intentionally_closed = false
  attempts = 0
  begin
    @connecting = true
    @socket.connect
  rescue StandardError
    # Reset the in-flight flag so the retry (and any later connect call)
    # isn't short-circuited by the idempotency guard above.
    @connecting = false
    attempts += 1
    raise if !@auto_reconnect || attempts >= @max_retries

    sleep [@initial_backoff * (2**(attempts - 1)), 60.0].min
    return self if @intentionally_closed

    retry
  end
  self
end
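
The retry schedule can be worked out by hand from the `sleep` expression above. The sketch below isolates that arithmetic; `backoff_delay` is a hypothetical helper name, and the defaults are the constructor defaults documented on this page.

```ruby
# Delay in seconds slept after the n-th failed attempt (n starts at 1),
# using the same expression as the rescue branch of #connect.
def backoff_delay(attempt, initial_backoff: 1.0, cap: 60.0)
  [initial_backoff * (2**(attempt - 1)), cap].min
end

# With max_retries: 5, failures 1..4 are followed by a sleep;
# the 5th failure re-raises instead of sleeping.
delays = (1..4).map { |n| backoff_delay(n) }

# Later attempts hit the 60 second cap (2**6 = 64 would exceed it).
capped = backoff_delay(7)
```

With the defaults this yields sleeps of 1, 2, 4 and 8 seconds, so a caller may block for roughly 15 seconds plus connection time before the final error surfaces.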

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/supabase/realtime/client.rb', line 175

def connected?
  @socket && @socket.connected?
end

#disconnect ⇒ Object Also known as: close



# File 'lib/supabase/realtime/client.rb', line 163

def disconnect
  @intentionally_closed = true
  stop_reconnect
  stop_heartbeat
  @socket&.close
  @channels.each { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end

#get_channels ⇒ Object



# File 'lib/supabase/realtime/client.rb', line 195

def get_channels
  @channels.dup
end

#next_ref ⇒ Object

Used by Channel — increments a shared counter so refs are unique per socket.



# File 'lib/supabase/realtime/client.rb', line 271

def next_ref
  @ref += 1
  @ref.to_s
end

#on_reconnect_failed(&block) ⇒ Object

Register a callback fired exactly once when the background reconnect loop exhausts `max_retries` without re-establishing the socket. The callback receives the last underlying exception raised by the transport’s `connect` (or `nil` if no attempt was made; currently unreachable but kept for forward-compat).

Why this exists (US-003 / FR-4): supabase-py’s `connect()` is a single coroutine that raises on permanent failure. The Ruby port runs reconnect on a background thread, so a `raise` would die unobserved. This callback is the Ruby-shaped equivalent; see `lib/supabase/realtime/README.md`, section “Realtime reconnect: difference from supabase-py”.

Multiple registrations are allowed; each fires in registration order. The user block is wrapped in Supabase::Realtime::CallbackSafety.safe so a raise inside one callback never blocks the next one (consistent with US-002).



# File 'lib/supabase/realtime/client.rb', line 105

def on_reconnect_failed(&block)
  @reconnect_failed_callbacks << block
  self
end
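
The dispatch side is not shown on this page, so the following is only a sketch of the behaviour described above: callbacks fire in registration order and a raise inside one does not stop the next. `fire_reconnect_failed` is a hypothetical stand-in, and the `begin/rescue` is an assumption about what CallbackSafety.safe does.

```ruby
# Hypothetical dispatcher; the real one lives behind CallbackSafety.safe,
# whose implementation is not shown here.
def fire_reconnect_failed(callbacks, error, log: [])
  callbacks.each do |callback|
    begin
      callback.call(error)
    rescue StandardError => e
      # Record the failure and keep going so later callbacks still run.
      log << "callback raised: #{e.message}"
    end
  end
end

seen = []
log = []
callbacks = [
  ->(err) { seen << "first: #{err.message}" },
  ->(_err) { raise "boom" },
  ->(err) { seen << "third: #{err.message}" }
]

fire_reconnect_failed(callbacks, IOError.new("connection refused"), log: log)
```

On a real client the equivalent registration would be `client.on_reconnect_failed { |err| ... }`, typically to alert or to schedule an application-level retry.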

#push(message) ⇒ Object

Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.



# File 'lib/supabase/realtime/client.rb', line 279

def push(message)
  frame = JSON.generate(
    "event"    => message.event,
    "topic"    => message.topic,
    "payload"  => message.payload,
    "ref"      => message.ref,
    "join_ref" => message.join_ref
  )

  if connected?
    @socket.send(frame)
  else
    @send_buffer_mutex.synchronize { @send_buffer << frame }
  end
end
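
The buffering path can be pictured with a miniature model. `BufferedSender` below is hypothetical, and its `open!`/`flush` methods are an assumption standing in for whatever the client does when the socket opens, since that code is not shown on this page. Only the frame layout and the buffer-when-offline branch come from `#push` above.

```ruby
require "json"

# Hypothetical miniature of the send buffer; not the library's class.
class BufferedSender
  attr_reader :wire, :buffer

  def initialize
    @wire = []            # frames that reached the "socket"
    @buffer = []          # frames queued while offline
    @mutex = Mutex.new
    @open = false
  end

  def push(event:, topic:, payload: {}, ref: nil, join_ref: nil)
    frame = JSON.generate(
      "event" => event, "topic" => topic, "payload" => payload,
      "ref" => ref, "join_ref" => join_ref
    )
    if @open
      @wire << frame
    else
      @mutex.synchronize { @buffer << frame }
    end
  end

  # Assumed behaviour: opening the socket drains the queue in FIFO order.
  def open!
    @open = true
    flush
  end

  def flush
    pending = @mutex.synchronize do
      drained = @buffer.dup
      @buffer.clear
      drained
    end
    pending.each { |frame| @wire << frame }
  end
end

sender = BufferedSender.new
sender.push(event: "phx_join", topic: "realtime:public:users", ref: "1", join_ref: "1")
queued_before_open = sender.buffer.size
sender.open!
```

Draining under the mutex and sending outside it keeps the lock short, which matters if the transport's write can block.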

#remove_all_channels ⇒ Object

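Unsubscribe every tracked channel, clear the registry, and then close the socket via the intentional-close path (`disconnect`), so no reconnect is scheduled. Iterates over a snapshot (`@channels.dup`) so a channel that removes itself during `unsubscribe` doesn’t shift the array mid-loop. Idempotent: a follow-up call on an empty registry unsubscribes nothing and simply repeats the disconnect.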

See Also:

  • supabase/_sync/client.py:234


# File 'lib/supabase/realtime/client.rb', line 227

def remove_all_channels
  @channels.dup.each { |ch| ch.unsubscribe }
  @channels.clear
  # supabase-py's remove_all_channels unsubscribes every channel and then
  # `await self.close()`. Match that — intentional close, no reconnect.
  disconnect
  self
end
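
Why the snapshot matters can be seen with plain arrays. In this sketch the strings stand in for channels, and `delete` simulates a channel removing itself from the registry during `unsubscribe`; nothing here uses the library itself.

```ruby
# Iterating the live array while elements delete themselves skips entries.
live = %w[a b c d]
visited_live = []
live.each do |ch|
  visited_live << ch
  live.delete(ch)            # shifts the remaining elements left
end

# Iterating a snapshot visits every element, as remove_all_channels does.
registry = %w[a b c d]
visited_snapshot = []
registry.dup.each do |ch|
  visited_snapshot << ch
  registry.delete(ch)
end
```

In the first loop every second element is missed because the index advances while the array shrinks; the snapshot loop is unaffected.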

#remove_channel(channel) ⇒ Object



# File 'lib/supabase/realtime/client.rb', line 199

def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel)
  # Close the socket once the registry empties — mirrors supabase-py's
  # `remove_channel` (which calls `self.close()` when `len(channels) == 0`).
  # Use the intentional-close path (`disconnect`), not a bare
  # `@socket.close`: the latter fires on_close → schedule_reconnect and the
  # socket would immediately come back up.
  disconnect if @channels.empty?
end

#send_heartbeat ⇒ Object

Manually emit a heartbeat. Real adapters typically wire this onto a timer.



# File 'lib/supabase/realtime/client.rb', line 258

def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event"    => Types::ChannelEvents::HEARTBEAT,
    "topic"    => Types::PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => next_ref,
    "join_ref" => nil
  ))
end
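
For reference, the frame built above has the following shape. The constant values in this sketch are assumptions: Phoenix conventionally uses the topic "phoenix" and the event "heartbeat", but the actual values of `Types::PHOENIX_TOPIC` and `Types::ChannelEvents::HEARTBEAT` are not shown on this page.

```ruby
require "json"

# Assumed values following the usual Phoenix convention; the library's
# Types constants may differ.
ASSUMED_PHOENIX_TOPIC   = "phoenix"
ASSUMED_HEARTBEAT_EVENT = "heartbeat"

# Builds the same five-key frame as #send_heartbeat for a given ref.
def heartbeat_frame(ref)
  JSON.generate(
    "event"    => ASSUMED_HEARTBEAT_EVENT,
    "topic"    => ASSUMED_PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => ref,
    "join_ref" => nil
  )
end

frame = heartbeat_frame("7")
decoded = JSON.parse(frame)
```

The `ref` comes from the shared `next_ref` counter, so heartbeat refs interleave with channel push refs on the same socket.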

#set_auth(token) ⇒ Object

Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.

Safe to call before `connect`: the token is always written to `@access_token` / `@params` so the next subscribe picks it up via `Channel#inject_postgres_changes_bindings`.

The fan-out mirrors supabase-py (client.py:333-337): for every joined channel, `channel.push(access_token, token)`. Routing through the channel’s push path (rather than sending a raw frame only when `connected?`) means a rotation issued while the socket is briefly offline is buffered and replayed on reconnect, not silently dropped.



# File 'lib/supabase/realtime/client.rb', line 248

def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)

  @channels.each do |channel|
    channel.push_access_token(token) if channel.joined?
  end
end
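
The fan-out rule (only joined channels receive the new token) can be checked in isolation. `StubChannel` and `fan_out_token` are hypothetical; they mimic just the two channel methods that `#set_auth` calls, `joined?` and `push_access_token`.

```ruby
# Hypothetical channel stub that records the tokens pushed to it.
StubChannel = Struct.new(:topic, :joined, :tokens) do
  def joined?
    joined
  end

  def push_access_token(token)
    tokens << token
  end
end

# Same loop as the body of #set_auth, minus the instance state.
def fan_out_token(channels, token)
  channels.each { |ch| ch.push_access_token(token) if ch.joined? }
end

joined_channel  = StubChannel.new("realtime:public:users", true, [])
pending_channel = StubChannel.new("realtime:public:orders", false, [])

fan_out_token([joined_channel, pending_channel], "new-jwt")
```

Channels that have not joined yet are skipped here because, per the description above, they pick up the stored token on their next subscribe.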

#use_socket(socket) ⇒ Object

Plug in a transport after construction (e.g. a websocket-client-simple wrapper).



# File 'lib/supabase/realtime/client.rb', line 111

def use_socket(socket)
  @socket = socket
  attach_socket
  self
end