Class: Supabase::Realtime::Client

Inherits:
Object
Defined in:
lib/supabase/realtime/client.rb

Overview

Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.

Bring your own Socket (e.g. a websocket-client-simple adapter or an async-websocket adapter). For unit tests, pass a TestSocket. If no transport is supplied, a default Sockets::WebsocketClientSimple adapter is constructed automatically so `Supabase.create_client(…).realtime.channel(…).subscribe` works out of the box.

client   = Supabase::Realtime::Client.new(
  url: "wss://project.supabase.co/realtime/v1",
  params: { apikey: key }
)

channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe

Constructor Details

#initialize(url:, params: {}, transport: nil, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0, logger: nil) ⇒ Client

Returns a new instance of Client.

Parameters:

  • url (String)

    WebSocket endpoint (ws:// or wss://). Plain http:// and https:// URLs are also accepted and upgraded.

  • params (Hash) (defaults to: {})

    query-string params merged onto the URL (e.g. apikey/access_token)

  • transport (Socket, nil) (defaults to: nil)

    inject your own transport. If nil, the production websocket-client-simple adapter is constructed from URL+params.

  • socket (Socket, nil) (defaults to: nil)

    deprecated alias for `transport:`, kept for backward compatibility.

  • timeout (Numeric) (defaults to: Types::DEFAULT_TIMEOUT_SECONDS)

    default per-push timeout (seconds)

  • heartbeat_interval (Numeric) (defaults to: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS)

    seconds between automatic heartbeat pushes (0 disables)

  • auto_reconnect (Boolean) (defaults to: true)

    reconnect on unexpected socket close

  • max_retries (Integer) (defaults to: 5)

    maximum reconnect attempts before giving up

  • initial_backoff (Numeric) (defaults to: 1.0)

    seconds of delay before the first reconnect attempt; doubles each attempt up to a 60s cap (matches supabase-py)

  • logger (#warn, nil) (defaults to: nil)

    optional logger for non-fatal events. Used by Supabase::Realtime::CallbackSafety to record exceptions raised inside user-supplied channel/presence/push callbacks without killing the read-thread (US-002). Falls back to `Kernel#warn` ($stderr) when nil.
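The reconnect schedule described under initial_backoff (doubling per attempt, capped at 60 seconds) can be sketched as below. This is an illustration only: `reconnect_delays` and its `cap:` keyword are hypothetical names that do not appear in the library, and the actual reconnect loop may differ in detail.

```ruby
# Hypothetical helper, not part of the library: lists the delay (in seconds)
# that would precede each reconnect attempt under the documented rule.
def reconnect_delays(initial_backoff: 1.0, max_retries: 5, cap: 60.0)
  (0...max_retries).map { |attempt| [initial_backoff * (2**attempt), cap].min }
end

p reconnect_delays
# => [1.0, 2.0, 4.0, 8.0, 16.0]
p reconnect_delays(initial_backoff: 10.0, max_retries: 5)
# => [10.0, 20.0, 40.0, 60.0, 60.0]
```

With the defaults, the fifth and final attempt would start after roughly 31 seconds of cumulative waiting.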



# File 'lib/supabase/realtime/client.rb', line 53

def initialize(url:, params: {}, transport: nil, socket: nil,
               timeout: Types::DEFAULT_TIMEOUT_SECONDS,
               heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
               auto_reconnect: true, max_retries: 5, initial_backoff: 1.0,
               logger: nil)
  unless Transformers.is_ws_url(url)
    raise ArgumentError,
          "Invalid Realtime URL #{url.inspect}: expected ws://, wss://, http://, or https://"
  end

  @url     = normalize_url(url, params)
  @params  = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = []
  @socket   = transport || socket || build_default_transport
  @timeout  = timeout
  @ref      = 0

  @heartbeat_interval = heartbeat_interval
  @auto_reconnect     = auto_reconnect
  @max_retries        = max_retries
  @initial_backoff    = initial_backoff
  @logger             = logger
  @heartbeat_thread   = nil
  @reconnect_thread   = nil
  @intentionally_closed = false
  @send_buffer        = [] # frames queued while no socket / not connected
  @send_buffer_mutex  = Mutex.new
  @reconnect_failed_callbacks = []

  attach_socket if @socket
end

Instance Attribute Details

#access_token ⇒ Object (readonly)

Returns the value of attribute access_token.

# File 'lib/supabase/realtime/client.rb', line 34

def access_token
  @access_token
end

#auto_reconnect ⇒ Object (readonly)

Returns the value of attribute auto_reconnect.

# File 'lib/supabase/realtime/client.rb', line 34

def auto_reconnect
  @auto_reconnect
end

#channels ⇒ Object (readonly)

Returns the value of attribute channels.

# File 'lib/supabase/realtime/client.rb', line 34

def channels
  @channels
end

#heartbeat_interval ⇒ Object (readonly)

Returns the value of attribute heartbeat_interval.

# File 'lib/supabase/realtime/client.rb', line 34

def heartbeat_interval
  @heartbeat_interval
end

#initial_backoff ⇒ Object (readonly)

Returns the value of attribute initial_backoff.

# File 'lib/supabase/realtime/client.rb', line 34

def initial_backoff
  @initial_backoff
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.

# File 'lib/supabase/realtime/client.rb', line 34

def logger
  @logger
end

#max_retries ⇒ Object (readonly)

Returns the value of attribute max_retries.

# File 'lib/supabase/realtime/client.rb', line 34

def max_retries
  @max_retries
end

#params ⇒ Object (readonly)

Returns the value of attribute params.

# File 'lib/supabase/realtime/client.rb', line 34

def params
  @params
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.

# File 'lib/supabase/realtime/client.rb', line 34

def socket
  @socket
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.

# File 'lib/supabase/realtime/client.rb', line 34

def timeout
  @timeout
end

#url ⇒ Object (readonly)

Returns the value of attribute url.

# File 'lib/supabase/realtime/client.rb', line 34

def url
  @url
end

Instance Method Details

#channel(topic, params: nil) ⇒ Object

Always returns a new Channel instance, matching supabase-py. The client-side topic registry is a flat list, so multiple channels can share a topic (each with its own join_ref / subscription lifecycle). To look up an existing channel, walk `get_channels.find { |c| c.topic == … }`.

Topic names are auto-prefixed with `"realtime:"` to match supabase-py: `client.channel("public:users")` resolves to the same topic as `client.channel("realtime:public:users")`. Pre-prefixed topics are left alone so existing code keeps working.
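A minimal sketch of the prefixing rule and the flat registry described above. `FakeChannel`, `full_topic`, and `registry` are stand-ins invented for this illustration, not library names; the real Channel class carries far more state.

```ruby
# Stand-in for the real Channel class, used only for this illustration.
FakeChannel = Struct.new(:topic)

# Mirrors the documented rule: prefix unless the topic is already prefixed.
def full_topic(topic)
  topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
end

registry = []
registry << FakeChannel.new(full_topic("public:users"))
registry << FakeChannel.new(full_topic("realtime:public:users"))

# Both entries carry the same topic but are distinct objects.
puts registry.map(&:topic).uniq.inspect     # ["realtime:public:users"]
puts registry[0].equal?(registry[1])        # false

# Looking up an existing channel by topic returns the first match.
existing = registry.find { |c| c.topic == full_topic("public:users") }
puts existing.equal?(registry[0])           # true
```

Because every call appends a new entry, code that wants to reuse a channel needs to perform the lookup before calling `channel` again.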



# File 'lib/supabase/realtime/client.rb', line 150

def channel(topic, params: nil)
  full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
  ch = Channel.new(full_topic, params: params, socket: self)
  @channels << ch
  ch
end

#connect ⇒ Object

Opens the socket, building and attaching the default transport first if none is set. Returns self.

# File 'lib/supabase/realtime/client.rb', line 114

def connect
  unless @socket
    @socket = build_default_transport
    attach_socket
  end

  @intentionally_closed = false
  @socket.connect
  self
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/supabase/realtime/client.rb', line 137

def connected?
  @socket && @socket.connected?
end

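#disconnect ⇒ Object

Also known as: close

Marks the close as intentional, stops the reconnect and heartbeat threads, closes the socket, and sets every tracked channel's state to CLOSED. Returns self.
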
# File 'lib/supabase/realtime/client.rb', line 125

def disconnect
  @intentionally_closed = true
  stop_reconnect
  stop_heartbeat
  @socket&.close
  @channels.each { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end

#get_channels ⇒ Object

Returns a copy of the channel registry, so mutating the result does not affect the client.

# File 'lib/supabase/realtime/client.rb', line 157

def get_channels
  @channels.dup
end

#next_ref ⇒ Object

Used by Channel. Increments a shared counter so refs are unique per socket.

# File 'lib/supabase/realtime/client.rb', line 224

def next_ref
  @ref += 1
  @ref.to_s
end

#on_reconnect_failed(&block) ⇒ Object

Registers a callback fired exactly once when the background reconnect loop exhausts `max_retries` without re-establishing the socket. The callback receives the last underlying exception raised by the transport's `connect` (or `nil` if no attempt was made; this is currently unreachable but kept for forward compatibility).

Why this exists (US-003 / FR-4): supabase-py's `connect()` is a single coroutine that raises on permanent failure. The Ruby port runs reconnect on a background thread, so a `raise` would die unobserved. This callback is the Ruby-shaped equivalent; see `lib/supabase/realtime/README.md`, section “Realtime reconnect: отличие от supabase-py” (“Realtime reconnect: difference from supabase-py”).

Multiple registrations are allowed; each fires in registration order. The user block is wrapped in Supabase::Realtime::CallbackSafety.safe so a raise inside one callback never blocks the next one (consistent with US-002).
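The registration-order and isolation guarantees can be illustrated with a self-contained sketch. `ReconnectNotifier` and `notify_failure` are hypothetical names made up for this example; the library's own wrapper is Supabase::Realtime::CallbackSafety.safe, whose internals are not shown on this page.

```ruby
# Hypothetical stand-in for the callback registry; names are illustrative only.
class ReconnectNotifier
  def initialize(logger: nil)
    @callbacks = []
    @logger = logger
  end

  # Returns self so registrations can be chained.
  def on_reconnect_failed(&block)
    @callbacks << block
    self
  end

  # Fires every callback in registration order. A raise inside one callback
  # is reported and does not prevent the remaining callbacks from running.
  def notify_failure(error)
    @callbacks.each do |cb|
      begin
        cb.call(error)
      rescue StandardError => e
        message = "reconnect callback raised: #{e.message}"
        @logger ? @logger.warn(message) : warn(message)
      end
    end
  end
end

seen = []
notifier = ReconnectNotifier.new
notifier
  .on_reconnect_failed { |err| seen << "first: #{err.message}" }
  .on_reconnect_failed { |_err| raise "boom" }
  .on_reconnect_failed { |err| seen << "third: #{err.message}" }

notifier.notify_failure(IOError.new("connection refused"))
p seen
# => ["first: connection refused", "third: connection refused"]
```

The second callback's failure is written to $stderr and the third callback still runs, which is the behavior the paragraph above describes.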



# File 'lib/supabase/realtime/client.rb', line 102

def on_reconnect_failed(&block)
  @reconnect_failed_callbacks << block
  self
end

#push(message) ⇒ Object

Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.
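The buffering behavior can be sketched without a real WebSocket. `FakeSocket`, `BufferedSender`, and `flush` are hypothetical names for this illustration, and the assumption that the flush is triggered from the transport's open hook comes from the description above rather than from code shown on this page.

```ruby
require "json"

# Hypothetical in-memory transport used only for this illustration.
class FakeSocket
  attr_reader :sent

  def initialize
    @sent = []
    @open = false
  end

  def connected?
    @open
  end

  def open!
    @open = true
  end

  def send(frame)
    @sent << frame
  end
end

# Illustrative sender: buffers frames while offline, flushes them in order.
class BufferedSender
  def initialize(socket)
    @socket = socket
    @buffer = []
    @mutex = Mutex.new
  end

  def push(payload)
    frame = JSON.generate(payload)
    if @socket.connected?
      @socket.send(frame)
    else
      @mutex.synchronize { @buffer << frame }
    end
  end

  # Assumed to be called when the transport reports that it has opened.
  def flush
    pending = @mutex.synchronize { @buffer.slice!(0, @buffer.size) }
    pending.each { |frame| @socket.send(frame) }
  end
end

socket = FakeSocket.new
sender = BufferedSender.new(socket)
sender.push("event" => "phx_join", "ref" => "1")
p socket.sent.size   # => 0 (buffered, socket not open yet)
socket.open!
sender.flush
sender.push("event" => "heartbeat", "ref" => "2")
p socket.sent.size   # => 2 (buffered frame flushed, then a direct send)
```

Draining the buffer inside the mutex and sending outside it keeps the lock short, so a slow socket write does not block other threads that are queueing frames.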



# File 'lib/supabase/realtime/client.rb', line 232

def push(message)
  frame = JSON.generate(
    "event"    => message.event,
    "topic"    => message.topic,
    "payload"  => message.payload,
    "ref"      => message.ref,
    "join_ref" => message.join_ref
  )

  if connected?
    @socket.send(frame)
  else
    @send_buffer_mutex.synchronize { @send_buffer << frame }
  end
end

#remove_all_channels ⇒ Object

Unsubscribe every tracked channel and clear the registry. Iterates over a snapshot (`@channels.dup`) so a channel that removes itself during `unsubscribe` doesn’t shift the array mid-loop. Idempotent: a follow-up call on an empty registry is a no-op.
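Why the snapshot matters can be shown with a small, self-contained experiment. `SelfRemovingChannel` and `build_registry` are hypothetical; they model a channel whose `unsubscribe` deletes itself from the shared array, which is the situation the paragraph above guards against.

```ruby
# Hypothetical channel that removes itself from the registry on unsubscribe.
class SelfRemovingChannel
  attr_reader :name

  def initialize(name, registry)
    @name = name
    @registry = registry
  end

  def unsubscribe
    @registry.delete(self)
  end
end

def build_registry
  registry = []
  %w[a b c d].each { |n| registry << SelfRemovingChannel.new(n, registry) }
  registry
end

# Iterating the live array skips elements as it shrinks underneath the loop.
live = build_registry
visited_live = []
live.each do |ch|
  visited_live << ch.name
  ch.unsubscribe
end

# Iterating a snapshot visits every channel.
snap = build_registry
visited_snap = []
snap.dup.each do |ch|
  visited_snap << ch.name
  ch.unsubscribe
end
snap.clear

p visited_live   # => ["a", "c"]
p visited_snap   # => ["a", "b", "c", "d"]
```

In the live iteration, channels "b" and "d" are never unsubscribed and remain in the array, which is exactly the kind of leak the snapshot avoids.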

See Also:

  • supabase/_sync/client.py:234


# File 'lib/supabase/realtime/client.rb', line 172

def remove_all_channels
  @channels.dup.each { |ch| ch.unsubscribe }
  @channels.clear
  self
end

#remove_channel(channel) ⇒ Object

Unsubscribes the given channel, removes it from the registry, and closes the socket if no channels remain.

# File 'lib/supabase/realtime/client.rb', line 161

def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel)
  @socket&.close if @channels.empty?
end

#send_heartbeat ⇒ Object

Manually emit a heartbeat. Real adapters typically wire this onto a timer.
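For orientation, the shape of a heartbeat frame can be sketched as follows. The values of Types::ChannelEvents::HEARTBEAT and Types::PHOENIX_TOPIC are not shown on this page; the sketch assumes the Phoenix channel protocol's conventional "heartbeat" event and "phoenix" topic, and `heartbeat_frame` is a hypothetical helper.

```ruby
require "json"

# Hypothetical helper. The event and topic defaults are assumptions based on
# the Phoenix channel protocol, not values read from the library's constants.
def heartbeat_frame(ref, event: "heartbeat", topic: "phoenix")
  JSON.generate(
    "event"    => event,
    "topic"    => topic,
    "payload"  => {},
    "ref"      => ref.to_s,
    "join_ref" => nil
  )
end

puts heartbeat_frame(1)
# {"event":"heartbeat","topic":"phoenix","payload":{},"ref":"1","join_ref":null}
```

A heartbeat is not tied to any joined channel, which is why `join_ref` is nil.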



# File 'lib/supabase/realtime/client.rb', line 211

def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event"    => Types::ChannelEvents::HEARTBEAT,
    "topic"    => Types::PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => next_ref,
    "join_ref" => nil
  ))
end

#set_auth(token) ⇒ Object

Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.

Safe to call before `connect`: the token is always written to `@access_token` / `@params` so the next subscribe picks it up via `Channel#inject_postgres_changes_bindings`. The ACCESS_TOKEN frame fan-out only runs once the socket is actually connected.
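The two behaviors above (always store the token, fan out only when connected and only to joined channels) can be modeled in isolation. `StubChannel` and `AuthFanout` are hypothetical stand-ins, and the "access_token" event name is an assumption about the value of Types::ChannelEvents::ACCESS_TOKEN.

```ruby
require "json"

# Hypothetical stand-ins; the real Channel and transport classes are richer.
StubChannel = Struct.new(:topic, :joined) do
  def joined?
    joined
  end
end

class AuthFanout
  attr_reader :access_token, :frames

  def initialize(channels, connected:)
    @channels = channels
    @connected = connected
    @frames = [] # frames that would be written to the socket
    @ref = 0
  end

  def set_auth(token)
    @access_token = token    # always remembered for future joins
    return unless @connected # fan-out needs a live socket

    @channels.select(&:joined?).each do |ch|
      @ref += 1
      @frames << JSON.generate(
        "event"   => "access_token", # assumed event name
        "topic"   => ch.topic,
        "payload" => { "access_token" => token },
        "ref"     => @ref.to_s
      )
    end
  end
end

channels = [StubChannel.new("realtime:a", true), StubChannel.new("realtime:b", false)]

offline = AuthFanout.new(channels, connected: false)
offline.set_auth("jwt-1")
p [offline.access_token, offline.frames.size]
# => ["jwt-1", 0]

online = AuthFanout.new(channels, connected: true)
online.set_auth("jwt-2")
p online.frames.map { |f| JSON.parse(f)["topic"] }
# => ["realtime:a"]
```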



# File 'lib/supabase/realtime/client.rb', line 185

def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)

  if connected?
    @channels.each do |channel|
      next unless channel.joined?

      msg = Message.new(
        event:   Types::ChannelEvents::ACCESS_TOKEN,
        topic:   channel.topic,
        payload: { "access_token" => token },
        ref:     next_ref
      )
      @socket.send(JSON.generate(
        "event"    => msg.event,
        "topic"    => msg.topic,
        "payload"  => msg.payload,
        "ref"      => msg.ref,
        "join_ref" => nil
      ))
    end
  end
end

#use_socket(socket) ⇒ Object

Plug in a transport after construction (e.g. a websocket-client-simple wrapper).



# File 'lib/supabase/realtime/client.rb', line 108

def use_socket(socket)
  @socket = socket
  attach_socket
  self
end