Class: Supabase::Realtime::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/client.rb

Overview

Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.

Bring your own Socket (e.g. websocket-client-simple adapter or async-websocket adapter). For unit tests, pass a TestSocket.

socket   = Supabase::Realtime::TestSocket.new
client   = Supabase::Realtime::Client.new(
  url: "wss://project.supabase.co/realtime/v1",
  params: { apikey: key },
  socket: socket
)
client.connect

channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) ⇒ Client

Returns a new instance of Client.

Parameters:

  • url (String)

    WebSocket endpoint (ws:// or wss://). Plain http(s) are upgraded.

  • params (Hash) (defaults to: {})

    query-string params merged onto the URL (e.g. apikey/access_token)

  • socket (Socket, nil) (defaults to: nil)

    inject your own transport (defaults to nil — caller wires it up)

  • timeout (Numeric) (defaults to: Types::DEFAULT_TIMEOUT_SECONDS)

    default per-push timeout (seconds)

  • heartbeat_interval (Numeric) (defaults to: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS)

    seconds between automatic heartbeat pushes (0 disables)

  • auto_reconnect (Boolean) (defaults to: true)

    reconnect on unexpected socket close

  • max_retries (Integer) (defaults to: 5)

    maximum reconnect attempts before giving up

  • initial_backoff (Numeric) (defaults to: 1.0)

    seconds of delay before the first reconnect attempt; doubles each attempt up to a 60s cap (matches supabase-py)



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/supabase/realtime/client.rb', line 43

def initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS,
               heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
               auto_reconnect: true, max_retries: 5, initial_backoff: 1.0)
  @url     = normalize_url(url, params)
  @params  = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = {}
  @socket   = socket
  @timeout  = timeout
  @ref      = 0

  @heartbeat_interval = heartbeat_interval
  @auto_reconnect     = auto_reconnect
  @max_retries        = max_retries
  @initial_backoff    = initial_backoff
  @heartbeat_thread   = nil
  @reconnect_thread   = nil
  @intentionally_closed = false
  @send_buffer        = [] # frames queued while no socket / not connected
  @send_buffer_mutex  = Mutex.new

  attach_socket if @socket
end

Instance Attribute Details

#access_tokenObject (readonly)

Returns the value of attribute access_token.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def access_token
  @access_token
end

#auto_reconnectObject (readonly)

Returns the value of attribute auto_reconnect.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def auto_reconnect
  @auto_reconnect
end

#channelsObject (readonly)

Returns the value of attribute channels.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def channels
  @channels
end

#heartbeat_intervalObject (readonly)

Returns the value of attribute heartbeat_interval.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def heartbeat_interval
  @heartbeat_interval
end

#initial_backoffObject (readonly)

Returns the value of attribute initial_backoff.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def initial_backoff
  @initial_backoff
end

#max_retriesObject (readonly)

Returns the value of attribute max_retries.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def max_retries
  @max_retries
end

#paramsObject (readonly)

Returns the value of attribute params.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def params
  @params
end

#socketObject (readonly)

Returns the value of attribute socket.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def socket
  @socket
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def timeout
  @timeout
end

#urlObject (readonly)

Returns the value of attribute url.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def url
  @url
end

Instance Method Details

#channel(topic, params: nil) ⇒ Object

Get or create a Channel for the given topic. Subsequent calls with the same topic return the same Channel instance, matching phoenix.js semantics.

Topic names are auto-prefixed with ‘“realtime:”` to match supabase-py: `client.channel(“public:users”)` reaches the same channel as `client.channel(“realtime:public:users”)`. Pre-prefixed topics are left alone so existing code keeps working.



105
106
107
108
# File 'lib/supabase/realtime/client.rb', line 105

def channel(topic, params: nil)
  full_topic = topic.start_with?("realtime:") ? topic : "realtime:#{topic}"
  @channels[full_topic] ||= Channel.new(full_topic, params: params, socket: self)
end

#connectObject



74
75
76
77
78
79
80
# File 'lib/supabase/realtime/client.rb', line 74

def connect
  raise Errors::RealtimeError, "no socket attached — call #use_socket(socket) first" unless @socket

  @intentionally_closed = false
  @socket.connect
  self
end

#connected?Boolean

Returns:

  • (Boolean)


94
95
96
# File 'lib/supabase/realtime/client.rb', line 94

def connected?
  @socket && @socket.connected?
end

#disconnectObject Also known as: close



82
83
84
85
86
87
88
89
# File 'lib/supabase/realtime/client.rb', line 82

def disconnect
  @intentionally_closed = true
  stop_reconnect
  stop_heartbeat
  @socket&.close
  @channels.each_value { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end

#get_channelsObject



110
111
112
# File 'lib/supabase/realtime/client.rb', line 110

def get_channels
  @channels.values
end

#next_refObject

Used by Channel — increments a shared counter so refs are unique per socket.



164
165
166
167
# File 'lib/supabase/realtime/client.rb', line 164

def next_ref
  @ref += 1
  @ref.to_s
end

#push(message) ⇒ Object

Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/supabase/realtime/client.rb', line 172

def push(message)
  frame = JSON.generate(
    "event"    => message.event,
    "topic"    => message.topic,
    "payload"  => message.payload,
    "ref"      => message.ref,
    "join_ref" => message.join_ref
  )

  if connected?
    @socket.send(frame)
  else
    @send_buffer_mutex.synchronize { @send_buffer << frame }
  end
end

#remove_all_channelsObject



119
120
121
122
# File 'lib/supabase/realtime/client.rb', line 119

def remove_all_channels
  @channels.values.each { |ch| ch.unsubscribe }
  @channels.clear
end

#remove_channel(channel) ⇒ Object



114
115
116
117
# File 'lib/supabase/realtime/client.rb', line 114

def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel.topic)
end

#send_heartbeatObject

Manually emit a heartbeat. Real adapters typically wire this onto a timer.



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/supabase/realtime/client.rb', line 151

def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event"    => Types::ChannelEvents::HEARTBEAT,
    "topic"    => Types::PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => next_ref,
    "join_ref" => nil
  ))
end

#set_auth(token) ⇒ Object

Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/supabase/realtime/client.rb', line 126

def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)
  return unless @socket && @socket.connected?

  @channels.each_value do |channel|
    next unless channel.joined?

    msg = Message.new(
      event:   Types::ChannelEvents::ACCESS_TOKEN,
      topic:   channel.topic,
      payload: { "access_token" => token },
      ref:     next_ref
    )
    @socket.send(JSON.generate(
      "event"    => msg.event,
      "topic"    => msg.topic,
      "payload"  => msg.payload,
      "ref"      => msg.ref,
      "join_ref" => nil
    ))
  end
end

#use_socket(socket) ⇒ Object

Plug in a transport after construction (e.g. a websocket-client-simple wrapper).



68
69
70
71
72
# File 'lib/supabase/realtime/client.rb', line 68

def use_socket(socket)
  @socket = socket
  attach_socket
  self
end