Class: Supabase::Realtime::Client

Inherits:
Object
Defined in:
lib/supabase/realtime/client.rb

Overview

Top-level Realtime client. Owns one Socket, multiplexes Channels onto it, and dispatches inbound frames to whichever channel owns the topic.

Bring your own Socket (e.g. a websocket-client-simple or async-websocket adapter). For unit tests, pass a TestSocket.

# key is assumed to hold the value sent as the apikey query param
socket = Supabase::Realtime::TestSocket.new
client = Supabase::Realtime::Client.new(
  url: "wss://project.supabase.co/realtime/v1",
  params: { apikey: key },
  socket: socket
)
client.connect

channel = client.channel("realtime:public:users")
channel.on_postgres_changes("*", schema: "public", table: "users") { |p| puts p }
channel.subscribe
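
The source listings on this page show the Client calling four methods on its transport: connect, connected?, close, and send. A minimal in-memory stand-in covering that surface might look like the sketch below. The class name RecordingSocket and its sent reader are hypothetical, and the callbacks that the private attach_socket registers on the transport are not shown here, so a real adapter probably needs more than this.

```ruby
require "json"

# Hypothetical in-memory transport, not part of the gem.
# It records every frame instead of writing to a network connection.
class RecordingSocket
  attr_reader :sent

  def initialize
    @connected = false
    @sent = []
  end

  def connect
    @connected = true
    self
  end

  def connected?
    @connected
  end

  def close
    @connected = false
  end

  # Deliberately shadows Object#send, mirroring the @socket.send(...) calls
  # in the Client source.
  def send(frame)
    @sent << frame
  end
end

socket = RecordingSocket.new
socket.connect
socket.send(JSON.generate({ "event" => "example", "topic" => "demo", "payload" => {} }))
puts socket.sent.length
```

Recording frames in an array makes it easy to assert on what a client would have written to the wire.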

Constructor Details

#initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS) ⇒ Client

Returns a new instance of Client.

Parameters:

  • url (String)

    WebSocket endpoint (ws:// or wss://). Plain http:// and https:// URLs are converted to their WebSocket equivalents.

  • params (Hash) (defaults to: {})

    query-string params merged onto the URL (e.g. apikey/access_token)

  • socket (Socket, nil) (defaults to: nil)

    inject your own transport; when left as nil, attach one later with #use_socket before calling #connect

  • timeout (Numeric) (defaults to: Types::DEFAULT_TIMEOUT_SECONDS)

    default per-push timeout (seconds)



# File 'lib/supabase/realtime/client.rb', line 37

def initialize(url:, params: {}, socket: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS)
  @url     = normalize_url(url, params)
  @params  = params
  @access_token = params[:access_token] || params["access_token"]
  @channels = {}
  @socket   = socket
  @timeout  = timeout
  @ref      = 0

  attach_socket if @socket
end
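
The constructor delegates URL handling to the private normalize_url helper, whose body is not shown on this page. The following is only a guess at the behavior described in the parameter docs (scheme conversion plus query-string merging). The function name normalize_realtime_url and every detail of its logic are assumptions, not the gem's implementation.

```ruby
require "uri"

# Hypothetical stand-in for the private normalize_url helper.
def normalize_realtime_url(url, params = {})
  uri = URI.parse(url)

  # Map http -> ws and https -> wss; leave ws:// and wss:// untouched.
  scheme = { "http" => "ws", "https" => "wss" }.fetch(uri.scheme, uri.scheme)

  # Keep any query already on the URL, then append the extra params.
  query = URI.decode_www_form(uri.query.to_s) + params.map { |k, v| [k.to_s, v.to_s] }

  rebuilt = "#{scheme}://#{uri.host}"
  # Only spell out the port when it is not the default for the scheme.
  rebuilt += ":#{uri.port}" if uri.port && uri.port != uri.default_port
  rebuilt += uri.path
  rebuilt += "?#{URI.encode_www_form(query)}" unless query.empty?
  rebuilt
end

puts normalize_realtime_url("https://project.supabase.co/realtime/v1", { apikey: "k" })
```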

Instance Attribute Details

#access_token ⇒ Object (readonly)

Returns the value of attribute access_token.



# File 'lib/supabase/realtime/client.rb', line 31

def access_token
  @access_token
end

#channels ⇒ Object (readonly)

Returns the value of attribute channels.



# File 'lib/supabase/realtime/client.rb', line 31

def channels
  @channels
end

#params ⇒ Object (readonly)

Returns the value of attribute params.



# File 'lib/supabase/realtime/client.rb', line 31

def params
  @params
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



# File 'lib/supabase/realtime/client.rb', line 31

def socket
  @socket
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.



# File 'lib/supabase/realtime/client.rb', line 31

def timeout
  @timeout
end

#url ⇒ Object (readonly)

Returns the value of attribute url.



# File 'lib/supabase/realtime/client.rb', line 31

def url
  @url
end

Instance Method Details

#channel(topic, params: nil) ⇒ Object

Get or create a Channel for the given topic. Subsequent calls with the same topic return the same Channel instance, matching phoenix.js semantics.



# File 'lib/supabase/realtime/client.rb', line 75

def channel(topic, params: nil)
  @channels[topic] ||= Channel.new(topic, params: params, socket: self)
end
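
The get-or-create behavior comes from Hash memoization with ||=. The sketch below reproduces that pattern with stand-in types (DemoChannel and ChannelRegistry are hypothetical names, not classes from the gem). One consequence worth noting: params passed on a later call for an existing topic are ignored, because the right-hand side of ||= is never evaluated.

```ruby
# Hypothetical stand-ins used only to illustrate the ||= memoization pattern.
DemoChannel = Struct.new(:topic, :params)

class ChannelRegistry
  def initialize
    @channels = {}
  end

  # Returns the existing channel for the topic, or builds and stores a new one.
  def channel(topic, params: nil)
    @channels[topic] ||= DemoChannel.new(topic, params)
  end
end

registry = ChannelRegistry.new
first  = registry.channel("realtime:public:users", params: { "a" => 1 })
second = registry.channel("realtime:public:users", params: { "a" => 2 })

puts first.equal?(second) # true: both names refer to the same object
puts second.params["a"]   # 1: the params from the second call were ignored
```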

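#connect ⇒ Object

Opens the attached socket and returns self. Raises Errors::RealtimeError if no socket has been attached.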



# File 'lib/supabase/realtime/client.rb', line 56

def connect
  raise Errors::RealtimeError, "no socket attached — call #use_socket(socket) first" unless @socket

  @socket.connect
  self
end

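#connected? ⇒ Boolean

Truthy only when a socket is attached and reports itself connected. With no socket attached, the method returns nil rather than false.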

Returns:

  • (Boolean)


# File 'lib/supabase/realtime/client.rb', line 69

def connected?
  @socket && @socket.connected?
end

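#disconnect ⇒ Object

Closes the socket if one is attached, marks every registered channel as CLOSED, and returns self. Channels stay in the registry and are not unsubscribed.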



# File 'lib/supabase/realtime/client.rb', line 63

def disconnect
  @socket&.close
  @channels.each_value { |ch| ch.instance_variable_set(:@state, Types::ChannelStates::CLOSED) }
  self
end

#get_channels ⇒ Object

Returns an Array of every Channel currently registered on this client.



# File 'lib/supabase/realtime/client.rb', line 79

def get_channels
  @channels.values
end

#next_ref ⇒ Object

Used by Channel. Increments a counter shared by all channels on this client and returns it as a String, so refs are unique per socket.



# File 'lib/supabase/realtime/client.rb', line 133

def next_ref
  @ref += 1
  @ref.to_s
end

#push(message) ⇒ Object

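Used by Channel#send_push. Serializes the message's event, topic, payload, ref, and join_ref to JSON and writes the result to the socket. Returns nil without sending anything if no socket is attached.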



# File 'lib/supabase/realtime/client.rb', line 139

def push(message)
  return unless @socket

  @socket.send(JSON.generate(
    "event"    => message.event,
    "topic"    => message.topic,
    "payload"  => message.payload,
    "ref"      => message.ref,
    "join_ref" => message.join_ref
  ))
end
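
To make the wire format concrete, the sketch below builds the same five-key JSON object outside the client. DemoMessage is a hypothetical Struct standing in for the gem's Message class, encode_frame is a hypothetical helper name, and the event and topic values are illustrative only.

```ruby
require "json"

# Hypothetical stand-in for the gem's Message class.
DemoMessage = Struct.new(:event, :topic, :payload, :ref, :join_ref, keyword_init: true)

# Builds the same five-key frame that #push writes to the socket.
def encode_frame(message)
  JSON.generate({
    "event"    => message.event,
    "topic"    => message.topic,
    "payload"  => message.payload,
    "ref"      => message.ref,
    "join_ref" => message.join_ref
  })
end

msg = DemoMessage.new(
  event:    "phx_join",              # illustrative event name
  topic:    "realtime:public:users",
  payload:  {},
  ref:      "1",
  join_ref: "1"
)
frame = encode_frame(msg)
puts frame
```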

#remove_all_channels ⇒ Object

Unsubscribes every registered channel, then empties the registry.



# File 'lib/supabase/realtime/client.rb', line 88

def remove_all_channels
  @channels.values.each { |ch| ch.unsubscribe }
  @channels.clear
end

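#remove_channel(channel) ⇒ Object

Unsubscribes the given channel and deletes it from the registry by topic. Returns the removed Channel, or nil if its topic was not registered.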



# File 'lib/supabase/realtime/client.rb', line 83

def remove_channel(channel)
  channel.unsubscribe
  @channels.delete(channel.topic)
end

#send_heartbeat ⇒ Object

Manually emit a heartbeat. Does nothing unless the client is connected. Real adapters typically wire this onto a timer.



# File 'lib/supabase/realtime/client.rb', line 120

def send_heartbeat
  return unless connected?

  @socket.send(JSON.generate(
    "event"    => Types::ChannelEvents::HEARTBEAT,
    "topic"    => Types::PHOENIX_TOPIC,
    "payload"  => {},
    "ref"      => next_ref,
    "join_ref" => nil
  ))
end
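
One way to wire a heartbeat onto a timer is a background thread that wakes at a fixed interval. The sketch below is an assumption about how a caller might do this, not something the gem provides: HeartbeatTimer is a hypothetical class, and the interval used here is chosen only to keep the demo fast. In real use the block would call client.send_heartbeat.

```ruby
# Hypothetical helper: runs the given block every `interval` seconds on a
# background thread until #stop is called.
class HeartbeatTimer
  def initialize(interval, &beat)
    @interval = interval
    @beat     = beat
    @mutex    = Mutex.new
    @cond     = ConditionVariable.new
    @stopped  = false
    @thread   = nil
  end

  def start
    @thread = Thread.new do
      @mutex.synchronize do
        until @stopped
          # Wakes after the interval, or earlier when #stop signals.
          @cond.wait(@mutex, @interval)
          @beat.call unless @stopped
        end
      end
    end
    self
  end

  def stop
    @mutex.synchronize do
      @stopped = true
      @cond.signal
    end
    @thread&.join
    self
  end
end

beats = Queue.new
# A real caller would pass { client.send_heartbeat } instead.
timer = HeartbeatTimer.new(0.01) { beats << :beat }
timer.start
first = beats.pop # blocks until the first beat arrives
timer.stop
```

Waiting on a condition variable rather than calling sleep lets stop return promptly instead of waiting out a full interval.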

#set_auth(token) ⇒ Object

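Updates the stored access token and records it in params under the string key "access_token" so that future joins use it. If the socket is connected, the token is also sent to every joined channel so that RLS reflects the new auth context. Because params is the same Hash that was passed to the constructor, the caller's hash is modified.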



# File 'lib/supabase/realtime/client.rb', line 95

def set_auth(token)
  @access_token = token
  @params["access_token"] = token if @params.is_a?(Hash)
  return unless @socket && @socket.connected?

  @channels.each_value do |channel|
    next unless channel.joined?

    msg = Message.new(
      event:   Types::ChannelEvents::ACCESS_TOKEN,
      topic:   channel.topic,
      payload: { "access_token" => token },
      ref:     next_ref
    )
    @socket.send(JSON.generate(
      "event"    => msg.event,
      "topic"    => msg.topic,
      "payload"  => msg.payload,
      "ref"      => msg.ref,
      "join_ref" => nil
    ))
  end
end
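
The fan-out step skips channels that have not joined. The sketch below isolates that filtering with stand-in types: DemoChan and access_token_frames are hypothetical names, and the "access_token" event string is an assumed value for Types::ChannelEvents::ACCESS_TOKEN, which this page does not show.

```ruby
require "json"

# Hypothetical stand-in for a Channel that knows whether it has joined.
DemoChan = Struct.new(:topic, :joined) do
  def joined?
    joined
  end
end

# Builds one token frame per joined channel; unjoined channels are skipped.
# The default event name is an assumption, not a value taken from the gem.
def access_token_frames(channels, token, event: "access_token")
  ref = 0
  channels.select(&:joined?).map do |channel|
    ref += 1
    JSON.generate({
      "event"    => event,
      "topic"    => channel.topic,
      "payload"  => { "access_token" => token },
      "ref"      => ref.to_s,
      "join_ref" => nil
    })
  end
end

channels = [
  DemoChan.new("realtime:public:users", true),
  DemoChan.new("realtime:public:posts", false)
]
frames = access_token_frames(channels, "new-token")
puts frames.length
```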

#use_socket(socket) ⇒ Object

Plug in a transport after construction (e.g. a websocket-client-simple wrapper). Returns self, so the call can be chained with #connect.



# File 'lib/supabase/realtime/client.rb', line 50

def use_socket(socket)
  @socket = socket
  attach_socket
  self
end
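
The deferred-wiring flow (construct without a socket, attach one later, then connect) can be illustrated with a stripped-down model of the two methods involved. MiniClient, NoSocketError, and StubSocket are hypothetical names that mirror only the control flow in the listings for #connect and #use_socket. The real client also calls the private attach_socket, which is omitted here.

```ruby
# Hypothetical model of the connect/use_socket control flow; not the gem's class.
class MiniClient
  class NoSocketError < StandardError; end

  def initialize(socket: nil)
    @socket = socket
  end

  # Attach a transport after construction and return self for chaining.
  def use_socket(socket)
    @socket = socket
    self
  end

  def connect
    raise NoSocketError, "no socket attached" unless @socket

    @socket.connect
    self
  end

  def connected?
    !!(@socket && @socket.connected?)
  end
end

# Hypothetical transport that only tracks its connection state.
class StubSocket
  def connect
    @connected = true
  end

  def connected?
    @connected == true
  end
end

client = MiniClient.new
begin
  client.connect
rescue MiniClient::NoSocketError => e
  puts "connect failed: #{e.message}"
end

client.use_socket(StubSocket.new).connect
puts client.connected?
```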