Class: Supabase::Realtime::Client

Inherits:
Object
Defined in:
lib/supabase/realtime/client.rb

Overview

Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.

Bring your own Socket (e.g. a websocket-client-simple or async-websocket adapter). For unit tests, pass a TestSocket.

socket   = Supabase::Realtime::TestSocket.new
client   = Supabase::Realtime::Client.new(
  url: "wss://project.supabase.co/realtime/v1",
  params: { apikey: key },
  socket: socket
)
client.connect

channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe

Constructor Details

#initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) ⇒ Client

Returns a new instance of Client.

Parameters:

  • url (String)

    WebSocket endpoint (ws:// or wss://). http:// and https:// URLs are also accepted and upgraded; any other scheme raises ArgumentError.

  • params (Hash) (defaults to: {})

    query-string params merged onto the URL (e.g. apikey/access_token)

  • socket (Socket, nil) (defaults to: nil)

    transport to use; when nil, attach one later with #use_socket before calling #connect

  • timeout (Numeric) (defaults to: Types::DEFAULT_TIMEOUT_SECONDS)

    default per-push timeout (seconds)

  • heartbeat_interval (Numeric) (defaults to: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS)

    seconds between automatic heartbeat pushes (0 disables)

  • auto_reconnect (Boolean) (defaults to: true)

    reconnect on unexpected socket close

  • max_retries (Integer) (defaults to: 5)

    maximum reconnect attempts before giving up

  • initial_backoff (Numeric) (defaults to: 1.0)

    seconds of delay before the first reconnect attempt; doubles each attempt up to a 60s cap (matches supabase-py)
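
The reconnect schedule described for initial_backoff can be sketched as below. This is a minimal illustration rather than the library's own code: reconnect_delays is a hypothetical helper, the 60-second cap comes from the parameter description, and the assumption is that max_retries counts attempts starting from the first delay.

```ruby
# Hypothetical helper, not part of Supabase::Realtime::Client.
# Lists the delay (in seconds) before each reconnect attempt:
# start at `initial`, double per attempt, never exceed `cap`.
def reconnect_delays(initial: 1.0, max_retries: 5, cap: 60.0)
  Array.new(max_retries) { |attempt| [initial * (2**attempt), cap].min }
end

p reconnect_delays                 # delays with the documented defaults
p reconnect_delays(initial: 10.0)  # a larger start reaches the cap sooner
```

With the documented defaults the five attempts stay well below the cap; the cap only matters when initial_backoff or max_retries is raised.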



# File 'lib/supabase/realtime/client.rb', line 45

def initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS,
               heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
               auto_reconnect: true, max_retries: 5, initial_backoff: 1.0)
  unless Transformers.is_ws_url(url)
    raise ArgumentError,
          "Invalid Realtime URL #{url.inspect}: expected ws://, wss://, http://, or https://"
  end

  @url     = normalize_url(url, params)
  @params  = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = {}
  @socket   = socket
  @timeout  = timeout
  @ref      = 0

  @heartbeat_interval = heartbeat_interval
  @auto_reconnect     = auto_reconnect
  @max_retries        = max_retries
  @initial_backoff    = initial_backoff
  @heartbeat_thread   = nil
  @reconnect_thread   = nil
  @intentionally_closed = false
  @send_buffer        = [] # frames queued while no socket / not connected
  @send_buffer_mutex  = Mutex.new

  attach_socket if @socket
end

Instance Attribute Details

#access_token ⇒ Object (readonly)

Returns the value of attribute access_token.

# File 'lib/supabase/realtime/client.rb', line 33

def access_token
  @access_token
end

#auto_reconnect ⇒ Object (readonly)

Returns the value of attribute auto_reconnect.

# File 'lib/supabase/realtime/client.rb', line 33

def auto_reconnect
  @auto_reconnect
end

#channels ⇒ Object (readonly)

Returns the value of attribute channels.

# File 'lib/supabase/realtime/client.rb', line 33

def channels
  @channels
end

#heartbeat_interval ⇒ Object (readonly)

Returns the value of attribute heartbeat_interval.

# File 'lib/supabase/realtime/client.rb', line 33

def heartbeat_interval
  @heartbeat_interval
end

#initial_backoff ⇒ Object (readonly)

Returns the value of attribute initial_backoff.

# File 'lib/supabase/realtime/client.rb', line 33

def initial_backoff
  @initial_backoff
end

#max_retries ⇒ Object (readonly)

Returns the value of attribute max_retries.

# File 'lib/supabase/realtime/client.rb', line 33

def max_retries
  @max_retries
end

#params ⇒ Object (readonly)

Returns the value of attribute params.

# File 'lib/supabase/realtime/client.rb', line 33

def params
  @params
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.

# File 'lib/supabase/realtime/client.rb', line 33

def socket
  @socket
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.

# File 'lib/supabase/realtime/client.rb', line 33

def timeout
  @timeout
end

#url ⇒ Object (readonly)

Returns the value of attribute url.

# File 'lib/supabase/realtime/client.rb', line 33

def url
  @url
end

Instance Method Details

#channel(topic, params: nil) ⇒ Object

Get or create a Channel for the given topic. Subsequent calls with the same topic return the same Channel instance, matching phoenix.js semantics.

Topic names are auto-prefixed with `"realtime:"` to match supabase-py: `client.channel("public:users")` reaches the same channel as `client.channel("realtime:public:users")`. Pre-prefixed topics are left alone so existing code keeps working.



# File 'lib/supabase/realtime/client.rb', line 112

def channel(topic, params: nil)
  full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
  @channels[full_topic] ||= Channel.new(full_topic, params: params, socket: self)
end
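
The prefixing and get-or-create behaviour can be tried in isolation with the sketch below. It copies the two lines of #channel into a hypothetical Registry class; ChannelStub is a stand-in for the real Channel and is not part of the library.

```ruby
# Hypothetical stand-in for Supabase::Realtime::Channel: it only stores the topic.
ChannelStub = Struct.new(:topic)

# Hypothetical miniature of the client's channel registry.
class Registry
  def initialize
    @channels = {}
  end

  # Same rule as Client#channel: prefix once, then reuse the cached instance.
  def channel(topic)
    full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
    @channels[full_topic] ||= ChannelStub.new(full_topic)
  end
end

registry = Registry.new
short    = registry.channel("public:users")
prefixed = registry.channel("realtime:public:users")

puts short.topic             # the stored topic always carries the prefix
puts short.equal?(prefixed)  # both spellings resolve to one object
```

Because the lookup uses `||=`, a later call for an existing topic returns the cached channel as-is; in the real method this means a `params:` argument passed on that later call is not applied.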

#connect ⇒ Object

Opens the attached socket and returns self. Raises Errors::RealtimeError if no socket has been attached.

# File 'lib/supabase/realtime/client.rb', line 81

def connect
  raise Errors::RealtimeError, "no socket attached — call #use_socket(socket) first" unless @socket

  @intentionally_closed = false
  @socket.connect
  self
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/supabase/realtime/client.rb', line 101

def connected?
  @socket && @socket.connected?
end

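#disconnect ⇒ Object (also known as: close)

Closes the connection deliberately: stops reconnection and heartbeats, closes the socket if one is attached, marks every channel as CLOSED, and returns self.
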
# File 'lib/supabase/realtime/client.rb', line 89

def disconnect
  @intentionally_closed = true
  stop_reconnect
  stop_heartbeat
  @socket&.close
  @channels.each_value { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end

#get_channels ⇒ Object

# File 'lib/supabase/realtime/client.rb', line 117

def get_channels
  @channels.values
end

#next_ref ⇒ Object

Used by Channel — increments a shared counter so refs are unique per socket.



# File 'lib/supabase/realtime/client.rb', line 171

def next_ref
  @ref += 1
  @ref.to_s
end

#push(message) ⇒ Object

Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.



# File 'lib/supabase/realtime/client.rb', line 179

def push(message)
  frame = JSON.generate(
    "event"    => message.event,
    "topic"    => message.topic,
    "payload"  => message.payload,
    "ref"      => message.ref,
    "join_ref" => message.join_ref
  )

  if connected?
    @socket.send(frame)
  else
    @send_buffer_mutex.synchronize { @send_buffer << frame }
  end
end
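
The buffer-then-flush behaviour can be sketched without the library as follows. Everything here is illustrative: RecordingSocket and BufferingSender are hypothetical classes, and flush_buffer is an assumed name for whatever the real client runs when the socket opens (that code is not shown on this page).

```ruby
require "json"

# Hypothetical in-memory transport: records frames instead of sending them.
class RecordingSocket
  attr_reader :sent

  def initialize
    @sent = []
    @connected = false
  end

  def connect
    @connected = true
  end

  def connected?
    @connected
  end

  def send(frame)
    @sent << frame
  end
end

# Hypothetical miniature of the buffering logic in Client#push.
class BufferingSender
  def initialize(socket)
    @socket = socket
    @buffer = []
    @mutex  = Mutex.new
  end

  # Send immediately when connected, otherwise queue the encoded frame.
  def push(payload)
    frame = JSON.generate(payload)
    if @socket.connected?
      @socket.send(frame)
    else
      @mutex.synchronize { @buffer << frame }
    end
  end

  # Assumed flush step: drain the queue under the lock, then send in order.
  def flush_buffer
    pending = @mutex.synchronize do
      drained = @buffer.dup
      @buffer.clear
      drained
    end
    pending.each { |frame| @socket.send(frame) }
  end
end

socket = RecordingSocket.new
sender = BufferingSender.new(socket)
sender.push({ "event" => "a" }) # queued: socket not connected yet
socket.connect
sender.flush_buffer             # queued frame goes out first
sender.push({ "event" => "b" }) # sent directly
puts socket.sent
```

Draining the queue inside the lock and sending outside it keeps the critical section short, so a slow transport does not block threads that are still enqueueing.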

#remove_all_channels ⇒ Object

# File 'lib/supabase/realtime/client.rb', line 126

def remove_all_channels
  @channels.values.each { |ch| ch.unsubscribe }
  @channels.clear
end

#remove_channel(channel) ⇒ Object



# File 'lib/supabase/realtime/client.rb', line 121

def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel.topic)
end

#send_heartbeat ⇒ Object

Manually emit a heartbeat. Real adapters typically wire this onto a timer.



# File 'lib/supabase/realtime/client.rb', line 158

def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event"    => Types::ChannelEvents::HEARTBEAT,
    "topic"    => Types::PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => next_ref,
    "join_ref" => nil
  ))
end
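
For reference, the frame that #send_heartbeat serializes can be reproduced on its own. The constant values below are assumptions: the Phoenix channel protocol uses the event "heartbeat" on the topic "phoenix", and this sketch presumes Types::ChannelEvents::HEARTBEAT and Types::PHOENIX_TOPIC hold those strings. heartbeat_frame is a hypothetical helper.

```ruby
require "json"

# Assumed values of Types::ChannelEvents::HEARTBEAT and Types::PHOENIX_TOPIC.
HEARTBEAT_EVENT = "heartbeat"
PHOENIX_TOPIC   = "phoenix"

# Hypothetical helper: builds the same five-field frame as #send_heartbeat.
def heartbeat_frame(ref)
  JSON.generate(
    "event"    => HEARTBEAT_EVENT,
    "topic"    => PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => ref,
    "join_ref" => nil
  )
end

puts heartbeat_frame("1")
```

The ref is a String because #next_ref returns the counter via to_s; join_ref is serialized as JSON null since a heartbeat does not belong to any joined channel.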

#set_auth(token) ⇒ Object

Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.



# File 'lib/supabase/realtime/client.rb', line 133

def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)
  return unless @socket && @socket.connected?

  @channels.each_value do |channel|
    next unless channel.joined?

    msg = Message.new(
      event:   Types::ChannelEvents::ACCESS_TOKEN,
      topic:   channel.topic,
      payload: { "access_token" => token },
      ref:     next_ref
    )
    @socket.send(JSON.generate(
      "event"    => msg.event,
      "topic"    => msg.topic,
      "payload"  => msg.payload,
      "ref"      => msg.ref,
      "join_ref" => nil
    ))
  end
end
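
The per-channel fan-out in #set_auth can be illustrated with stub channels. This is a sketch under assumptions: StubChannel and access_token_frames are hypothetical, the event name "access_token" is an assumed value for Types::ChannelEvents::ACCESS_TOKEN, and refs are numbered from 1 here instead of coming from the client's shared counter.

```ruby
require "json"

# Hypothetical stand-in for a channel: only the members #set_auth reads.
StubChannel = Struct.new(:topic, :joined) do
  def joined?
    joined
  end
end

# Assumed value of Types::ChannelEvents::ACCESS_TOKEN.
ACCESS_TOKEN_EVENT = "access_token"

# Builds one access_token frame per joined channel; unjoined channels are skipped.
def access_token_frames(channels, token)
  channels.select(&:joined?).each_with_index.map do |channel, index|
    JSON.generate(
      "event"    => ACCESS_TOKEN_EVENT,
      "topic"    => channel.topic,
      "payload"  => { "access_token" => token },
      "ref"      => (index + 1).to_s,
      "join_ref" => nil
    )
  end
end

channels = [
  StubChannel.new("realtime:public:users", true),
  StubChannel.new("realtime:public:orders", false)
]
puts access_token_frames(channels, "new-jwt")
```

Only the joined channel produces a frame. In the real method the token is also stored in @access_token and @params before any frames are sent, so channels that join later pick it up.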

#use_socket(socket) ⇒ Object

Plug in a transport after construction (e.g. a websocket-client-simple wrapper).



# File 'lib/supabase/realtime/client.rb', line 75

def use_socket(socket)
  @socket = socket
  attach_socket
  self
end