Class: Supabase::Realtime::Client
- Inherits: Object
  - Object
  - Supabase::Realtime::Client
- Defined in: lib/supabase/realtime/client.rb
Overview
Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.
Bring your own Socket (e.g. websocket-client-simple adapter or async-websocket adapter). For unit tests, pass a TestSocket.
socket = Supabase::Realtime::TestSocket.new
client = Supabase::Realtime::Client.new(
url: "wss://project.supabase.co/realtime/v1",
params: { apikey: key },
socket: socket
)
client.connect
channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe
Instance Attribute Summary
- #access_token ⇒ Object (readonly)
  Returns the value of attribute access_token.
- #auto_reconnect ⇒ Object (readonly)
  Returns the value of attribute auto_reconnect.
- #channels ⇒ Object (readonly)
  Returns the value of attribute channels.
- #heartbeat_interval ⇒ Object (readonly)
  Returns the value of attribute heartbeat_interval.
- #initial_backoff ⇒ Object (readonly)
  Returns the value of attribute initial_backoff.
- #max_retries ⇒ Object (readonly)
  Returns the value of attribute max_retries.
- #params ⇒ Object (readonly)
  Returns the value of attribute params.
- #socket ⇒ Object (readonly)
  Returns the value of attribute socket.
- #timeout ⇒ Object (readonly)
  Returns the value of attribute timeout.
- #url ⇒ Object (readonly)
  Returns the value of attribute url.
Instance Method Summary
- #channel(topic, params: nil) ⇒ Object
  Get or create a Channel for the given topic.
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object (also: #close)
- #get_channels ⇒ Object
- #initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) ⇒ Client (constructor)
  Returns a new instance of Client.
- #next_ref ⇒ Object
  Used by Channel; increments a shared counter so refs are unique per socket.
- #push(message) ⇒ Object
  Used by Channel#send_push.
- #remove_all_channels ⇒ Object
- #remove_channel(channel) ⇒ Object
- #send_heartbeat ⇒ Object
  Manually emit a heartbeat.
- #set_auth(token) ⇒ Object
  Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
- #use_socket(socket) ⇒ Object
  Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
Constructor Details
#initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) ⇒ Client
Returns a new instance of Client.
# File 'lib/supabase/realtime/client.rb', line 43
def initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS,
               heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
               auto_reconnect: true, max_retries: 5, initial_backoff: 1.0)
  @url = normalize_url(url, params)
  @params = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = {}
  @socket = socket
  @timeout = timeout
  @ref = 0
  @heartbeat_interval = heartbeat_interval
  @auto_reconnect = auto_reconnect
  @max_retries = max_retries
  @initial_backoff = initial_backoff
  @heartbeat_thread = nil
  @reconnect_thread = nil
  @intentionally_closed = false
  @send_buffer = [] # frames queued while no socket / not connected
  @send_buffer_mutex = Mutex.new
  attach_socket if @socket
end
Instance Attribute Details
#access_token ⇒ Object (readonly)
Returns the value of attribute access_token.
# File 'lib/supabase/realtime/client.rb', line 31
def access_token
  @access_token
end
#auto_reconnect ⇒ Object (readonly)
Returns the value of attribute auto_reconnect.
# File 'lib/supabase/realtime/client.rb', line 31
def auto_reconnect
  @auto_reconnect
end
#channels ⇒ Object (readonly)
Returns the value of attribute channels.
# File 'lib/supabase/realtime/client.rb', line 31
def channels
  @channels
end
#heartbeat_interval ⇒ Object (readonly)
Returns the value of attribute heartbeat_interval.
# File 'lib/supabase/realtime/client.rb', line 31
def heartbeat_interval
  @heartbeat_interval
end
#initial_backoff ⇒ Object (readonly)
Returns the value of attribute initial_backoff.
# File 'lib/supabase/realtime/client.rb', line 31
def initial_backoff
  @initial_backoff
end
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
# File 'lib/supabase/realtime/client.rb', line 31
def max_retries
  @max_retries
end
#params ⇒ Object (readonly)
Returns the value of attribute params.
# File 'lib/supabase/realtime/client.rb', line 31
def params
  @params
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/supabase/realtime/client.rb', line 31
def socket
  @socket
end
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
# File 'lib/supabase/realtime/client.rb', line 31
def timeout
  @timeout
end
#url ⇒ Object (readonly)
Returns the value of attribute url.
# File 'lib/supabase/realtime/client.rb', line 31
def url
  @url
end
Instance Method Details
#channel(topic, params: nil) ⇒ Object
Get or create a Channel for the given topic. Subsequent calls with the same topic return the same Channel instance, matching phoenix.js semantics.
Topic names are auto-prefixed with "realtime:" to match supabase-py: client.channel("public:users") reaches the same channel as client.channel("realtime:public:users"). Pre-prefixed topics are left alone so existing code keeps working.
# File 'lib/supabase/realtime/client.rb', line 105
def channel(topic, params: nil)
  full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
  @channels[full_topic] ||= Channel.new(full_topic, params: params, socket: self)
end
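The prefixing and get-or-create behavior described above can be sketched in isolation. This is a minimal illustration, not the gem's code: `TopicRegistry` is a hypothetical stand-in for the client, and a plain Hash stands in for a Channel so the snippet runs without the library.

```ruby
# Hypothetical stand-in for the client's channel table (not part of the gem).
class TopicRegistry
  PREFIX = "realtime:"

  def initialize
    @channels = {}
  end

  # Prefix the topic only when it is not already prefixed, then return the
  # memoized entry so repeated lookups yield the same object.
  def channel(topic)
    full_topic = topic.start_with?(PREFIX) ? topic : "#{PREFIX}#{topic}"
    @channels[full_topic] ||= { topic: full_topic }
  end

  def size
    @channels.size
  end
end

registry = TopicRegistry.new
short = registry.channel("public:users")
long  = registry.channel("realtime:public:users")

puts short.equal?(long) # both names resolve to one entry
puts short[:topic]
```

Because the lookup key is always the prefixed topic, mixing both spellings in one codebase does not create duplicate subscriptions.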
#connect ⇒ Object
# File 'lib/supabase/realtime/client.rb', line 74
def connect
  raise Errors::RealtimeError, "no socket attached — call #use_socket(socket) first" unless @socket

  @intentionally_closed = false
  @socket.connect
  self
end
#connected? ⇒ Boolean
# File 'lib/supabase/realtime/client.rb', line 94
def connected?
  @socket && @socket.connected?
end
#disconnect ⇒ Object Also known as: close
# File 'lib/supabase/realtime/client.rb', line 82
def disconnect
  @intentionally_closed = true
  stop_reconnect
  stop_heartbeat
  @socket&.close
  @channels.each_value { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end
#get_channels ⇒ Object
# File 'lib/supabase/realtime/client.rb', line 110
def get_channels
  @channels.values
end
#next_ref ⇒ Object
Used by Channel; increments a shared counter so refs are unique per socket.
# File 'lib/supabase/realtime/client.rb', line 164
def next_ref
  @ref += 1
  @ref.to_s
end
#push(message) ⇒ Object
Used by Channel#send_push. If the socket isn't connected yet, the frame is buffered and flushed automatically when the socket opens. This matches supabase-py's send_buffer, so offline pushes aren't silently dropped.
# File 'lib/supabase/realtime/client.rb', line 172
def push(message)
  frame = JSON.generate(
    "event" => message.event,
    "topic" => message.topic,
    "payload" => message.payload,
    "ref" => message.ref,
    "join_ref" => message.join_ref
  )

  if connected?
    @socket.send(frame)
  else
    @send_buffer_mutex.synchronize { @send_buffer << frame }
  end
end
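The buffer-then-flush behavior can be sketched without the gem. Everything here is an assumption made for illustration: `SketchSocket`, `BufferedSender`, and the `flush` method are hypothetical names, and the real client's flush hook is not shown in this page.

```ruby
require "json"

# Hypothetical in-memory socket that records the frames it is asked to send.
class SketchSocket
  attr_reader :sent

  def initialize
    @sent = []
    @open = false
  end

  def open!
    @open = true
  end

  def connected?
    @open
  end

  def send(frame)
    @sent << frame
  end
end

# Hypothetical sender: writes straight through when connected, queues otherwise.
class BufferedSender
  def initialize(socket)
    @socket = socket
    @buffer = []
    @mutex = Mutex.new
  end

  def push(event:, topic:, payload:, ref:)
    frame = JSON.generate("event" => event, "topic" => topic, "payload" => payload, "ref" => ref)
    if @socket.connected?
      @socket.send(frame)
    else
      @mutex.synchronize { @buffer << frame }
    end
  end

  # Assumed flush hook: drain the queue in FIFO order once the socket is open.
  def flush
    pending = @mutex.synchronize { @buffer.shift(@buffer.size) }
    pending.each { |frame| @socket.send(frame) }
  end
end

socket = SketchSocket.new
sender = BufferedSender.new(socket)
sender.push(event: "broadcast", topic: "realtime:room", payload: { "msg" => "queued" }, ref: "1")
queued_before_open = socket.sent.size

socket.open!
sender.flush
sender.push(event: "broadcast", topic: "realtime:room", payload: { "msg" => "direct" }, ref: "2")
```

Draining the queue under the mutex and sending outside it keeps the lock short while preserving the order in which frames were pushed.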
#remove_all_channels ⇒ Object
# File 'lib/supabase/realtime/client.rb', line 119
def remove_all_channels
  @channels.values.each { |ch| ch.unsubscribe }
  @channels.clear
end
#remove_channel(channel) ⇒ Object
# File 'lib/supabase/realtime/client.rb', line 114
def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel.topic)
end
#send_heartbeat ⇒ Object
Manually emit a heartbeat. Real adapters typically wire this onto a timer.
# File 'lib/supabase/realtime/client.rb', line 151
def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event" => Types::ChannelEvents::HEARTBEAT,
    "topic" => Types::PHOENIX_TOPIC,
    "payload" => {},
    "ref" => next_ref,
    "join_ref" => nil
  ))
end
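Wiring a heartbeat onto a timer might look like the sketch below. It is illustrative only: `heartbeat_frame` and `start_heartbeat` are hypothetical helpers, and the event and topic strings are assumed to be the conventional Phoenix values ("heartbeat" on topic "phoenix") rather than read from Types.

```ruby
require "json"

# Build a heartbeat frame with the same keys as the method above.
# The default event and topic strings are assumptions, not gem constants.
def heartbeat_frame(ref, event: "heartbeat", topic: "phoenix")
  JSON.generate(
    "event" => event,
    "topic" => topic,
    "payload" => {},
    "ref" => ref,
    "join_ref" => nil
  )
end

# Hypothetical timer: invokes the block every `interval` seconds until killed.
def start_heartbeat(interval, &block)
  Thread.new do
    loop do
      sleep interval
      block.call
    end
  end
end

frames = Queue.new
ref = 0
timer = start_heartbeat(0.01) do
  ref += 1
  frames << heartbeat_frame(ref.to_s)
end

first = JSON.parse(frames.pop) # blocks until the first tick fires
timer.kill
timer.join
```

A real adapter would call the client's #send_heartbeat inside the block and stop the thread on disconnect.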
#set_auth(token) ⇒ Object
Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
# File 'lib/supabase/realtime/client.rb', line 126
def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)
  return unless @socket && @socket.connected?

  @channels.each_value do |channel|
    next unless channel.joined?

    msg = Message.new(
      event: Types::ChannelEvents::ACCESS_TOKEN,
      topic: channel.topic,
      payload: { "access_token" => token },
      ref: next_ref
    )
    @socket.send(JSON.generate(
      "event" => msg.event,
      "topic" => msg.topic,
      "payload" => msg.payload,
      "ref" => msg.ref,
      "join_ref" => nil
    ))
  end
end
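The fan-out rule (only joined channels receive the new token) can be sketched as a pure function. `SketchChannel` and `access_token_frames` are hypothetical names, and the "access_token" event string is an assumed value for Types::ChannelEvents::ACCESS_TOKEN.

```ruby
require "json"

# Hypothetical channel record: a topic plus a joined flag.
SketchChannel = Struct.new(:topic, :joined) do
  def joined?
    joined
  end
end

# Build one token-refresh frame per joined channel; unjoined channels are
# skipped because they pick the token up when they join later.
def access_token_frames(channels, token)
  ref = 0
  channels.select(&:joined?).map do |channel|
    ref += 1
    JSON.generate(
      "event" => "access_token", # assumed event name
      "topic" => channel.topic,
      "payload" => { "access_token" => token },
      "ref" => ref.to_s,
      "join_ref" => nil
    )
  end
end

channels = [
  SketchChannel.new("realtime:public:users", true),
  SketchChannel.new("realtime:public:orders", false)
]
frames = access_token_frames(channels, "new-jwt")
puts frames.size
```

In practice this means calling #set_auth after every token refresh, so that row-level security is evaluated against the current session rather than the one used at join time.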
#use_socket(socket) ⇒ Object
Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
# File 'lib/supabase/realtime/client.rb', line 68
def use_socket(socket)
  @socket = socket
  attach_socket
  self
end