Class: Supabase::Realtime::Client
- Inherits:
-
Object
- Object
- Supabase::Realtime::Client
- Defined in:
- lib/supabase/realtime/client.rb
Overview
Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.
Bring your own Socket (e.g. websocket-client-simple adapter or async-websocket adapter). For unit tests, pass a TestSocket. If no transport is supplied, a default Sockets::WebsocketClientSimple adapter is constructed automatically so ‘Supabase.create_client(…).realtime.channel(…).subscribe` works out of the box.
client = Supabase::Realtime::Client.new(
url: "wss://project.supabase.co/realtime/v1",
params: { apikey: key }
)
channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe
Instance Attribute Summary collapse
-
#access_token ⇒ Object
readonly
Returns the value of attribute access_token.
-
#auto_reconnect ⇒ Object
readonly
Returns the value of attribute auto_reconnect.
-
#channels ⇒ Object
readonly
Returns the value of attribute channels.
-
#heartbeat_interval ⇒ Object
readonly
Returns the value of attribute heartbeat_interval.
-
#initial_backoff ⇒ Object
readonly
Returns the value of attribute initial_backoff.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#max_retries ⇒ Object
readonly
Returns the value of attribute max_retries.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
-
#channel(topic, params: nil) ⇒ Object
Always returns a new Channel instance, matching supabase-py.
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object (also: #close)
- #get_channels ⇒ Object
-
#initialize(url:, params: {}, transport: nil, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0, logger: nil) ⇒ Client
constructor
A new instance of Client.
-
#next_ref ⇒ Object
Used by Channel — increments a shared counter so refs are unique per socket.
-
#on_reconnect_failed(&block) ⇒ Object
Register a callback fired exactly once when the background reconnect loop exhausts ‘max_retries` without re-establishing the socket.
-
#push(message) ⇒ Object
Used by Channel#send_push.
-
#remove_all_channels ⇒ Object
Unsubscribe every tracked channel and clear the registry.
- #remove_channel(channel) ⇒ Object
-
#send_heartbeat ⇒ Object
Manually emit a heartbeat.
-
#set_auth(token) ⇒ Object
Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
-
#use_socket(socket) ⇒ Object
Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
Constructor Details
#initialize(url:, params: {}, transport: nil, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0, logger: nil) ⇒ Client
Returns a new instance of Client.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/supabase/realtime/client.rb', line 53 def initialize(url:, params: {}, transport: nil, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0, logger: nil) unless Transformers.is_ws_url(url) raise ArgumentError, "Invalid Realtime URL #{url.inspect}: expected ws://, wss://, http://, or https://" end @url = normalize_url(url, params) @params = params @access_token = params[:access_token] || params["access_token"] @channels = [] @socket = transport || socket || build_default_transport @timeout = timeout @ref = 0 @heartbeat_interval = heartbeat_interval @auto_reconnect = auto_reconnect @max_retries = max_retries @initial_backoff = initial_backoff @logger = logger @heartbeat_thread = nil @reconnect_thread = nil @intentionally_closed = false @send_buffer = [] # frames queued while no socket / not connected @send_buffer_mutex = Mutex.new @reconnect_failed_callbacks = [] attach_socket if @socket end |
Instance Attribute Details
#access_token ⇒ Object (readonly)
Returns the value of attribute access_token.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def access_token @access_token end |
#auto_reconnect ⇒ Object (readonly)
Returns the value of attribute auto_reconnect.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def auto_reconnect @auto_reconnect end |
#channels ⇒ Object (readonly)
Returns the value of attribute channels.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def channels @channels end |
#heartbeat_interval ⇒ Object (readonly)
Returns the value of attribute heartbeat_interval.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def heartbeat_interval @heartbeat_interval end |
#initial_backoff ⇒ Object (readonly)
Returns the value of attribute initial_backoff.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def initial_backoff @initial_backoff end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def logger @logger end |
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def max_retries @max_retries end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def params @params end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def socket @socket end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def timeout @timeout end |
#url ⇒ Object (readonly)
Returns the value of attribute url.
34 35 36 |
# File 'lib/supabase/realtime/client.rb', line 34 def url @url end |
Instance Method Details
#channel(topic, params: nil) ⇒ Object
Always returns a new Channel instance, matching supabase-py. The client-side topic registry is a flat list, so multiple channels can share a topic (each with its own join_ref / subscription lifecycle). To look up an existing channel, walk ‘get_channels.find { |c| c.topic == … }`.
Topic names are auto-prefixed with ‘“realtime:”` to match supabase-py: `client.channel(“public:users”)` reaches the same channel as `client.channel(“realtime:public:users”)`. Pre-prefixed topics are left alone so existing code keeps working.
150 151 152 153 154 155 |
# File 'lib/supabase/realtime/client.rb', line 150 def channel(topic, params: nil) full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}" ch = Channel.new(full_topic, params: params, socket: self) @channels << ch ch end |
#connect ⇒ Object
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/supabase/realtime/client.rb', line 114 def connect unless @socket @socket = build_default_transport attach_socket end @intentionally_closed = false @socket.connect self end |
#connected? ⇒ Boolean
137 138 139 |
# File 'lib/supabase/realtime/client.rb', line 137 def connected? @socket && @socket.connected? end |
#disconnect ⇒ Object Also known as: close
125 126 127 128 129 130 131 132 |
# File 'lib/supabase/realtime/client.rb', line 125 def disconnect @intentionally_closed = true stop_reconnect stop_heartbeat @socket&.close @channels.each { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) } self end |
#get_channels ⇒ Object
157 158 159 |
# File 'lib/supabase/realtime/client.rb', line 157 def get_channels @channels.dup end |
#next_ref ⇒ Object
Used by Channel — increments a shared counter so refs are unique per socket.
224 225 226 227 |
# File 'lib/supabase/realtime/client.rb', line 224 def next_ref @ref += 1 @ref.to_s end |
#on_reconnect_failed(&block) ⇒ Object
Register a callback fired exactly once when the background reconnect loop exhausts ‘max_retries` without re-establishing the socket. The callback receives the last underlying exception raised by the transport’s ‘connect` (or `nil` if no attempt was made — currently unreachable but kept for forward-compat).
Why this exists (US-003 / FR-4): supabase-py’s ‘connect()` is a single coroutine that raises on permanent failure. The rb port runs reconnect on a background thread, so a `raise` would die unobserved. This callback is the rb-shaped equivalent — see `lib/supabase/realtime/README.md` “Realtime reconnect: отличие от supabase-py”.
Multiple registrations are allowed; each fires in registration order. The user block is wrapped in Supabase::Realtime::CallbackSafety.safe so a raise inside one callback never blocks the next one (consistent with US-002).
102 103 104 105 |
# File 'lib/supabase/realtime/client.rb', line 102 def on_reconnect_failed(&block) @reconnect_failed_callbacks << block self end |
#push(message) ⇒ Object
Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/supabase/realtime/client.rb', line 232 def push() frame = JSON.generate( "event" => .event, "topic" => .topic, "payload" => .payload, "ref" => .ref, "join_ref" => .join_ref ) if connected? @socket.send(frame) else @send_buffer_mutex.synchronize { @send_buffer << frame } end end |
#remove_all_channels ⇒ Object
Unsubscribe every tracked channel and clear the registry. Iterates over a snapshot (‘@channels.dup`) so a channel that removes itself during `unsubscribe` doesn’t shift the array mid-loop. Idempotent: a follow-up call on an empty registry is a no-op.
172 173 174 175 176 |
# File 'lib/supabase/realtime/client.rb', line 172 def remove_all_channels @channels.dup.each { |ch| ch.unsubscribe } @channels.clear self end |
#remove_channel(channel) ⇒ Object
161 162 163 164 165 |
# File 'lib/supabase/realtime/client.rb', line 161 def remove_channel(channel) channel.unsubscribe @channels.delete(channel) @socket&.close if @channels.empty? end |
#send_heartbeat ⇒ Object
Manually emit a heartbeat. Real adapters typically wire this onto a timer.
211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/supabase/realtime/client.rb', line 211 def send_heartbeat return unless connected? @socket.send(JSON.generate( "event" => Types::ChannelEvents::HEARTBEAT, "topic" => Types::PHOENIX_TOPIC, "payload" => {}, "ref" => next_ref, "join_ref" => nil )) end |
#set_auth(token) ⇒ Object
Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
Safe to call before ‘connect`: the token is always written to `@access_token` / `@params` so the next subscribe picks it up via `Channel#inject_postgres_changes_bindings`. The ACCESS_TOKEN frame fan-out only runs once the socket is actually connected.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/supabase/realtime/client.rb', line 185 def set_auth(token) @access_token = token @params["access_token"] = token if @params.is_a?(Hash) if connected? @channels.each do |channel| next unless channel.joined? msg = Message.new( event: Types::ChannelEvents::ACCESS_TOKEN, topic: channel.topic, payload: { "access_token" => token }, ref: next_ref ) @socket.send(JSON.generate( "event" => msg.event, "topic" => msg.topic, "payload" => msg.payload, "ref" => msg.ref, "join_ref" => nil )) end end end |
#use_socket(socket) ⇒ Object
Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
108 109 110 111 112 |
# File 'lib/supabase/realtime/client.rb', line 108 def use_socket(socket) @socket = socket attach_socket self end |