Class: Supabase::Realtime::Channel
- Inherits:
-
Object
- Object
- Supabase::Realtime::Channel
- Defined in:
- lib/supabase/realtime/channel.rb
Overview
A topic subscription on a shared Socket connection. Each Channel:
-
tracks its own lifecycle state (closed/joining/joined/leaving/errored)
-
holds the listener callbacks for postgres changes / broadcast / presence / system
-
dispatches inbound messages from the Client to those callbacks
-
owns its Presence sync state
Should be constructed via Supabase::Realtime::Client#channel, not directly.
Instance Attribute Summary collapse
-
#join_push ⇒ Object
readonly
Returns the value of attribute join_push.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#pending_pushes ⇒ Object
readonly
Returns the value of attribute pending_pushes.
-
#presence ⇒ Object
readonly
Returns the value of attribute presence.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#closed? ⇒ Boolean
—– State predicates —–.
-
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners.
- #errored? ⇒ Boolean
-
#initialize(topic, params: nil, socket: nil) ⇒ Channel
constructor
A new instance of Channel.
- #joined? ⇒ Boolean
- #joining? ⇒ Boolean
- #leaving? ⇒ Boolean
- #on_broadcast(event, &block) ⇒ Object
- #on_close(&block) ⇒ Object
- #on_error(&block) ⇒ Object
-
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener.
- #on_presence_join(&block) ⇒ Object
- #on_presence_leave(&block) ⇒ Object
-
#on_presence_sync(&block) ⇒ Object
Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API.
- #on_system(&block) ⇒ Object
-
#presence_state ⇒ Object
Shortcut for ‘channel.presence.state` so callers don’t have to drill in.
-
#push_event(event, payload = {}, timeout: nil) ⇒ Object
Public low-level push for arbitrary Phoenix events.
-
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once.
-
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message.
-
#subscribe(&block) ⇒ Object
Start the join handshake.
-
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
-
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push.
- #untrack ⇒ Object
Constructor Details
#initialize(topic, params: nil, socket: nil) ⇒ Channel
Returns a new instance of Channel.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/supabase/realtime/channel.rb', line 21 def initialize(topic, params: nil, socket: nil) @topic = topic @params = params || default_params @socket = socket @state = Types::ChannelStates::CLOSED @joined_once = false @presence = Presence.new @broadcast_callbacks = [] # [{ event:, callback: }] @postgres_changes_callbacks = [] # [{ event:, schema:, table:, filter:, callback: }] @system_callbacks = [] @close_callbacks = [] @error_callbacks = [] @pending_pushes = {} # ref => Push, for matching phx_reply @push_buffer = [] # outbound pushes queued while not yet joined @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params) @subscribe_callback = nil @join_push .receive(Types::AckStatus::OK) { |p| on_join_ok(p) } .receive(Types::AckStatus::ERROR) { |p| on_join_error(p) } .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout } end |
Instance Attribute Details
#join_push ⇒ Object (readonly)
Returns the value of attribute join_push.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def join_push @join_push end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def params @params end |
#pending_pushes ⇒ Object (readonly)
Returns the value of attribute pending_pushes.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def pending_pushes @pending_pushes end |
#presence ⇒ Object (readonly)
Returns the value of attribute presence.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def presence @presence end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def state @state end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def topic @topic end |
Instance Method Details
#closed? ⇒ Boolean
—– State predicates —–
49 |
# File 'lib/supabase/realtime/channel.rb', line 49 def closed?; @state == Types::ChannelStates::CLOSED; end |
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/supabase/realtime/channel.rb', line 219 def dispatch() return false unless .topic == @topic case .event when Types::ChannelEvents::REPLY dispatch_reply() when Types::ChannelEvents::POSTGRES_CHANGES dispatch_postgres_changes() when Types::ChannelEvents::BROADCAST dispatch_broadcast() when Types::ChannelEvents::PRESENCE_STATE @presence.sync_state(.payload) when Types::ChannelEvents::PRESENCE_DIFF @presence.sync_diff(.payload) when Types::ChannelEvents::SYSTEM @system_callbacks.each { |cb| cb.call(.payload) } when Types::ChannelEvents::CLOSE @state = Types::ChannelStates::CLOSED @close_callbacks.each { |cb| cb.call(.payload) } when Types::ChannelEvents::ERROR @state = Types::ChannelStates::ERRORED @error_callbacks.each { |cb| cb.call(.payload) } end true end |
#errored? ⇒ Boolean
50 |
# File 'lib/supabase/realtime/channel.rb', line 50 def errored?; @state == Types::ChannelStates::ERRORED; end |
#joined? ⇒ Boolean
51 |
# File 'lib/supabase/realtime/channel.rb', line 51 def joined?; @state == Types::ChannelStates::JOINED; end |
#joining? ⇒ Boolean
52 |
# File 'lib/supabase/realtime/channel.rb', line 52 def joining?; @state == Types::ChannelStates::JOINING; end |
#leaving? ⇒ Boolean
53 |
# File 'lib/supabase/realtime/channel.rb', line 53 def leaving?; @state == Types::ChannelStates::LEAVING; end |
#on_broadcast(event, &block) ⇒ Object
121 122 123 124 |
# File 'lib/supabase/realtime/channel.rb', line 121 def on_broadcast(event, &block) @broadcast_callbacks << { event: event, callback: block } self end |
#on_close(&block) ⇒ Object
131 132 133 134 |
# File 'lib/supabase/realtime/channel.rb', line 131 def on_close(&block) @close_callbacks << block self end |
#on_error(&block) ⇒ Object
136 137 138 139 |
# File 'lib/supabase/realtime/channel.rb', line 136 def on_error(&block) @error_callbacks << block self end |
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.
110 111 112 113 114 115 116 117 118 119 |
# File 'lib/supabase/realtime/channel.rb', line 110 def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) unless %w[INSERT UPDATE DELETE *].include?(event) raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*" end @postgres_changes_callbacks << { event: event, schema: schema, table: table, filter: filter, callback: block } self end |
#on_presence_join(&block) ⇒ Object
152 153 154 155 156 |
# File 'lib/supabase/realtime/channel.rb', line 152 def on_presence_join(&block) @presence.on_join(&block) resubscribe_for_presence! self end |
#on_presence_leave(&block) ⇒ Object
158 159 160 161 162 |
# File 'lib/supabase/realtime/channel.rb', line 158 def on_presence_leave(&block) @presence.on_leave(&block) resubscribe_for_presence! self end |
#on_presence_sync(&block) ⇒ Object
Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API. If called after the channel is already joined, the channel resubscribes so the server starts forwarding presence events (presence has to be enabled in the join config — see #default_params).
146 147 148 149 150 |
# File 'lib/supabase/realtime/channel.rb', line 146 def on_presence_sync(&block) @presence.on_sync(&block) resubscribe_for_presence! self end |
#on_system(&block) ⇒ Object
126 127 128 129 |
# File 'lib/supabase/realtime/channel.rb', line 126 def on_system(&block) @system_callbacks << block self end |
#presence_state ⇒ Object
Shortcut for ‘channel.presence.state` so callers don’t have to drill in.
165 166 167 |
# File 'lib/supabase/realtime/channel.rb', line 165 def presence_state @presence.state end |
#push_event(event, payload = {}, timeout: nil) ⇒ Object
Public low-level push for arbitrary Phoenix events. Mirrors ‘supabase-py`’s ‘channel.push(event, payload, timeout)`. Returns the Push instance so callers can attach receive() handlers and observe the reply / timeout. Raises if called before #subscribe.
202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/supabase/realtime/channel.rb', line 202 def push_event(event, payload = {}, timeout: nil) unless @joined_once raise Errors::RealtimeError, "tried to push '#{event}' to '#{@topic}' before joining. Call subscribe() first." end ref = @socket&.next_ref push = Push.new(self, event, payload, ref: ref, timeout: timeout || Types::DEFAULT_TIMEOUT_SECONDS) send_push(push, register_pending: true) push end |
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions.
74 75 76 77 78 79 80 81 82 83 |
# File 'lib/supabase/realtime/channel.rb', line 74 def rejoin return unless @joined_once @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings @join_push.instance_variable_set(:@ref, @socket&.next_ref) @join_push.instance_variable_set(:@received_status, nil) send_push(@join_push, register_pending: true) self end |
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message. The server will forward it to other subscribers of the same topic.
173 174 175 176 177 178 179 |
# File 'lib/supabase/realtime/channel.rb', line 173 def send_broadcast(event, payload = {}) push = Push.new(self, Types::ChannelEvents::BROADCAST, { "type" => "broadcast", "event" => event, "payload" => payload }) send_push(push, register_pending: false) self end |
#subscribe(&block) ⇒ Object
Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/supabase/realtime/channel.rb', line 59 def subscribe(&block) raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once @joined_once = true @subscribe_callback = block @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings @join_push.instance_variable_set(:@ref, @socket&.next_ref) send_push(@join_push, register_pending: true) self end |
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
182 183 184 185 186 187 188 |
# File 'lib/supabase/realtime/channel.rb', line 182 def track(payload) push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "track", "payload" => payload }) send_push(push, register_pending: false) self end |
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/supabase/realtime/channel.rb', line 89 def unsubscribe return self if closed? @state = Types::ChannelStates::LEAVING ref = @socket&.next_ref leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref) leave_push .receive(Types::AckStatus::OK) { |_| on_leave_ack } .receive(Types::AckStatus::ERROR) { |_| on_leave_ack } .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack } send_push(leave_push, register_pending: true) self end |
#untrack ⇒ Object
190 191 192 193 194 195 196 |
# File 'lib/supabase/realtime/channel.rb', line 190 def untrack push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "untrack" }) send_push(push, register_pending: false) self end |