Class: Supabase::Realtime::Client
- Inherits:
-
Object
- Object
- Supabase::Realtime::Client
- Defined in:
- lib/supabase/realtime/client.rb
Overview
Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.
Bring your own Socket (e.g. websocket-client-simple adapter or async-websocket adapter). For unit tests, pass a TestSocket.
socket = Supabase::Realtime::TestSocket.new
client = Supabase::Realtime::Client.new(
url: "wss://project.supabase.co/realtime/v1",
params: { apikey: key },
socket: socket
)
client.connect
channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe
Instance Attribute Summary collapse
-
#access_token ⇒ Object
readonly
Returns the value of attribute access_token.
-
#auto_reconnect ⇒ Object
readonly
Returns the value of attribute auto_reconnect.
-
#channels ⇒ Object
readonly
Returns the value of attribute channels.
-
#heartbeat_interval ⇒ Object
readonly
Returns the value of attribute heartbeat_interval.
-
#initial_backoff ⇒ Object
readonly
Returns the value of attribute initial_backoff.
-
#max_retries ⇒ Object
readonly
Returns the value of attribute max_retries.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
-
#channel(topic, params: nil) ⇒ Object
Get or create a Channel for the given topic.
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
- #get_channels ⇒ Object
-
#initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) ⇒ Client
constructor
A new instance of Client.
-
#next_ref ⇒ Object
Used by Channel — increments a shared counter so refs are unique per socket.
-
#push(message) ⇒ Object
Used by Channel#send_push.
- #remove_all_channels ⇒ Object
- #remove_channel(channel) ⇒ Object
-
#send_heartbeat ⇒ Object
Manually emit a heartbeat.
-
#set_auth(token) ⇒ Object
Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
-
#use_socket(socket) ⇒ Object
Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
Constructor Details
#initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) ⇒ Client
Returns a new instance of Client.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/supabase/realtime/client.rb', line 43 def initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) @url = normalize_url(url, params) @params = params @access_token = params[:access_token] || params["access_token"] @channels = {} @socket = socket @timeout = timeout @ref = 0 @heartbeat_interval = heartbeat_interval @auto_reconnect = auto_reconnect @max_retries = max_retries @initial_backoff = initial_backoff @heartbeat_thread = nil @reconnect_thread = nil @intentionally_closed = false @send_buffer = [] # frames queued while no socket / not connected @send_buffer_mutex = Mutex.new attach_socket if @socket end |
Instance Attribute Details
#access_token ⇒ Object (readonly)
Returns the value of attribute access_token.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def access_token @access_token end |
#auto_reconnect ⇒ Object (readonly)
Returns the value of attribute auto_reconnect.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def auto_reconnect @auto_reconnect end |
#channels ⇒ Object (readonly)
Returns the value of attribute channels.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def channels @channels end |
#heartbeat_interval ⇒ Object (readonly)
Returns the value of attribute heartbeat_interval.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def heartbeat_interval @heartbeat_interval end |
#initial_backoff ⇒ Object (readonly)
Returns the value of attribute initial_backoff.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def initial_backoff @initial_backoff end |
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def max_retries @max_retries end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def params @params end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def socket @socket end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def timeout @timeout end |
#url ⇒ Object (readonly)
Returns the value of attribute url.
31 32 33 |
# File 'lib/supabase/realtime/client.rb', line 31 def url @url end |
Instance Method Details
#channel(topic, params: nil) ⇒ Object
Get or create a Channel for the given topic. Subsequent calls with the same topic return the same Channel instance, matching phoenix.js semantics.
97 98 99 |
# File 'lib/supabase/realtime/client.rb', line 97 def channel(topic, params: nil) @channels[topic] ||= Channel.new(topic, params: params, socket: self) end |
#connect ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/supabase/realtime/client.rb', line 74 def connect raise Errors::RealtimeError, "no socket attached — call #use_socket(socket) first" unless @socket @intentionally_closed = false @socket.connect self end |
#connected? ⇒ Boolean
91 92 93 |
# File 'lib/supabase/realtime/client.rb', line 91 def connected? @socket && @socket.connected? end |
#disconnect ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/supabase/realtime/client.rb', line 82 def disconnect @intentionally_closed = true stop_reconnect stop_heartbeat @socket&.close @channels.each_value { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) } self end |
#get_channels ⇒ Object
101 102 103 |
# File 'lib/supabase/realtime/client.rb', line 101 def get_channels @channels.values end |
#next_ref ⇒ Object
Used by Channel — increments a shared counter so refs are unique per socket.
155 156 157 158 |
# File 'lib/supabase/realtime/client.rb', line 155 def next_ref @ref += 1 @ref.to_s end |
#push(message) ⇒ Object
Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/supabase/realtime/client.rb', line 163 def push() frame = JSON.generate( "event" => .event, "topic" => .topic, "payload" => .payload, "ref" => .ref, "join_ref" => .join_ref ) if connected? @socket.send(frame) else @send_buffer_mutex.synchronize { @send_buffer << frame } end end |
#remove_all_channels ⇒ Object
110 111 112 113 |
# File 'lib/supabase/realtime/client.rb', line 110 def remove_all_channels @channels.values.each { |ch| ch.unsubscribe } @channels.clear end |
#remove_channel(channel) ⇒ Object
105 106 107 108 |
# File 'lib/supabase/realtime/client.rb', line 105 def remove_channel(channel) channel.unsubscribe @channels.delete(channel.topic) end |
#send_heartbeat ⇒ Object
Manually emit a heartbeat. Real adapters typically wire this onto a timer.
142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/supabase/realtime/client.rb', line 142 def send_heartbeat return unless connected? @socket.send(JSON.generate( "event" => Types::ChannelEvents::HEARTBEAT, "topic" => Types::PHOENIX_TOPIC, "payload" => {}, "ref" => next_ref, "join_ref" => nil )) end |
#set_auth(token) ⇒ Object
Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/supabase/realtime/client.rb', line 117 def set_auth(token) @access_token = token @params["access_token"] = token if @params.is_a?(Hash) return unless @socket && @socket.connected? @channels.each_value do |channel| next unless channel.joined? msg = Message.new( event: Types::ChannelEvents::ACCESS_TOKEN, topic: channel.topic, payload: { "access_token" => token }, ref: next_ref ) @socket.send(JSON.generate( "event" => msg.event, "topic" => msg.topic, "payload" => msg.payload, "ref" => msg.ref, "join_ref" => nil )) end end |
#use_socket(socket) ⇒ Object
Plug in a transport after construction (e.g. a websocket-client-simple wrapper).
68 69 70 71 72 |
# File 'lib/supabase/realtime/client.rb', line 68 def use_socket(socket) @socket = socket attach_socket self end |