Class: Supabase::Realtime::Channel
- Inherits:
-
Object
- Object
- Supabase::Realtime::Channel
- Defined in:
- lib/supabase/realtime/channel.rb
Overview
A topic subscription on a shared Socket connection. Each Channel:
-
tracks its own lifecycle state (closed/joining/joined/leaving/errored)
-
holds the listener callbacks for postgres changes / broadcast / presence / system
-
dispatches inbound messages from the Client to those callbacks
-
owns its Presence sync state
Should be constructed via Supabase::Realtime::Client#channel, not directly.
Instance Attribute Summary collapse
-
#join_push ⇒ Object
readonly
Returns the value of attribute join_push.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#pending_pushes ⇒ Object
readonly
Returns the value of attribute pending_pushes.
-
#presence ⇒ Object
readonly
Returns the value of attribute presence.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#closed? ⇒ Boolean
—– State predicates —–.
-
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners.
- #errored? ⇒ Boolean
-
#initialize(topic, params: nil, socket: nil) ⇒ Channel
constructor
A new instance of Channel.
- #joined? ⇒ Boolean
- #joining? ⇒ Boolean
- #leaving? ⇒ Boolean
- #on_broadcast(event, &block) ⇒ Object
- #on_close(&block) ⇒ Object
- #on_error(&block) ⇒ Object
-
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener.
- #on_system(&block) ⇒ Object
-
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once.
-
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message.
-
#subscribe(&block) ⇒ Object
Start the join handshake.
-
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
-
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push.
- #untrack ⇒ Object
Constructor Details
#initialize(topic, params: nil, socket: nil) ⇒ Channel
Returns a new instance of Channel.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/supabase/realtime/channel.rb', line 21 def initialize(topic, params: nil, socket: nil) @topic = topic @params = params || default_params @socket = socket @state = Types::ChannelStates::CLOSED @joined_once = false @presence = Presence.new @broadcast_callbacks = [] # [{ event:, callback: }] @postgres_changes_callbacks = [] # [{ event:, schema:, table:, filter:, callback: }] @system_callbacks = [] @close_callbacks = [] @error_callbacks = [] @pending_pushes = {} # ref => Push, for matching phx_reply @push_buffer = [] # outbound pushes queued while not yet joined @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params) @subscribe_callback = nil @join_push .receive(Types::AckStatus::OK) { |p| on_join_ok(p) } .receive(Types::AckStatus::ERROR) { |p| on_join_error(p) } .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout } end |
Instance Attribute Details
#join_push ⇒ Object (readonly)
Returns the value of attribute join_push.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def join_push @join_push end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def params @params end |
#pending_pushes ⇒ Object (readonly)
Returns the value of attribute pending_pushes.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def pending_pushes @pending_pushes end |
#presence ⇒ Object (readonly)
Returns the value of attribute presence.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def presence @presence end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def state @state end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def topic @topic end |
Instance Method Details
#closed? ⇒ Boolean
—– State predicates —–
49 |
# File 'lib/supabase/realtime/channel.rb', line 49 def closed?; @state == Types::ChannelStates::CLOSED; end |
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/supabase/realtime/channel.rb', line 175 def dispatch() return false unless .topic == @topic case .event when Types::ChannelEvents::REPLY dispatch_reply() when Types::ChannelEvents::POSTGRES_CHANGES dispatch_postgres_changes() when Types::ChannelEvents::BROADCAST dispatch_broadcast() when Types::ChannelEvents::PRESENCE_STATE @presence.sync_state(.payload) when Types::ChannelEvents::PRESENCE_DIFF @presence.sync_diff(.payload) when Types::ChannelEvents::SYSTEM @system_callbacks.each { |cb| cb.call(.payload) } when Types::ChannelEvents::CLOSE @state = Types::ChannelStates::CLOSED @close_callbacks.each { |cb| cb.call(.payload) } when Types::ChannelEvents::ERROR @state = Types::ChannelStates::ERRORED @error_callbacks.each { |cb| cb.call(.payload) } end true end |
#errored? ⇒ Boolean
50 |
# File 'lib/supabase/realtime/channel.rb', line 50 def errored?; @state == Types::ChannelStates::ERRORED; end |
#joined? ⇒ Boolean
51 |
# File 'lib/supabase/realtime/channel.rb', line 51 def joined?; @state == Types::ChannelStates::JOINED; end |
#joining? ⇒ Boolean
52 |
# File 'lib/supabase/realtime/channel.rb', line 52 def joining?; @state == Types::ChannelStates::JOINING; end |
#leaving? ⇒ Boolean
53 |
# File 'lib/supabase/realtime/channel.rb', line 53 def leaving?; @state == Types::ChannelStates::LEAVING; end |
#on_broadcast(event, &block) ⇒ Object
121 122 123 124 |
# File 'lib/supabase/realtime/channel.rb', line 121 def on_broadcast(event, &block) @broadcast_callbacks << { event: event, callback: block } self end |
#on_close(&block) ⇒ Object
131 132 133 134 |
# File 'lib/supabase/realtime/channel.rb', line 131 def on_close(&block) @close_callbacks << block self end |
#on_error(&block) ⇒ Object
136 137 138 139 |
# File 'lib/supabase/realtime/channel.rb', line 136 def on_error(&block) @error_callbacks << block self end |
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.
110 111 112 113 114 115 116 117 118 119 |
# File 'lib/supabase/realtime/channel.rb', line 110 def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) unless %w[INSERT UPDATE DELETE *].include?(event) raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*" end @postgres_changes_callbacks << { event: event, schema: schema, table: table, filter: filter, callback: block } self end |
#on_system(&block) ⇒ Object
126 127 128 129 |
# File 'lib/supabase/realtime/channel.rb', line 126 def on_system(&block) @system_callbacks << block self end |
#rejoin ⇒ Object
Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions.
74 75 76 77 78 79 80 81 82 83 |
# File 'lib/supabase/realtime/channel.rb', line 74 def rejoin return unless @joined_once @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings @join_push.instance_variable_set(:@ref, @socket&.next_ref) @join_push.instance_variable_set(:@received_status, nil) send_push(@join_push, register_pending: true) self end |
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message. The server will forward it to other subscribers of the same topic.
145 146 147 148 149 150 151 |
# File 'lib/supabase/realtime/channel.rb', line 145 def send_broadcast(event, payload = {}) push = Push.new(self, Types::ChannelEvents::BROADCAST, { "type" => "broadcast", "event" => event, "payload" => payload }) send_push(push, register_pending: false) self end |
#subscribe(&block) ⇒ Object
Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/supabase/realtime/channel.rb', line 59 def subscribe(&block) raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once @joined_once = true @subscribe_callback = block @state = Types::ChannelStates::JOINING inject_postgres_changes_bindings @join_push.instance_variable_set(:@ref, @socket&.next_ref) send_push(@join_push, register_pending: true) self end |
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
154 155 156 157 158 159 160 |
# File 'lib/supabase/realtime/channel.rb', line 154 def track(payload) push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "track", "payload" => payload }) send_push(push, register_pending: false) self end |
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/supabase/realtime/channel.rb', line 89 def unsubscribe return self if closed? @state = Types::ChannelStates::LEAVING ref = @socket&.next_ref leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref) leave_push .receive(Types::AckStatus::OK) { |_| on_leave_ack } .receive(Types::AckStatus::ERROR) { |_| on_leave_ack } .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack } send_push(leave_push, register_pending: true) self end |
#untrack ⇒ Object
162 163 164 165 166 167 168 |
# File 'lib/supabase/realtime/channel.rb', line 162 def untrack push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "untrack" }) send_push(push, register_pending: false) self end |