Class: Supabase::Realtime::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/channel.rb

Overview

A topic subscription on a shared Socket connection. Each Channel:

  • tracks its own lifecycle state (closed/joining/joined/leaving/errored)

  • holds the listener callbacks for postgres changes / broadcast / presence / system

  • dispatches inbound messages from the Client to those callbacks

  • owns its Presence sync state

Should be constructed via Supabase::Realtime::Client#channel, not directly.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, params: nil, socket: nil) ⇒ Channel

Returns a new instance of Channel.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/supabase/realtime/channel.rb', line 23

def initialize(topic, params: nil, socket: nil)
  @topic   = topic
  @params  = params || default_params
  @socket  = socket
  @state   = Types::ChannelStates::CLOSED
  @joined_once = false
  @presence = Presence.new(logger: logger)

  @broadcast_callbacks        = []   # [{ event:, callback: }]
  @postgres_changes_callbacks = []   # [{ event:, schema:, table:, filter:, callback: }]
  @system_callbacks           = []
  @close_callbacks            = []
  @error_callbacks            = []

  @pending_pushes = {} # ref => Push, for matching phx_reply
  @push_buffer    = [] # outbound pushes queued while not yet joined

  @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params)
  @subscribe_callback = nil

  # py rejoin uses `lambda tries: 2**tries` with no cap
  # (`realtime/_async/channel.py:109-111`). Timer (US-006) bumps `tries`
  # before invoking this lambda with `tries + 1`, so the curve for the
  # first five attempts is 4, 8, 16, 32, 64 s — identical to py.
  @rejoin_timer = Timer.new(
    callback: -> { rejoin if @joined_once && !leaving? && !closed? },
    backoff:  ->(tries) { 2.0**tries }
  )

  @join_push
    .receive(Types::AckStatus::OK)      { |p| on_join_ok(p) }
    .receive(Types::AckStatus::ERROR)   { |p| on_join_error(p) }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout }
end

Instance Attribute Details

#join_pushObject (readonly)

Returns the value of attribute join_push.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def join_push
  @join_push
end

#paramsObject (readonly)

Returns the value of attribute params.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def params
  @params
end

#pending_pushesObject (readonly)

Returns the value of attribute pending_pushes.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def pending_pushes
  @pending_pushes
end

#presenceObject (readonly)

Returns the value of attribute presence.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def presence
  @presence
end

#rejoin_timerObject (readonly)

Returns the value of attribute rejoin_timer.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def rejoin_timer
  @rejoin_timer
end

#stateObject (readonly)

Returns the value of attribute state.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def state
  @state
end

#topicObject (readonly)

Returns the value of attribute topic.



21
22
23
# File 'lib/supabase/realtime/channel.rb', line 21

def topic
  @topic
end

Instance Method Details

#closed?Boolean

—– State predicates —–

Returns:

  • (Boolean)


70
# File 'lib/supabase/realtime/channel.rb', line 70

def closed?;  @state == Types::ChannelStates::CLOSED;  end

#dispatch(message) ⇒ Object

Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/supabase/realtime/channel.rb', line 270

def dispatch(message)
  return false unless message.topic == @topic

  case message.event
  when Types::ChannelEvents::REPLY
    dispatch_reply(message)
  when Types::ChannelEvents::POSTGRES_CHANGES
    dispatch_postgres_changes(message)
  when Types::ChannelEvents::BROADCAST
    dispatch_broadcast(message)
  when Types::ChannelEvents::PRESENCE_STATE
    @presence.sync_state(message.payload)
  when Types::ChannelEvents::PRESENCE_DIFF
    @presence.sync_diff(message.payload)
  when Types::ChannelEvents::SYSTEM
    # supabase-py routes system frames by status (channel.py:520-525): a
    # `status: "ok"` payload reaches the on_system callbacks; anything else
    # (e.g. a postgres_changes subscription failure reported via `system`)
    # is treated as a channel error → ERRORED + rejoin scheduled.
    if message.payload.is_a?(Hash) && message.payload["status"] == "error"
      trigger_channel_error(message.payload)
    else
      @system_callbacks.each do |cb|
        CallbackSafety.safe(logger, "system") { cb.call(message.payload) }
      end
    end
  when Types::ChannelEvents::CLOSE
    handle_channel_close(message.payload)
  when Types::ChannelEvents::ERROR
    trigger_channel_error(message.payload)
  end

  true
end

#errored?Boolean

Returns:

  • (Boolean)


71
# File 'lib/supabase/realtime/channel.rb', line 71

def errored?; @state == Types::ChannelStates::ERRORED; end

#joined?Boolean

Returns:

  • (Boolean)


72
# File 'lib/supabase/realtime/channel.rb', line 72

def joined?;  @state == Types::ChannelStates::JOINED;  end

#joining?Boolean

Returns:

  • (Boolean)


73
# File 'lib/supabase/realtime/channel.rb', line 73

def joining?; @state == Types::ChannelStates::JOINING; end

#leaving?Boolean

Returns:

  • (Boolean)


74
# File 'lib/supabase/realtime/channel.rb', line 74

def leaving?; @state == Types::ChannelStates::LEAVING; end

#loggerObject

Logger used by Supabase::Realtime::CallbackSafety when a user callback raises. Resolved lazily from the realtime client (‘@socket` in this class is the Supabase::Realtime::Client, which exposes its injected logger via `Client#logger`). When the underlying transport doesn’t carry a logger (e.g. tests that pass a bare TestSocket as ‘socket:`), `safe` falls through to `Kernel#warn`.



64
65
66
# File 'lib/supabase/realtime/channel.rb', line 64

def logger
  @socket.respond_to?(:logger) ? @socket.logger : nil
end

#on_broadcast(event, &block) ⇒ Object



152
153
154
155
# File 'lib/supabase/realtime/channel.rb', line 152

def on_broadcast(event, &block)
  @broadcast_callbacks << { event: event, callback: block }
  self
end

#on_close(&block) ⇒ Object



162
163
164
165
# File 'lib/supabase/realtime/channel.rb', line 162

def on_close(&block)
  @close_callbacks << block
  self
end

#on_error(&block) ⇒ Object



167
168
169
170
# File 'lib/supabase/realtime/channel.rb', line 167

def on_error(&block)
  @error_callbacks << block
  self
end

#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object

Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.



141
142
143
144
145
146
147
148
149
150
# File 'lib/supabase/realtime/channel.rb', line 141

def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block)
  unless %w[INSERT UPDATE DELETE *].include?(event)
    raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*"
  end

  @postgres_changes_callbacks << {
    event: event, schema: schema, table: table, filter: filter, callback: block
  }
  self
end

#on_presence_join(&block) ⇒ Object



183
184
185
186
187
# File 'lib/supabase/realtime/channel.rb', line 183

def on_presence_join(&block)
  @presence.on_join(&block)
  resubscribe_for_presence!
  self
end

#on_presence_leave(&block) ⇒ Object



189
190
191
192
193
# File 'lib/supabase/realtime/channel.rb', line 189

def on_presence_leave(&block)
  @presence.on_leave(&block)
  resubscribe_for_presence!
  self
end

#on_presence_sync(&block) ⇒ Object

Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API. If called after the channel is already joined, the channel resubscribes so the server starts forwarding presence events (presence has to be enabled in the join config — see #default_params).



177
178
179
180
181
# File 'lib/supabase/realtime/channel.rb', line 177

def on_presence_sync(&block)
  @presence.on_sync(&block)
  resubscribe_for_presence!
  self
end

#on_system(&block) ⇒ Object



157
158
159
160
# File 'lib/supabase/realtime/channel.rb', line 157

def on_system(&block)
  @system_callbacks << block
  self
end

#presence_stateObject

Shortcut for ‘channel.presence.state` so callers don’t have to drill in.



196
197
198
# File 'lib/supabase/realtime/channel.rb', line 196

def presence_state
  @presence.state
end

#push_access_token(token) ⇒ Object

Push the rotated access_token to the server for this channel. Called by Supabase::Realtime::Client#set_auth for every joined channel, mirroring supabase-py (client.py:335-337): ‘await channel.push(ChannelEvents.access_token, token)`. Routing through the normal push path means the frame is buffered (not dropped) when the socket is momentarily offline, matching py’s ‘channel.push` buffering — the prior version only sent when `connected?` and silently lost the rotation otherwise.



236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/supabase/realtime/channel.rb', line 236

def push_access_token(token)
  return unless @joined_once

  ref  = @socket&.next_ref
  push = Push.new(self,
                  Types::ChannelEvents::ACCESS_TOKEN,
                  { "access_token" => token },
                  ref: ref,
                  timeout: Types::DEFAULT_TIMEOUT_SECONDS)
  send_push(push, register_pending: true)
  self
end

#push_event(event, payload = {}, timeout: nil) ⇒ Object

Public low-level push for arbitrary Phoenix events. Mirrors ‘supabase-py`’s ‘channel.push(event, payload, timeout)`. Returns the Push instance so callers can attach receive() handlers and observe the reply / timeout. Raises if called before #subscribe.



253
254
255
256
257
258
259
260
261
262
263
# File 'lib/supabase/realtime/channel.rb', line 253

def push_event(event, payload = {}, timeout: nil)
  unless @joined_once
    raise Errors::RealtimeError,
          "tried to push '#{event}' to '#{@topic}' before joining. Call subscribe() first."
  end

  ref = @socket&.next_ref
  push = Push.new(self, event, payload, ref: ref, timeout: timeout || Types::DEFAULT_TIMEOUT_SECONDS)
  send_push(push, register_pending: true)
  push
end

#rejoinObject

Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions, and by the rejoin timer after a join error.



107
108
109
110
111
112
113
114
# File 'lib/supabase/realtime/channel.rb', line 107

def rejoin
  return unless @joined_once

  @state = Types::ChannelStates::JOINING
  inject_postgres_changes_bindings
  send_join_push
  self
end

#send_broadcast(event, payload = {}) ⇒ Object

Send a custom broadcast message. The server will forward it to other subscribers of the same topic.



204
205
206
207
208
209
210
# File 'lib/supabase/realtime/channel.rb', line 204

def send_broadcast(event, payload = {})
  push = Push.new(self,
                  Types::ChannelEvents::BROADCAST,
                  { "type" => "broadcast", "event" => event, "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#subscribe(&block) ⇒ Object

Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/supabase/realtime/channel.rb', line 80

def subscribe(&block)
  raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once

  @joined_once = true
  @subscribe_callback = block
  @state = Types::ChannelStates::JOINING

  inject_postgres_changes_bindings
  # Make subscribe a one-call entry point. If the socket is already open,
  # send the join now. If it isn't, just (idempotently) open it — the
  # client's rejoin_channels fires on socket-open and (re)sends the join
  # exactly once. The previous version sent/buffered the join here AND let
  # rejoin_channels re-send it on open, so a subscribe-before-open issued
  # a DUPLICATE join; the server phx_closed the extra one, which (with the
  # registry-removal fix) tore the channel down and broke delivery. Caught
  # by the live integration suite, not the mocked specs.
  if @socket && !@socket.connected?
    @socket.connect
  else
    send_join_push
  end
  self
end

#track(payload) ⇒ Object

Track the local user in the channel’s presence state.



213
214
215
216
217
218
219
# File 'lib/supabase/realtime/channel.rb', line 213

def track(payload)
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "track", "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#unsubscribeObject

Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/supabase/realtime/channel.rb', line 120

def unsubscribe
  return self if closed?

  @state = Types::ChannelStates::LEAVING
  ref = @socket&.next_ref
  leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref)

  leave_push
    .receive(Types::AckStatus::OK)      { |_| on_leave_ack }
    .receive(Types::AckStatus::ERROR)   { |_| on_leave_ack }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack }

  send_push(leave_push, register_pending: true)
  self
end

#untrackObject



221
222
223
224
225
226
227
# File 'lib/supabase/realtime/channel.rb', line 221

def untrack
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "untrack" })
  send_push(push, register_pending: false)
  self
end