Class: Supabase::Realtime::Channel
- Inherits:
-
Object
- Object
- Supabase::Realtime::Channel
- Defined in:
- lib/supabase/realtime/channel.rb
Overview
A topic subscription on a shared Socket connection. Each Channel:
-
tracks its own lifecycle state (closed/joining/joined/leaving/errored)
-
holds the listener callbacks for postgres changes / broadcast / presence / system
-
dispatches inbound messages from the Client to those callbacks
-
owns its Presence sync state
Should be constructed via Supabase::Realtime::Client#channel, not directly.
Instance Attribute Summary collapse
-
#join_push ⇒ Object
readonly
Returns the value of attribute join_push.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#pending_pushes ⇒ Object
readonly
Returns the value of attribute pending_pushes.
-
#presence ⇒ Object
readonly
Returns the value of attribute presence.
-
#rejoin_timer ⇒ Object
readonly
Returns the value of attribute rejoin_timer.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#closed? ⇒ Boolean
—– State predicates —–.
-
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners.
- #errored? ⇒ Boolean
-
#initialize(topic, params: nil, socket: nil) ⇒ Channel
constructor
A new instance of Channel.
- #joined? ⇒ Boolean
- #joining? ⇒ Boolean
- #leaving? ⇒ Boolean
-
#logger ⇒ Object
Logger used by CallbackSafety when a user callback raises.
- #on_broadcast(event, &block) ⇒ Object
- #on_close(&block) ⇒ Object
- #on_error(&block) ⇒ Object
-
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener.
- #on_presence_join(&block) ⇒ Object
- #on_presence_leave(&block) ⇒ Object
-
#on_presence_sync(&block) ⇒ Object
Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API.
- #on_system(&block) ⇒ Object
-
#presence_state ⇒ Object
Shortcut for ‘channel.presence.state` so callers don’t have to drill in.
-
#push_access_token(token) ⇒ Object
Push the rotated access_token to the server for this channel.
-
#push_event(event, payload = {}, timeout: nil) ⇒ Object
Public low-level push for arbitrary Phoenix events.
-
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once.
-
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message.
-
#subscribe(&block) ⇒ Object
Start the join handshake.
-
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
-
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push.
- #untrack ⇒ Object
Constructor Details
#initialize(topic, params: nil, socket: nil) ⇒ Channel
Returns a new instance of Channel.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/supabase/realtime/channel.rb', line 23 def initialize(topic, params: nil, socket: nil) @topic = topic @params = params || default_params @socket = socket @state = Types::ChannelStates::CLOSED @joined_once = false @presence = Presence.new(logger: logger) @broadcast_callbacks = [] # [{ event:, callback: }] @postgres_changes_callbacks = [] # [{ event:, schema:, table:, filter:, callback: }] @system_callbacks = [] @close_callbacks = [] @error_callbacks = [] @pending_pushes = {} # ref => Push, for matching phx_reply @push_buffer = [] # outbound pushes queued while not yet joined @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params) @subscribe_callback = nil # py rejoin uses `lambda tries: 2**tries` with no cap # (`realtime/_async/channel.py:109-111`). Timer (US-006) bumps `tries` # before invoking this lambda with `tries + 1`, so the curve for the # first five attempts is 4, 8, 16, 32, 64 s — identical to py. @rejoin_timer = Timer.new( callback: -> { rejoin if @joined_once && !leaving? && !closed? }, backoff: ->(tries) { 2.0**tries } ) @join_push .receive(Types::AckStatus::OK) { |p| on_join_ok(p) } .receive(Types::AckStatus::ERROR) { |p| on_join_error(p) } .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout } end |
Instance Attribute Details
#join_push ⇒ Object (readonly)
Returns the value of attribute join_push.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def join_push @join_push end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def params @params end |
#pending_pushes ⇒ Object (readonly)
Returns the value of attribute pending_pushes.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def pending_pushes @pending_pushes end |
#presence ⇒ Object (readonly)
Returns the value of attribute presence.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def presence @presence end |
#rejoin_timer ⇒ Object (readonly)
Returns the value of attribute rejoin_timer.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def rejoin_timer @rejoin_timer end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def state @state end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def topic @topic end |
Instance Method Details
#closed? ⇒ Boolean
—– State predicates —–
70 |
# File 'lib/supabase/realtime/channel.rb', line 70 def closed?; @state == Types::ChannelStates::CLOSED; end |
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/supabase/realtime/channel.rb', line 270 def dispatch() return false unless .topic == @topic case .event when Types::ChannelEvents::REPLY dispatch_reply() when Types::ChannelEvents::POSTGRES_CHANGES dispatch_postgres_changes() when Types::ChannelEvents::BROADCAST dispatch_broadcast() when Types::ChannelEvents::PRESENCE_STATE @presence.sync_state(.payload) when Types::ChannelEvents::PRESENCE_DIFF @presence.sync_diff(.payload) when Types::ChannelEvents::SYSTEM # supabase-py routes system frames by status (channel.py:520-525): a # `status: "ok"` payload reaches the on_system callbacks; anything else # (e.g. a postgres_changes subscription failure reported via `system`) # is treated as a channel error → ERRORED + rejoin scheduled. if .payload.is_a?(Hash) && .payload["status"] == "error" trigger_channel_error(.payload) else @system_callbacks.each do |cb| CallbackSafety.safe(logger, "system") { cb.call(.payload) } end end when Types::ChannelEvents::CLOSE handle_channel_close(.payload) when Types::ChannelEvents::ERROR trigger_channel_error(.payload) end true end |
#errored? ⇒ Boolean
71 |
# File 'lib/supabase/realtime/channel.rb', line 71 def errored?; @state == Types::ChannelStates::ERRORED; end |
#joined? ⇒ Boolean
72 |
# File 'lib/supabase/realtime/channel.rb', line 72 def joined?; @state == Types::ChannelStates::JOINED; end |
#joining? ⇒ Boolean
73 |
# File 'lib/supabase/realtime/channel.rb', line 73 def joining?; @state == Types::ChannelStates::JOINING; end |
#leaving? ⇒ Boolean
74 |
# File 'lib/supabase/realtime/channel.rb', line 74 def leaving?; @state == Types::ChannelStates::LEAVING; end |
#logger ⇒ Object
Logger used by Supabase::Realtime::CallbackSafety when a user callback raises. Resolved lazily from the realtime client (‘@socket` in this class is the Supabase::Realtime::Client, which exposes its injected logger via `Client#logger`). When the underlying transport doesn’t carry a logger (e.g. tests that pass a bare TestSocket as ‘socket:`), `safe` falls through to `Kernel#warn`.
64 65 66 |
# File 'lib/supabase/realtime/channel.rb', line 64 def logger @socket.respond_to?(:logger) ? @socket.logger : nil end |
#on_broadcast(event, &block) ⇒ Object
152 153 154 155 |
# File 'lib/supabase/realtime/channel.rb', line 152 def on_broadcast(event, &block) @broadcast_callbacks << { event: event, callback: block } self end |
#on_close(&block) ⇒ Object
162 163 164 165 |
# File 'lib/supabase/realtime/channel.rb', line 162 def on_close(&block) @close_callbacks << block self end |
#on_error(&block) ⇒ Object
167 168 169 170 |
# File 'lib/supabase/realtime/channel.rb', line 167 def on_error(&block) @error_callbacks << block self end |
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.
141 142 143 144 145 146 147 148 149 150 |
# File 'lib/supabase/realtime/channel.rb', line 141 def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) unless %w[INSERT UPDATE DELETE *].include?(event) raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*" end @postgres_changes_callbacks << { event: event, schema: schema, table: table, filter: filter, callback: block } self end |
#on_presence_join(&block) ⇒ Object
183 184 185 186 187 |
# File 'lib/supabase/realtime/channel.rb', line 183 def on_presence_join(&block) @presence.on_join(&block) resubscribe_for_presence! self end |
#on_presence_leave(&block) ⇒ Object
189 190 191 192 193 |
# File 'lib/supabase/realtime/channel.rb', line 189 def on_presence_leave(&block) @presence.on_leave(&block) resubscribe_for_presence! self end |
#on_presence_sync(&block) ⇒ Object
Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API. If called after the channel is already joined, the channel resubscribes so the server starts forwarding presence events (presence has to be enabled in the join config — see #default_params).
177 178 179 180 181 |
# File 'lib/supabase/realtime/channel.rb', line 177 def on_presence_sync(&block) @presence.on_sync(&block) resubscribe_for_presence! self end |
#on_system(&block) ⇒ Object
157 158 159 160 |
# File 'lib/supabase/realtime/channel.rb', line 157 def on_system(&block) @system_callbacks << block self end |
#presence_state ⇒ Object
Shortcut for ‘channel.presence.state` so callers don’t have to drill in.
196 197 198 |
# File 'lib/supabase/realtime/channel.rb', line 196 def presence_state @presence.state end |
#push_access_token(token) ⇒ Object
Push the rotated access_token to the server for this channel. Called by Supabase::Realtime::Client#set_auth for every joined channel, mirroring supabase-py (client.py:335-337): ‘await channel.push(ChannelEvents.access_token, token)`. Routing through the normal push path means the frame is buffered (not dropped) when the socket is momentarily offline, matching py’s ‘channel.push` buffering — the prior version only sent when `connected?` and silently lost the rotation otherwise.
236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/supabase/realtime/channel.rb', line 236 def push_access_token(token) return unless @joined_once ref = @socket&.next_ref push = Push.new(self, Types::ChannelEvents::ACCESS_TOKEN, { "access_token" => token }, ref: ref, timeout: Types::DEFAULT_TIMEOUT_SECONDS) send_push(push, register_pending: true) self end |
#push_event(event, payload = {}, timeout: nil) ⇒ Object
Public low-level push for arbitrary Phoenix events. Mirrors ‘supabase-py`’s ‘channel.push(event, payload, timeout)`. Returns the Push instance so callers can attach receive() handlers and observe the reply / timeout. Raises if called before #subscribe.
253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/supabase/realtime/channel.rb', line 253 def push_event(event, payload = {}, timeout: nil) unless @joined_once raise Errors::RealtimeError, "tried to push '#{event}' to '#{@topic}' before joining. Call subscribe() first." end ref = @socket&.next_ref push = Push.new(self, event, payload, ref: ref, timeout: timeout || Types::DEFAULT_TIMEOUT_SECONDS) send_push(push, register_pending: true) push end |
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions, and by the rejoin timer after a join error.
107 108 109 110 111 112 113 114 |
# File 'lib/supabase/realtime/channel.rb', line 107 def rejoin return unless @joined_once @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings send_join_push self end |
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message. The server will forward it to other subscribers of the same topic.
204 205 206 207 208 209 210 |
# File 'lib/supabase/realtime/channel.rb', line 204 def send_broadcast(event, payload = {}) push = Push.new(self, Types::ChannelEvents::BROADCAST, { "type" => "broadcast", "event" => event, "payload" => payload }) send_push(push, register_pending: false) self end |
#subscribe(&block) ⇒ Object
Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/supabase/realtime/channel.rb', line 80 def subscribe(&block) raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once @joined_once = true @subscribe_callback = block @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings # Make subscribe a one-call entry point. If the socket is already open, # send the join now. If it isn't, just (idempotently) open it — the # client's rejoin_channels fires on socket-open and (re)sends the join # exactly once. The previous version sent/buffered the join here AND let # rejoin_channels re-send it on open, so a subscribe-before-open issued # a DUPLICATE join; the server phx_closed the extra one, which (with the # registry-removal fix) tore the channel down and broke delivery. Caught # by the live integration suite, not the mocked specs. if @socket && !@socket.connected? @socket.connect else send_join_push end self end |
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
213 214 215 216 217 218 219 |
# File 'lib/supabase/realtime/channel.rb', line 213 def track(payload) push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "track", "payload" => payload }) send_push(push, register_pending: false) self end |
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/supabase/realtime/channel.rb', line 120 def unsubscribe return self if closed? @state = Types::ChannelStates::LEAVING ref = @socket&.next_ref leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref) leave_push .receive(Types::AckStatus::OK) { |_| on_leave_ack } .receive(Types::AckStatus::ERROR) { |_| on_leave_ack } .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack } send_push(leave_push, register_pending: true) self end |
#untrack ⇒ Object
221 222 223 224 225 226 227 |
# File 'lib/supabase/realtime/channel.rb', line 221 def untrack push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "untrack" }) send_push(push, register_pending: false) self end |