Class: Supabase::Realtime::Channel
- Inherits:
-
Object
- Object
- Supabase::Realtime::Channel
- Defined in:
- lib/supabase/realtime/channel.rb
Overview
A topic subscription on a shared Socket connection. Each Channel:
-
tracks its own lifecycle state (closed/joining/joined/leaving/errored)
-
holds the listener callbacks for postgres changes / broadcast / presence / system
-
dispatches inbound messages from the Client to those callbacks
-
owns its Presence sync state
Should be constructed via Supabase::Realtime::Client#channel, not directly.
Instance Attribute Summary collapse
-
#join_push ⇒ Object
readonly
Returns the value of attribute join_push.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#pending_pushes ⇒ Object
readonly
Returns the value of attribute pending_pushes.
-
#presence ⇒ Object
readonly
Returns the value of attribute presence.
-
#rejoin_timer ⇒ Object
readonly
Returns the value of attribute rejoin_timer.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#closed? ⇒ Boolean
—– State predicates —–.
-
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners.
- #errored? ⇒ Boolean
-
#initialize(topic, params: nil, socket: nil) ⇒ Channel
constructor
A new instance of Channel.
- #joined? ⇒ Boolean
- #joining? ⇒ Boolean
- #leaving? ⇒ Boolean
-
#logger ⇒ Object
Logger used by CallbackSafety when a user callback raises.
- #on_broadcast(event, &block) ⇒ Object
- #on_close(&block) ⇒ Object
- #on_error(&block) ⇒ Object
-
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener.
- #on_presence_join(&block) ⇒ Object
- #on_presence_leave(&block) ⇒ Object
-
#on_presence_sync(&block) ⇒ Object
Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API.
- #on_system(&block) ⇒ Object
-
#presence_state ⇒ Object
Shortcut for ‘channel.presence.state` so callers don’t have to drill in.
-
#push_event(event, payload = {}, timeout: nil) ⇒ Object
Public low-level push for arbitrary Phoenix events.
-
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once.
-
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message.
-
#subscribe(&block) ⇒ Object
Start the join handshake.
-
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
-
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push.
- #untrack ⇒ Object
Constructor Details
#initialize(topic, params: nil, socket: nil) ⇒ Channel
Returns a new instance of Channel.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/supabase/realtime/channel.rb', line 23 def initialize(topic, params: nil, socket: nil) @topic = topic @params = params || default_params @socket = socket @state = Types::ChannelStates::CLOSED @joined_once = false @presence = Presence.new(logger: logger) @broadcast_callbacks = [] # [{ event:, callback: }] @postgres_changes_callbacks = [] # [{ event:, schema:, table:, filter:, callback: }] @system_callbacks = [] @close_callbacks = [] @error_callbacks = [] @pending_pushes = {} # ref => Push, for matching phx_reply @push_buffer = [] # outbound pushes queued while not yet joined @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params) @subscribe_callback = nil # py rejoin uses `lambda tries: 2**tries` with no cap # (`realtime/_async/channel.py:109-111`). Timer (US-006) bumps `tries` # before invoking this lambda with `tries + 1`, so the curve for the # first five attempts is 4, 8, 16, 32, 64 s — identical to py. @rejoin_timer = Timer.new( callback: -> { rejoin if @joined_once && !leaving? && !closed? }, backoff: ->(tries) { 2.0**tries } ) @join_push .receive(Types::AckStatus::OK) { |p| on_join_ok(p) } .receive(Types::AckStatus::ERROR) { |p| on_join_error(p) } .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout } end |
Instance Attribute Details
#join_push ⇒ Object (readonly)
Returns the value of attribute join_push.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def join_push @join_push end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def params @params end |
#pending_pushes ⇒ Object (readonly)
Returns the value of attribute pending_pushes.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def pending_pushes @pending_pushes end |
#presence ⇒ Object (readonly)
Returns the value of attribute presence.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def presence @presence end |
#rejoin_timer ⇒ Object (readonly)
Returns the value of attribute rejoin_timer.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def rejoin_timer @rejoin_timer end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def state @state end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
21 22 23 |
# File 'lib/supabase/realtime/channel.rb', line 21 def topic @topic end |
Instance Method Details
#closed? ⇒ Boolean
—– State predicates —–
70 |
# File 'lib/supabase/realtime/channel.rb', line 70 def closed?; @state == Types::ChannelStates::CLOSED; end |
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/supabase/realtime/channel.rb', line 245 def dispatch() return false unless .topic == @topic case .event when Types::ChannelEvents::REPLY dispatch_reply() when Types::ChannelEvents::POSTGRES_CHANGES dispatch_postgres_changes() when Types::ChannelEvents::BROADCAST dispatch_broadcast() when Types::ChannelEvents::PRESENCE_STATE @presence.sync_state(.payload) when Types::ChannelEvents::PRESENCE_DIFF @presence.sync_diff(.payload) when Types::ChannelEvents::SYSTEM @system_callbacks.each do |cb| CallbackSafety.safe(logger, "system") { cb.call(.payload) } end when Types::ChannelEvents::CLOSE @state = Types::ChannelStates::CLOSED @close_callbacks.each do |cb| CallbackSafety.safe(logger, "phx_close") { cb.call(.payload) } end when Types::ChannelEvents::ERROR @state = Types::ChannelStates::ERRORED @error_callbacks.each do |cb| CallbackSafety.safe(logger, "phx_error") { cb.call(.payload) } end end true end |
#errored? ⇒ Boolean
71 |
# File 'lib/supabase/realtime/channel.rb', line 71 def errored?; @state == Types::ChannelStates::ERRORED; end |
#joined? ⇒ Boolean
72 |
# File 'lib/supabase/realtime/channel.rb', line 72 def joined?; @state == Types::ChannelStates::JOINED; end |
#joining? ⇒ Boolean
73 |
# File 'lib/supabase/realtime/channel.rb', line 73 def joining?; @state == Types::ChannelStates::JOINING; end |
#leaving? ⇒ Boolean
74 |
# File 'lib/supabase/realtime/channel.rb', line 74 def leaving?; @state == Types::ChannelStates::LEAVING; end |
#logger ⇒ Object
Logger used by Supabase::Realtime::CallbackSafety when a user callback raises. Resolved lazily from the realtime client (‘@socket` in this class is the Supabase::Realtime::Client, which exposes its injected logger via `Client#logger`). When the underlying transport doesn’t carry a logger (e.g. tests that pass a bare TestSocket as ‘socket:`), `safe` falls through to `Kernel#warn`.
64 65 66 |
# File 'lib/supabase/realtime/channel.rb', line 64 def logger @socket.respond_to?(:logger) ? @socket.logger : nil end |
#on_broadcast(event, &block) ⇒ Object
147 148 149 150 |
# File 'lib/supabase/realtime/channel.rb', line 147 def on_broadcast(event, &block) @broadcast_callbacks << { event: event, callback: block } self end |
#on_close(&block) ⇒ Object
157 158 159 160 |
# File 'lib/supabase/realtime/channel.rb', line 157 def on_close(&block) @close_callbacks << block self end |
#on_error(&block) ⇒ Object
162 163 164 165 |
# File 'lib/supabase/realtime/channel.rb', line 162 def on_error(&block) @error_callbacks << block self end |
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/supabase/realtime/channel.rb', line 136 def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) unless %w[INSERT UPDATE DELETE *].include?(event) raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*" end @postgres_changes_callbacks << { event: event, schema: schema, table: table, filter: filter, callback: block } self end |
#on_presence_join(&block) ⇒ Object
178 179 180 181 182 |
# File 'lib/supabase/realtime/channel.rb', line 178 def on_presence_join(&block) @presence.on_join(&block) resubscribe_for_presence! self end |
#on_presence_leave(&block) ⇒ Object
184 185 186 187 188 |
# File 'lib/supabase/realtime/channel.rb', line 184 def on_presence_leave(&block) @presence.on_leave(&block) resubscribe_for_presence! self end |
#on_presence_sync(&block) ⇒ Object
Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API. If called after the channel is already joined, the channel resubscribes so the server starts forwarding presence events (presence has to be enabled in the join config — see #default_params).
172 173 174 175 176 |
# File 'lib/supabase/realtime/channel.rb', line 172 def on_presence_sync(&block) @presence.on_sync(&block) resubscribe_for_presence! self end |
#on_system(&block) ⇒ Object
152 153 154 155 |
# File 'lib/supabase/realtime/channel.rb', line 152 def on_system(&block) @system_callbacks << block self end |
#presence_state ⇒ Object
Shortcut for ‘channel.presence.state` so callers don’t have to drill in.
191 192 193 |
# File 'lib/supabase/realtime/channel.rb', line 191 def presence_state @presence.state end |
#push_event(event, payload = {}, timeout: nil) ⇒ Object
Public low-level push for arbitrary Phoenix events. Mirrors ‘supabase-py`’s ‘channel.push(event, payload, timeout)`. Returns the Push instance so callers can attach receive() handlers and observe the reply / timeout. Raises if called before #subscribe.
228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/supabase/realtime/channel.rb', line 228 def push_event(event, payload = {}, timeout: nil) unless @joined_once raise Errors::RealtimeError, "tried to push '#{event}' to '#{@topic}' before joining. Call subscribe() first." end ref = @socket&.next_ref push = Push.new(self, event, payload, ref: ref, timeout: timeout || Types::DEFAULT_TIMEOUT_SECONDS) send_push(push, register_pending: true) push end |
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions.
100 101 102 103 104 105 106 107 108 109 |
# File 'lib/supabase/realtime/channel.rb', line 100 def rejoin return unless @joined_once @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings @join_push.instance_variable_set(:@ref, @socket&.next_ref) @join_push.instance_variable_set(:@received_status, nil) send_push(@join_push, register_pending: true) self end |
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message. The server will forward it to other subscribers of the same topic.
199 200 201 202 203 204 205 |
# File 'lib/supabase/realtime/channel.rb', line 199 def send_broadcast(event, payload = {}) push = Push.new(self, Types::ChannelEvents::BROADCAST, { "type" => "broadcast", "event" => event, "payload" => payload }) send_push(push, register_pending: false) self end |
#subscribe(&block) ⇒ Object
Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/supabase/realtime/channel.rb', line 80 def subscribe(&block) raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once @joined_once = true @subscribe_callback = block @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings @join_push.instance_variable_set(:@ref, @socket&.next_ref) # Make subscribe a one-call entry point: if the caller hasn't already # connected the underlying transport, open it now so the join frame # actually reaches the wire instead of being held forever in the # Client#send_buffer. Matches supabase-py's `channel.subscribe()` ergonomics. @socket.connect if @socket && !@socket.connected? send_push(@join_push, register_pending: true) self end |
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
208 209 210 211 212 213 214 |
# File 'lib/supabase/realtime/channel.rb', line 208 def track(payload) push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "track", "payload" => payload }) send_push(push, register_pending: false) self end |
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/supabase/realtime/channel.rb', line 115 def unsubscribe return self if closed? @state = Types::ChannelStates::LEAVING ref = @socket&.next_ref leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref) leave_push .receive(Types::AckStatus::OK) { |_| on_leave_ack } .receive(Types::AckStatus::ERROR) { |_| on_leave_ack } .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack } send_push(leave_push, register_pending: true) self end |
#untrack ⇒ Object
216 217 218 219 220 221 222 |
# File 'lib/supabase/realtime/channel.rb', line 216 def untrack push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "untrack" }) send_push(push, register_pending: false) self end |