Class: Supabase::Realtime::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/channel.rb

Overview

A topic subscription on a shared Socket connection. Each Channel:

  • tracks its own lifecycle state (closed/joining/joined/leaving/errored)

  • holds the listener callbacks for postgres changes / broadcast / presence / system

  • dispatches inbound messages from the Client to those callbacks

  • owns its Presence sync state

Should be constructed via Supabase::Realtime::Client#channel, not directly.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, params: nil, socket: nil) ⇒ Channel

Returns a new instance of Channel.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/supabase/realtime/channel.rb', line 23

def initialize(topic, params: nil, socket: nil)
  @topic   = topic
  @params  = params || default_params
  @socket  = socket
  @state   = Types::ChannelStates::CLOSED
  @joined_once = false
  @presence = Presence.new(logger: logger)

  @broadcast_callbacks        = []   # [{ event:, callback: }]
  @postgres_changes_callbacks = []   # [{ event:, schema:, table:, filter:, callback: }]
  @system_callbacks           = []
  @close_callbacks            = []
  @error_callbacks            = []

  @pending_pushes = {} # ref => Push, for matching phx_reply
  @push_buffer    = [] # outbound pushes queued while not yet joined

  @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params)
  @subscribe_callback = nil

  # py rejoin uses `lambda tries: 2**tries` with no cap
  # (`realtime/_async/channel.py:109-111`). Timer (US-006) bumps `tries`
  # before invoking this lambda with `tries + 1`, so the curve for the
  # first five attempts is 4, 8, 16, 32, 64 s — identical to py.
  @rejoin_timer = Timer.new(
    callback: -> { rejoin if @joined_once && !leaving? && !closed? },
    backoff:  ->(tries) { 2.0**tries }
  )

  @join_push
    .receive(Types::AckStatus::OK)      { |p| on_join_ok(p) }
    .receive(Types::AckStatus::ERROR)   { |p| on_join_error(p) }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout }
end

Instance Attribute Details

#join_pushObject (readonly)

Returns the value of attribute join_push.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def join_push
  @join_push
end

#paramsObject (readonly)

Returns the value of attribute params.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def params
  @params
end

#pending_pushesObject (readonly)

Returns the value of attribute pending_pushes.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def pending_pushes
  @pending_pushes
end

#presenceObject (readonly)

Returns the value of attribute presence.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def presence
  @presence
end

#rejoin_timerObject (readonly)

Returns the value of attribute rejoin_timer.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def rejoin_timer
  @rejoin_timer
end

#stateObject (readonly)

Returns the value of attribute state.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def state
  @state
end

#topicObject (readonly)

Returns the value of attribute topic.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def topic
  @topic
end

Instance Method Details

#closed?Boolean

—– State predicates —–

Returns:

  • (Boolean)


70
# File 'lib/supabase/realtime/channel.rb', line 70

def closed?;  @state == Types::ChannelStates::CLOSED;  end

#dispatch(message) ⇒ Object

Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/supabase/realtime/channel.rb', line 245

def dispatch(message)
  return false unless message.topic == @topic

  case message.event
  when Types::ChannelEvents::REPLY
    dispatch_reply(message)
  when Types::ChannelEvents::POSTGRES_CHANGES
    dispatch_postgres_changes(message)
  when Types::ChannelEvents::BROADCAST
    dispatch_broadcast(message)
  when Types::ChannelEvents::PRESENCE_STATE
    @presence.sync_state(message.payload)
  when Types::ChannelEvents::PRESENCE_DIFF
    @presence.sync_diff(message.payload)
  when Types::ChannelEvents::SYSTEM
    @system_callbacks.each do |cb|
      CallbackSafety.safe(logger, "system") { cb.call(message.payload) }
    end
  when Types::ChannelEvents::CLOSE
    @state = Types::ChannelStates::CLOSED
    @close_callbacks.each do |cb|
      CallbackSafety.safe(logger, "phx_close") { cb.call(message.payload) }
    end
  when Types::ChannelEvents::ERROR
    @state = Types::ChannelStates::ERRORED
    @error_callbacks.each do |cb|
      CallbackSafety.safe(logger, "phx_error") { cb.call(message.payload) }
    end
  end

  true
end

#errored?Boolean

Returns:

  • (Boolean)


71
# File 'lib/supabase/realtime/channel.rb', line 71

def errored?; @state == Types::ChannelStates::ERRORED; end

#joined?Boolean

Returns:

  • (Boolean)


72
# File 'lib/supabase/realtime/channel.rb', line 72

def joined?;  @state == Types::ChannelStates::JOINED;  end

#joining?Boolean

Returns:

  • (Boolean)


73
# File 'lib/supabase/realtime/channel.rb', line 73

def joining?; @state == Types::ChannelStates::JOINING; end

#leaving?Boolean

Returns:

  • (Boolean)


74
# File 'lib/supabase/realtime/channel.rb', line 74

def leaving?; @state == Types::ChannelStates::LEAVING; end

#loggerObject

Logger used by Supabase::Realtime::CallbackSafety when a user callback raises. Resolved lazily from the realtime client (‘@socket` in this class is the Supabase::Realtime::Client, which exposes its injected logger via `Client#logger`). When the underlying transport doesn’t carry a logger (e.g. tests that pass a bare TestSocket as ‘socket:`), `safe` falls through to `Kernel#warn`.



64
65
66
# File 'lib/supabase/realtime/channel.rb', line 64

def logger
  @socket.respond_to?(:logger) ? @socket.logger : nil
end

#on_broadcast(event, &block) ⇒ Object



147
148
149
150
# File 'lib/supabase/realtime/channel.rb', line 147

def on_broadcast(event, &block)
  @broadcast_callbacks << { event: event, callback: block }
  self
end

#on_close(&block) ⇒ Object



157
158
159
160
# File 'lib/supabase/realtime/channel.rb', line 157

def on_close(&block)
  @close_callbacks << block
  self
end

#on_error(&block) ⇒ Object



162
163
164
165
# File 'lib/supabase/realtime/channel.rb', line 162

def on_error(&block)
  @error_callbacks << block
  self
end

#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object

Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.



136
137
138
139
140
141
142
143
144
145
# File 'lib/supabase/realtime/channel.rb', line 136

def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block)
  unless %w[INSERT UPDATE DELETE *].include?(event)
    raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*"
  end

  @postgres_changes_callbacks << {
    event: event, schema: schema, table: table, filter: filter, callback: block
  }
  self
end

#on_presence_join(&block) ⇒ Object



178
179
180
181
182
# File 'lib/supabase/realtime/channel.rb', line 178

def on_presence_join(&block)
  @presence.on_join(&block)
  resubscribe_for_presence!
  self
end

#on_presence_leave(&block) ⇒ Object



184
185
186
187
188
# File 'lib/supabase/realtime/channel.rb', line 184

def on_presence_leave(&block)
  @presence.on_leave(&block)
  resubscribe_for_presence!
  self
end

#on_presence_sync(&block) ⇒ Object

Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API. If called after the channel is already joined, the channel resubscribes so the server starts forwarding presence events (presence has to be enabled in the join config — see #default_params).



172
173
174
175
176
# File 'lib/supabase/realtime/channel.rb', line 172

def on_presence_sync(&block)
  @presence.on_sync(&block)
  resubscribe_for_presence!
  self
end

#on_system(&block) ⇒ Object



152
153
154
155
# File 'lib/supabase/realtime/channel.rb', line 152

def on_system(&block)
  @system_callbacks << block
  self
end

#presence_stateObject

Shortcut for ‘channel.presence.state` so callers don’t have to drill in.



191
192
193
# File 'lib/supabase/realtime/channel.rb', line 191

def presence_state
  @presence.state
end

#push_event(event, payload = {}, timeout: nil) ⇒ Object

Public low-level push for arbitrary Phoenix events. Mirrors ‘supabase-py`’s ‘channel.push(event, payload, timeout)`. Returns the Push instance so callers can attach receive() handlers and observe the reply / timeout. Raises if called before #subscribe.



228
229
230
231
232
233
234
235
236
237
238
# File 'lib/supabase/realtime/channel.rb', line 228

def push_event(event, payload = {}, timeout: nil)
  unless @joined_once
    raise Errors::RealtimeError,
          "tried to push '#{event}' to '#{@topic}' before joining. Call subscribe() first."
  end

  ref = @socket&.next_ref
  push = Push.new(self, event, payload, ref: ref, timeout: timeout || Types::DEFAULT_TIMEOUT_SECONDS)
  send_push(push, register_pending: true)
  push
end

#rejoinObject

Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions.



100
101
102
103
104
105
106
107
108
109
# File 'lib/supabase/realtime/channel.rb', line 100

def rejoin
  return unless @joined_once

  @state = Types::ChannelStates::JOINING
  inject_postgres_changes_bindings
  @join_push.instance_variable_set(:@ref, @socket&.next_ref)
  @join_push.instance_variable_set(:@received_status, nil)
  send_push(@join_push, register_pending: true)
  self
end

#send_broadcast(event, payload = {}) ⇒ Object

Send a custom broadcast message. The server will forward it to other subscribers of the same topic.



199
200
201
202
203
204
205
# File 'lib/supabase/realtime/channel.rb', line 199

def send_broadcast(event, payload = {})
  push = Push.new(self,
                  Types::ChannelEvents::BROADCAST,
                  { "type" => "broadcast", "event" => event, "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#subscribe(&block) ⇒ Object

Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/supabase/realtime/channel.rb', line 80

def subscribe(&block)
  raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once

  @joined_once = true
  @subscribe_callback = block
  @state = Types::ChannelStates::JOINING

  inject_postgres_changes_bindings
  @join_push.instance_variable_set(:@ref, @socket&.next_ref)
  # Make subscribe a one-call entry point: if the caller hasn't already
  # connected the underlying transport, open it now so the join frame
  # actually reaches the wire instead of being held forever in the
  # Client#send_buffer. Matches supabase-py's `channel.subscribe()` ergonomics.
  @socket.connect if @socket && !@socket.connected?
  send_push(@join_push, register_pending: true)
  self
end

#track(payload) ⇒ Object

Track the local user in the channel’s presence state.



208
209
210
211
212
213
214
# File 'lib/supabase/realtime/channel.rb', line 208

def track(payload)
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "track", "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#unsubscribeObject

Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/supabase/realtime/channel.rb', line 115

def unsubscribe
  return self if closed?

  @state = Types::ChannelStates::LEAVING
  ref = @socket&.next_ref
  leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref)

  leave_push
    .receive(Types::AckStatus::OK)      { |_| on_leave_ack }
    .receive(Types::AckStatus::ERROR)   { |_| on_leave_ack }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack }

  send_push(leave_push, register_pending: true)
  self
end

#untrackObject



216
217
218
219
220
221
222
# File 'lib/supabase/realtime/channel.rb', line 216

def untrack
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "untrack" })
  send_push(push, register_pending: false)
  self
end