Class: Supabase::Realtime::Channel
- Inherits:
-
Object
- Object
- Supabase::Realtime::Channel
- Defined in:
- lib/supabase/realtime/channel.rb
Overview
A topic subscription on a shared Socket connection. Each Channel:
-
tracks its own lifecycle state (closed/joining/joined/leaving/errored)
-
holds the listener callbacks for postgres changes / broadcast / presence / system
-
dispatches inbound messages from the Client to those callbacks
-
owns its Presence sync state
Should be constructed via Supabase::Realtime::Client#channel, not directly.
Instance Attribute Summary collapse
-
#join_push ⇒ Object
readonly
Returns the value of attribute join_push.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#pending_pushes ⇒ Object
readonly
Returns the value of attribute pending_pushes.
-
#presence ⇒ Object
readonly
Returns the value of attribute presence.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#closed? ⇒ Boolean
—– State predicates —–.
-
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners.
- #errored? ⇒ Boolean
-
#initialize(topic, params: nil, socket: nil) ⇒ Channel
constructor
A new instance of Channel.
- #joined? ⇒ Boolean
- #joining? ⇒ Boolean
- #leaving? ⇒ Boolean
- #on_broadcast(event, &block) ⇒ Object
- #on_close(&block) ⇒ Object
- #on_error(&block) ⇒ Object
-
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener.
- #on_system(&block) ⇒ Object
-
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message.
-
#subscribe(&block) ⇒ Object
Start the join handshake.
-
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
-
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push.
- #untrack ⇒ Object
Constructor Details
#initialize(topic, params: nil, socket: nil) ⇒ Channel
Returns a new instance of Channel.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/supabase/realtime/channel.rb', line 21 def initialize(topic, params: nil, socket: nil) @topic = topic @params = params || default_params @socket = socket @state = Types::ChannelStates::CLOSED @joined_once = false @presence = Presence.new @broadcast_callbacks = [] # [{ event:, callback: }] @postgres_changes_callbacks = [] # [{ event:, schema:, table:, filter:, callback: }] @system_callbacks = [] @close_callbacks = [] @error_callbacks = [] @pending_pushes = {} # ref => Push, for matching phx_reply @push_buffer = [] # outbound pushes queued while not yet joined @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params) @subscribe_callback = nil @join_push .receive(Types::AckStatus::OK) { |_| on_join_ok } .receive(Types::AckStatus::ERROR) { |p| on_join_error(p) } .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout } end |
Instance Attribute Details
#join_push ⇒ Object (readonly)
Returns the value of attribute join_push.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def join_push @join_push end |
#params ⇒ Object (readonly)
Returns the value of attribute params.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def params @params end |
#pending_pushes ⇒ Object (readonly)
Returns the value of attribute pending_pushes.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def pending_pushes @pending_pushes end |
#presence ⇒ Object (readonly)
Returns the value of attribute presence.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def presence @presence end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def state @state end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
19 20 21 |
# File 'lib/supabase/realtime/channel.rb', line 19 def topic @topic end |
Instance Method Details
#closed? ⇒ Boolean
—– State predicates —–
49 |
# File 'lib/supabase/realtime/channel.rb', line 49 def closed?; @state == Types::ChannelStates::CLOSED; end |
#dispatch(message) ⇒ Object
Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/supabase/realtime/channel.rb', line 151 def dispatch() return false unless .topic == @topic case .event when Types::ChannelEvents::REPLY dispatch_reply() when Types::ChannelEvents::POSTGRES_CHANGES dispatch_postgres_changes() when Types::ChannelEvents::BROADCAST dispatch_broadcast() when Types::ChannelEvents::PRESENCE_STATE @presence.sync_state(.payload) when Types::ChannelEvents::PRESENCE_DIFF @presence.sync_diff(.payload) when Types::ChannelEvents::SYSTEM @system_callbacks.each { |cb| cb.call(.payload) } when Types::ChannelEvents::CLOSE @state = Types::ChannelStates::CLOSED @close_callbacks.each { |cb| cb.call(.payload) } when Types::ChannelEvents::ERROR @state = Types::ChannelStates::ERRORED @error_callbacks.each { |cb| cb.call(.payload) } end true end |
#errored? ⇒ Boolean
50 |
# File 'lib/supabase/realtime/channel.rb', line 50 def errored?; @state == Types::ChannelStates::ERRORED; end |
#joined? ⇒ Boolean
51 |
# File 'lib/supabase/realtime/channel.rb', line 51 def joined?; @state == Types::ChannelStates::JOINED; end |
#joining? ⇒ Boolean
52 |
# File 'lib/supabase/realtime/channel.rb', line 52 def joining?; @state == Types::ChannelStates::JOINING; end |
#leaving? ⇒ Boolean
53 |
# File 'lib/supabase/realtime/channel.rb', line 53 def leaving?; @state == Types::ChannelStates::LEAVING; end |
#on_broadcast(event, &block) ⇒ Object
97 98 99 100 |
# File 'lib/supabase/realtime/channel.rb', line 97 def on_broadcast(event, &block) @broadcast_callbacks << { event: event, callback: block } self end |
#on_close(&block) ⇒ Object
107 108 109 110 |
# File 'lib/supabase/realtime/channel.rb', line 107 def on_close(&block) @close_callbacks << block self end |
#on_error(&block) ⇒ Object
112 113 114 115 |
# File 'lib/supabase/realtime/channel.rb', line 112 def on_error(&block) @error_callbacks << block self end |
#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object
Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.
86 87 88 89 90 91 92 93 94 95 |
# File 'lib/supabase/realtime/channel.rb', line 86 def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) unless %w[INSERT UPDATE DELETE *].include?(event) raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*" end @postgres_changes_callbacks << { event: event, schema: schema, table: table, filter: filter, callback: block } self end |
#on_system(&block) ⇒ Object
102 103 104 105 |
# File 'lib/supabase/realtime/channel.rb', line 102 def on_system(&block) @system_callbacks << block self end |
#send_broadcast(event, payload = {}) ⇒ Object
Send a custom broadcast message. The server will forward it to other subscribers of the same topic.
121 122 123 124 125 126 127 |
# File 'lib/supabase/realtime/channel.rb', line 121 def send_broadcast(event, payload = {}) push = Push.new(self, Types::ChannelEvents::BROADCAST, { "type" => "broadcast", "event" => event, "payload" => payload }) send_push(push, register_pending: false) self end |
#subscribe(&block) ⇒ Object
Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).
59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/supabase/realtime/channel.rb', line 59 def subscribe(&block) raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once @joined_once = true @subscribe_callback = block @state = Types::ChannelStates::JOINING @join_push.instance_variable_set(:@ref, @socket&.next_ref) send_push(@join_push, register_pending: true) self end |
#track(payload) ⇒ Object
Track the local user in the channel’s presence state.
130 131 132 133 134 135 136 |
# File 'lib/supabase/realtime/channel.rb', line 130 def track(payload) push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "track", "payload" => payload }) send_push(push, register_pending: false) self end |
#unsubscribe ⇒ Object
Tear down the subscription with a phx_leave push.
72 73 74 75 76 77 78 79 |
# File 'lib/supabase/realtime/channel.rb', line 72 def unsubscribe @state = Types::ChannelStates::LEAVING ref = @socket&.next_ref leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref) send_push(leave_push, register_pending: false) @state = Types::ChannelStates::CLOSED self end |
#untrack ⇒ Object
138 139 140 141 142 143 144 |
# File 'lib/supabase/realtime/channel.rb', line 138 def untrack push = Push.new(self, Types::ChannelEvents::PRESENCE, { "type" => "presence", "event" => "untrack" }) send_push(push, register_pending: false) self end |