Class: Supabase::Realtime::Client
- Inherits: Object
- Defined in: lib/supabase/realtime/client.rb
Overview
Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.
Bring your own Socket (e.g. a websocket-client-simple adapter or an async-websocket adapter). For unit tests, pass a TestSocket. If no transport is supplied, a default Sockets::WebsocketClientSimple adapter is constructed automatically so `Supabase.create_client(…).realtime.channel(…).subscribe` works out of the box.
client = Supabase::Realtime::Client.new(
url: "wss://project.supabase.co/realtime/v1",
params: { apikey: key }
)
channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe
Instance Attribute Summary
-
#access_token ⇒ Object
readonly
Returns the value of attribute access_token.
-
#auto_reconnect ⇒ Object
readonly
Returns the value of attribute auto_reconnect.
-
#channels ⇒ Object
readonly
Returns the value of attribute channels.
-
#heartbeat_interval ⇒ Object
readonly
Returns the value of attribute heartbeat_interval.
-
#initial_backoff ⇒ Object
readonly
Returns the value of attribute initial_backoff.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#max_retries ⇒ Object
readonly
Returns the value of attribute max_retries.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary
-
#_remove_channel(channel) ⇒ Object
Internal: drop a channel from the registry without unsubscribing it.
-
#channel(topic, params: nil) ⇒ Object
Always returns a new Channel instance, matching supabase-py.
-
#connect ⇒ Object
Establish the WebSocket connection.
- #connected? ⇒ Boolean
- #disconnect ⇒ Object (also: #close)
- #get_channels ⇒ Object
-
#initialize(url:, params: {}, transport: nil, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0, logger: nil) ⇒ Client
constructor
A new instance of Client.
-
#next_ref ⇒ Object
Used by Channel — increments a shared counter so refs are unique per socket.
-
#on_reconnect_failed(&block) ⇒ Object
Register a callback fired exactly once when the background reconnect loop exhausts `max_retries` without re-establishing the socket.
-
#push(message) ⇒ Object
Used by Channel#send_push.
-
#remove_all_channels ⇒ Object
Unsubscribe every tracked channel and clear the registry.
- #remove_channel(channel) ⇒ Object
-
#send_heartbeat ⇒ Object
Manually emit a heartbeat.
-
#set_auth(token) ⇒ Object
Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
-
#use_socket(socket) ⇒ Object
Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
Constructor Details
#initialize(url:, params: {}, transport: nil, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0, logger: nil) ⇒ Client
Returns a new instance of Client.
# File 'lib/supabase/realtime/client.rb', line 55
def initialize(url:, params: {}, transport: nil, socket: nil,
               timeout: Types::DEFAULT_TIMEOUT_SECONDS,
               heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
               auto_reconnect: true, max_retries: 5, initial_backoff: 1.0,
               logger: nil)
  unless Transformers.is_ws_url(url)
    raise ArgumentError,
          "Invalid Realtime URL #{url.inspect}: expected ws://, wss://, http://, or https://"
  end

  @url = normalize_url(url, params)
  @params = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = []
  @socket = transport || socket || build_default_transport
  @timeout = timeout
  @ref = 0
  @heartbeat_interval = heartbeat_interval
  @auto_reconnect = auto_reconnect
  @max_retries = max_retries
  @initial_backoff = initial_backoff
  @logger = logger
  @heartbeat_thread = nil
  @reconnect_thread = nil
  @connecting = false
  @intentionally_closed = false
  @send_buffer = [] # frames queued while no socket / not connected
  @send_buffer_mutex = Mutex.new
  @reconnect_failed_callbacks = []
  attach_socket if @socket
end
Instance Attribute Details
#access_token ⇒ Object (readonly)
Returns the value of attribute access_token.
# File 'lib/supabase/realtime/client.rb', line 34
def access_token
  @access_token
end
#auto_reconnect ⇒ Object (readonly)
Returns the value of attribute auto_reconnect.
# File 'lib/supabase/realtime/client.rb', line 34
def auto_reconnect
  @auto_reconnect
end
#channels ⇒ Object (readonly)
Returns the value of attribute channels.
# File 'lib/supabase/realtime/client.rb', line 34
def channels
  @channels
end
#heartbeat_interval ⇒ Object (readonly)
Returns the value of attribute heartbeat_interval.
# File 'lib/supabase/realtime/client.rb', line 34
def heartbeat_interval
  @heartbeat_interval
end
#initial_backoff ⇒ Object (readonly)
Returns the value of attribute initial_backoff.
# File 'lib/supabase/realtime/client.rb', line 34
def initial_backoff
  @initial_backoff
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/supabase/realtime/client.rb', line 34
def logger
  @logger
end
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
# File 'lib/supabase/realtime/client.rb', line 34
def max_retries
  @max_retries
end
#params ⇒ Object (readonly)
Returns the value of attribute params.
# File 'lib/supabase/realtime/client.rb', line 34
def params
  @params
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/supabase/realtime/client.rb', line 34
def socket
  @socket
end
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
# File 'lib/supabase/realtime/client.rb', line 34
def timeout
  @timeout
end
#url ⇒ Object (readonly)
Returns the value of attribute url.
# File 'lib/supabase/realtime/client.rb', line 34
def url
  @url
end
Instance Method Details
#_remove_channel(channel) ⇒ Object
Internal: drop a channel from the registry without unsubscribing it. Called by Supabase::Realtime::Channel#on_close when a channel reaches CLOSED on its own (leave-ack or a server phx_close), mirroring supabase-py's `socket._remove_channel` (client.py:294-295). Distinct from the public #remove_channel, which actively unsubscribes. Removes the specific channel object, since topics can repeat in the flat registry. Does not close the socket: auto-close only happens via the explicit remove_channel/remove_all_channels paths, matching supabase-py.
# File 'lib/supabase/realtime/client.rb', line 218
def _remove_channel(channel)
  @channels.delete(channel)
end
#channel(topic, params: nil) ⇒ Object
Always returns a new Channel instance, matching supabase-py. The client-side topic registry is a flat list, so multiple channels can share a topic (each with its own join_ref / subscription lifecycle). To look up an existing channel, walk `get_channels.find { |c| c.topic == … }`.
Topic names are auto-prefixed with `"realtime:"` to match supabase-py: `client.channel("public:users")` resolves to the same topic as `client.channel("realtime:public:users")`. Pre-prefixed topics are left alone so existing code keeps working.
# File 'lib/supabase/realtime/client.rb', line 188
def channel(topic, params: nil)
  full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
  ch = Channel.new(full_topic, params: params, socket: self)
  @channels << ch
  ch
end
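The prefixing rule and the flat registry can be tried out in isolation. The following is a minimal stand-alone sketch, not the library's code path: `normalize_topic`, `SketchChannel`, and `SketchRegistry` are hypothetical names introduced only for illustration.

```ruby
# Hypothetical helper mirroring the prefix rule used by Client#channel.
def normalize_topic(topic)
  topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
end

# Hypothetical stand-in for a channel; only the topic matters here.
class SketchChannel
  attr_reader :topic

  def initialize(topic)
    @topic = topic
  end
end

# Hypothetical flat registry: every call appends a new object,
# so several entries may share one topic.
class SketchRegistry
  def initialize
    @channels = []
  end

  def channel(topic)
    ch = SketchChannel.new(normalize_topic(topic))
    @channels << ch
    ch
  end

  # Lookup by topic returns every match, not a single canonical channel.
  def find_all(topic)
    @channels.select { |c| c.topic == normalize_topic(topic) }
  end
end
```

Under these assumptions, calling `channel("public:users")` and then `channel("realtime:public:users")` yields two distinct objects that share the topic `"realtime:public:users"`.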
#connect ⇒ Object
Establish the WebSocket connection. Mirrors supabase-py's `connect()` (client.py:141-193): synchronous transport failures are retried with exponential backoff (`initial_backoff * 2^(n-1)` seconds, capped at 60s) for up to `max_retries` total attempts, then the last error is re-raised to the caller. With `auto_reconnect: false` the first failure raises immediately, as in supabase-py.
Only failures that `Socket#connect` raises synchronously are retried here. Transports that report failure asynchronously (on_error/on_close after connect returns) are recovered by the background reconnect loop (schedule_reconnect → on_reconnect_failed): same contract, different signal path. A concurrent `disconnect` aborts the retry loop quietly.
# File 'lib/supabase/realtime/client.rb', line 129
def connect
  unless @socket
    @socket = build_default_transport
    attach_socket
  end
  # Idempotent: if a connection is already open or in flight, don't kick
  # off a second transport.connect. A duplicate connect can produce a
  # second on_open, which would fire rejoin_channels twice and send a
  # duplicate join per channel (the server then phx_closes the extra one).
  # This matters because Channel#subscribe calls connect when the socket
  # isn't open yet, and the caller may have already called connect.
  return self if connected? || @connecting

  @intentionally_closed = false
  attempts = 0
  begin
    @connecting = true
    @socket.connect
  rescue StandardError
    # Reset the in-flight flag so the retry (and any later connect call)
    # isn't short-circuited by the idempotency guard above.
    @connecting = false
    attempts += 1
    raise if !@auto_reconnect || attempts >= @max_retries

    sleep [@initial_backoff * (2**(attempts - 1)), 60.0].min
    return self if @intentionally_closed

    retry
  end
  self
end
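To make the backoff arithmetic concrete, here is a small stand-alone sketch of the delay schedule implied by the formula above. `backoff_delay` and `backoff_schedule` are hypothetical helper names, not part of the library; the sketch only reproduces the arithmetic and does not open a connection.

```ruby
# Delay in seconds slept after the n-th failed attempt (n starts at 1):
# initial_backoff * 2^(n-1), capped at 60 seconds.
def backoff_delay(initial_backoff, attempt, cap: 60.0)
  [initial_backoff * (2**(attempt - 1)), cap].min
end

# Delays slept across a run in which every attempt fails. The final
# attempt re-raises instead of sleeping, so there are max_retries - 1 sleeps.
def backoff_schedule(initial_backoff, max_retries)
  (1...max_retries).map { |n| backoff_delay(initial_backoff, n) }
end
```

With the defaults (`initial_backoff: 1.0`, `max_retries: 5`), `backoff_schedule(1.0, 5)` returns `[1.0, 2.0, 4.0, 8.0]`: four sleeps, then the fifth failure is re-raised. With `backoff_schedule(10.0, 6)` the cap takes effect and the result is `[10.0, 20.0, 40.0, 60.0, 60.0]`.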
#connected? ⇒ Boolean
# File 'lib/supabase/realtime/client.rb', line 175
def connected?
  @socket && @socket.connected?
end
#disconnect ⇒ Object Also known as: close
# File 'lib/supabase/realtime/client.rb', line 163
def disconnect
  @intentionally_closed = true
  stop_reconnect
  stop_heartbeat
  @socket&.close
  @channels.each { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end
#get_channels ⇒ Object
# File 'lib/supabase/realtime/client.rb', line 195
def get_channels
  @channels.dup
end
#next_ref ⇒ Object
Used by Channel — increments a shared counter so refs are unique per socket.
# File 'lib/supabase/realtime/client.rb', line 271
def next_ref
  @ref += 1
  @ref.to_s
end
#on_reconnect_failed(&block) ⇒ Object
Register a callback fired exactly once when the background reconnect loop exhausts `max_retries` without re-establishing the socket. The callback receives the last underlying exception raised by the transport's `connect` (or `nil` if no attempt was made; currently unreachable but kept for forward compatibility).
Why this exists (US-003 / FR-4): supabase-py's `connect()` is a single coroutine that raises on permanent failure. The Ruby port runs reconnect on a background thread, so a `raise` would die unobserved. This callback is the Ruby-shaped equivalent; see `lib/supabase/realtime/README.md`, section "Realtime reconnect: difference from supabase-py".
Multiple registrations are allowed; each fires in registration order. The user block is wrapped in Supabase::Realtime::CallbackSafety.safe so a raise inside one callback never blocks the next one (consistent with US-002).
# File 'lib/supabase/realtime/client.rb', line 105
def on_reconnect_failed(&block)
  @reconnect_failed_callbacks << block
  self
end
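The registration and firing contract described above (fire once, in registration order, with one failing callback not blocking the next) can be illustrated with a stand-alone sketch. `ReconnectFailureNotifier` and its `fire` method are hypothetical; the library's actual firing path and the internals of `CallbackSafety.safe` are not shown in this page, so the rescue below is an assumption about the general shape only.

```ruby
# Hypothetical stand-in for the reconnect-failure callback registry.
class ReconnectFailureNotifier
  def initialize
    @callbacks = []
    @fired = false
  end

  # Register a block; returns self so registrations can be chained.
  def on_reconnect_failed(&block)
    @callbacks << block
    self
  end

  # Fire every callback at most once, in registration order.
  # A raise inside one callback is swallowed so later callbacks still run.
  def fire(error)
    return if @fired

    @fired = true
    @callbacks.each do |callback|
      begin
        callback.call(error)
      rescue StandardError
        # Assumption: a real implementation would log here.
      end
    end
  end
end
```

A typical use would register one callback that alerts an operator and another that tears down dependent state; under this sketch the second still runs if the first raises.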
#push(message) ⇒ Object
Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.
# File 'lib/supabase/realtime/client.rb', line 279
def push(message)
  frame = JSON.generate(
    "event" => message.event,
    "topic" => message.topic,
    "payload" => message.payload,
    "ref" => message.ref,
    "join_ref" => message.join_ref
  )
  if connected?
    @socket.send(frame)
  else
    @send_buffer_mutex.synchronize { @send_buffer << frame }
  end
end
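The buffer-then-flush behaviour can be sketched without a real WebSocket. Everything below is hypothetical scaffolding: `SketchSocket`, `SketchMessage`, `BufferingClient`, and in particular `flush_send_buffer`, whose name and FIFO drain are assumptions, since the listing above only shows the buffering half. The sketch socket exposes `write` rather than `send` purely to keep the stub simple.

```ruby
require "json"

# Hypothetical in-memory socket that records written frames.
class SketchSocket
  attr_reader :sent
  attr_accessor :connected

  def initialize
    @sent = []
    @connected = false
  end

  def connected?
    @connected
  end

  def write(frame)
    @sent << frame
  end
end

# Hypothetical message shape with the five fields serialized by push.
SketchMessage = Struct.new(:event, :topic, :payload, :ref, :join_ref, keyword_init: true)

class BufferingClient
  def initialize(socket)
    @socket = socket
    @send_buffer = []
    @send_buffer_mutex = Mutex.new
  end

  # Send immediately when connected; otherwise queue the serialized frame.
  def push(message)
    frame = JSON.generate(message.to_h.transform_keys(&:to_s))
    if @socket.connected?
      @socket.write(frame)
    else
      @send_buffer_mutex.synchronize { @send_buffer << frame }
    end
  end

  # Assumed flush step, run once the socket opens: drain in FIFO order.
  def flush_send_buffer
    frames = @send_buffer_mutex.synchronize { @send_buffer.shift(@send_buffer.size) }
    frames.each { |frame| @socket.write(frame) }
  end
end
```

The buffer is drained under the mutex and written outside it, so a slow socket write does not block other threads that are queueing frames.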
#remove_all_channels ⇒ Object
Unsubscribe every tracked channel and clear the registry. Iterates over a snapshot (`@channels.dup`) so a channel that removes itself during `unsubscribe` doesn't shift the array mid-loop. Idempotent: a follow-up call on an empty registry is a no-op.
# File 'lib/supabase/realtime/client.rb', line 227
def remove_all_channels
  @channels.dup.each { |ch| ch.unsubscribe }
  @channels.clear
  # supabase-py's remove_all_channels unsubscribes every channel and then
  # `await self.close()`. Match that: intentional close, no reconnect.
  disconnect
  self
end
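The reason for iterating over a snapshot is easier to see with a channel that deletes itself from the registry while being unsubscribed. The sketch below is a stand-alone illustration; `SelfRemovingChannel` and `unsubscribe_all` are hypothetical names, and the socket close that follows in the real method is left out.

```ruby
# Hypothetical channel that removes itself from its registry on unsubscribe.
class SelfRemovingChannel
  attr_reader :topic

  def initialize(topic, registry)
    @topic = topic
    @registry = registry
  end

  def unsubscribe
    @registry.delete(self)
  end
end

# Unsubscribe over a snapshot so deletions from the live array cannot
# cause elements to be skipped. Returns the topics visited, in order.
def unsubscribe_all(registry)
  visited = registry.dup.map do |channel|
    channel.unsubscribe
    channel.topic
  end
  registry.clear
  visited
end
```

Iterating the live array instead of the copy can skip entries, because each deletion shifts the remaining elements under the loop index.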
#remove_channel(channel) ⇒ Object
# File 'lib/supabase/realtime/client.rb', line 199
def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel)
  # Close the socket once the registry empties; mirrors supabase-py's
  # `remove_channel` (which calls `self.close()` when `len(channels) == 0`).
  # Use the intentional-close path (`disconnect`), not a bare
  # `@socket.close`: the latter fires on_close → schedule_reconnect and the
  # socket would immediately come back up.
  disconnect if @channels.empty?
end
#send_heartbeat ⇒ Object
Manually emit a heartbeat. Real adapters typically wire this onto a timer.
# File 'lib/supabase/realtime/client.rb', line 258
def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event" => Types::ChannelEvents::HEARTBEAT,
    "topic" => Types::PHOENIX_TOPIC,
    "payload" => {},
    "ref" => next_ref,
    "join_ref" => nil
  ))
end
#set_auth(token) ⇒ Object
Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
Safe to call before `connect`: the token is always written to `@access_token` / `@params` so the next subscribe picks it up via `Channel#inject_postgres_changes_bindings`.
The fan-out mirrors supabase-py (client.py:333-337): for every joined channel, `channel.push(access_token, token)`. Routing through the channel's push path (rather than sending a raw frame only when `connected?`) means a rotation issued while the socket is briefly offline is buffered and replayed on reconnect, not silently dropped.
# File 'lib/supabase/realtime/client.rb', line 248
def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)
  @channels.each do |channel|
    channel.push_access_token(token) if channel.joined?
  end
end
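A stand-alone sketch of the fan-out shows which channels receive a rotated token. `StubChannel` and `AuthFanout` are hypothetical stand-ins for the real Channel and Client; only the `joined?` check and the `push_access_token` call are taken from the listing above.

```ruby
# Hypothetical channel stub that records every token pushed to it.
class StubChannel
  attr_reader :tokens

  def initialize(joined)
    @joined = joined
    @tokens = []
  end

  def joined?
    @joined
  end

  def push_access_token(token)
    @tokens << token
  end
end

# Hypothetical client stub reproducing the set_auth fan-out.
class AuthFanout
  attr_reader :access_token, :params

  def initialize(channels, params = {})
    @channels = channels
    @params = params
    @access_token = nil
  end

  # Remember the token for future joins, then push it to joined channels only.
  def set_auth(token)
    @access_token = token
    @params["access_token"] = token if @params.is_a?(Hash)
    @channels.each do |channel|
      channel.push_access_token(token) if channel.joined?
    end
  end
end
```

Channels that have not joined yet receive nothing at rotation time; under the documented behaviour they pick the token up from the stored value when they subscribe.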
#use_socket(socket) ⇒ Object
Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
# File 'lib/supabase/realtime/client.rb', line 111
def use_socket(socket)
  @socket = socket
  attach_socket
  self
end