Class: Supabase::Realtime::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/client.rb

Overview

Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.

Bring your own Socket (e.g. websocket-client-simple adapter or async-websocket adapter). For unit tests, pass a TestSocket.

socket   = Supabase::Realtime::TestSocket.new
client   = Supabase::Realtime::Client.new(
  url: "wss://project.supabase.co/realtime/v1",
  params: { apikey: key },
  socket: socket
)
client.connect

channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS, heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS, auto_reconnect: true, max_retries: 5, initial_backoff: 1.0) ⇒ Client

Returns a new instance of Client.

Parameters:

  • url (String)

    WebSocket endpoint (ws:// or wss://). Plain http(s) are upgraded.

  • params (Hash) (defaults to: {})

    query-string params merged onto the URL (e.g. apikey/access_token)

  • socket (Socket, nil) (defaults to: nil)

    inject your own transport (defaults to nil — caller wires it up)

  • timeout (Numeric) (defaults to: Types::DEFAULT_TIMEOUT_SECONDS)

    default per-push timeout (seconds)

  • heartbeat_interval (Numeric) (defaults to: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS)

    seconds between automatic heartbeat pushes (0 disables)

  • auto_reconnect (Boolean) (defaults to: true)

    reconnect on unexpected socket close

  • max_retries (Integer) (defaults to: 5)

    maximum reconnect attempts before giving up

  • initial_backoff (Numeric) (defaults to: 1.0)

    seconds of delay before the first reconnect attempt; doubles each attempt up to a 60s cap (matches supabase-py)



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/supabase/realtime/client.rb', line 43

def initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS,
               heartbeat_interval: Types::DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
               auto_reconnect: true, max_retries: 5, initial_backoff: 1.0)
  @url     = normalize_url(url, params)
  @params  = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = {}
  @socket   = socket
  @timeout  = timeout
  @ref      = 0

  @heartbeat_interval = heartbeat_interval
  @auto_reconnect     = auto_reconnect
  @max_retries        = max_retries
  @initial_backoff    = initial_backoff
  @heartbeat_thread   = nil
  @reconnect_thread   = nil
  @intentionally_closed = false
  @send_buffer        = [] # frames queued while no socket / not connected
  @send_buffer_mutex  = Mutex.new

  attach_socket if @socket
end

Instance Attribute Details

#access_tokenObject (readonly)

Returns the value of attribute access_token.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def access_token
  @access_token
end

#auto_reconnectObject (readonly)

Returns the value of attribute auto_reconnect.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def auto_reconnect
  @auto_reconnect
end

#channelsObject (readonly)

Returns the value of attribute channels.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def channels
  @channels
end

#heartbeat_intervalObject (readonly)

Returns the value of attribute heartbeat_interval.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def heartbeat_interval
  @heartbeat_interval
end

#initial_backoffObject (readonly)

Returns the value of attribute initial_backoff.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def initial_backoff
  @initial_backoff
end

#max_retriesObject (readonly)

Returns the value of attribute max_retries.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def max_retries
  @max_retries
end

#paramsObject (readonly)

Returns the value of attribute params.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def params
  @params
end

#socketObject (readonly)

Returns the value of attribute socket.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def socket
  @socket
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def timeout
  @timeout
end

#urlObject (readonly)

Returns the value of attribute url.



31
32
33
# File 'lib/supabase/realtime/client.rb', line 31

def url
  @url
end

Instance Method Details

#channel(topic, params: nil) ⇒ Object

Get or create a Channel for the given topic. Subsequent calls with the same topic return the same Channel instance, matching phoenix.js semantics.



97
98
99
# File 'lib/supabase/realtime/client.rb', line 97

def channel(topic, params: nil)
  @channels[topic] ||= Channel.new(topic, params: params, socket: self)
end

#connectObject



74
75
76
77
78
79
80
# File 'lib/supabase/realtime/client.rb', line 74

def connect
  raise Errors::RealtimeError, "no socket attached — call #use_socket(socket) first" unless @socket

  @intentionally_closed = false
  @socket.connect
  self
end

#connected?Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/supabase/realtime/client.rb', line 91

def connected?
  @socket && @socket.connected?
end

#disconnectObject



82
83
84
85
86
87
88
89
# File 'lib/supabase/realtime/client.rb', line 82

def disconnect
  @intentionally_closed = true
  stop_reconnect
  stop_heartbeat
  @socket&.close
  @channels.each_value { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end

#get_channelsObject



101
102
103
# File 'lib/supabase/realtime/client.rb', line 101

def get_channels
  @channels.values
end

#next_refObject

Used by Channel — increments a shared counter so refs are unique per socket.



155
156
157
158
# File 'lib/supabase/realtime/client.rb', line 155

def next_ref
  @ref += 1
  @ref.to_s
end

#push(message) ⇒ Object

Used by Channel#send_push. If the socket isn’t connected yet, the frame is buffered and flushed automatically when the socket opens — matches supabase-py’s send_buffer so offline pushes aren’t silently dropped.



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/supabase/realtime/client.rb', line 163

def push(message)
  frame = JSON.generate(
    "event"    => message.event,
    "topic"    => message.topic,
    "payload"  => message.payload,
    "ref"      => message.ref,
    "join_ref" => message.join_ref
  )

  if connected?
    @socket.send(frame)
  else
    @send_buffer_mutex.synchronize { @send_buffer << frame }
  end
end

#remove_all_channelsObject



110
111
112
113
# File 'lib/supabase/realtime/client.rb', line 110

def remove_all_channels
  @channels.values.each { |ch| ch.unsubscribe }
  @channels.clear
end

#remove_channel(channel) ⇒ Object



105
106
107
108
# File 'lib/supabase/realtime/client.rb', line 105

def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel.topic)
end

#send_heartbeatObject

Manually emit a heartbeat. Real adapters typically wire this onto a timer.



142
143
144
145
146
147
148
149
150
151
152
# File 'lib/supabase/realtime/client.rb', line 142

def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event"    => Types::ChannelEvents::HEARTBEAT,
    "topic"    => Types::PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => next_ref,
    "join_ref" => nil
  ))
end

#set_auth(token) ⇒ Object

Update the access token, send it to every joined channel so RLS reflects the new auth context, and remember it for future joins.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/supabase/realtime/client.rb', line 117

def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)
  return unless @socket && @socket.connected?

  @channels.each_value do |channel|
    next unless channel.joined?

    msg = Message.new(
      event:   Types::ChannelEvents::ACCESS_TOKEN,
      topic:   channel.topic,
      payload: { "access_token" => token },
      ref:     next_ref
    )
    @socket.send(JSON.generate(
      "event"    => msg.event,
      "topic"    => msg.topic,
      "payload"  => msg.payload,
      "ref"      => msg.ref,
      "join_ref" => nil
    ))
  end
end

#use_socket(socket) ⇒ Object

Plug in a transport after construction (e.g. a websocket-client-simple wrapper).



68
69
70
71
72
# File 'lib/supabase/realtime/client.rb', line 68

def use_socket(socket)
  @socket = socket
  attach_socket
  self
end