Class: Supabase::Realtime::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/channel.rb

Overview

A topic subscription on a shared Socket connection. Each Channel:

  • tracks its own lifecycle state (closed/joining/joined/leaving/errored)

  • holds the listener callbacks for postgres changes / broadcast / presence / system

  • dispatches inbound messages from the Client to those callbacks

  • owns its Presence sync state

Should be constructed via Supabase::Realtime::Client#channel, not directly.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, params: nil, socket: nil) ⇒ Channel

Returns a new instance of Channel.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/supabase/realtime/channel.rb', line 21

def initialize(topic, params: nil, socket: nil)
  @topic   = topic
  @params  = params || default_params
  @socket  = socket
  @state   = Types::ChannelStates::CLOSED
  @joined_once = false
  @presence = Presence.new

  @broadcast_callbacks        = []   # [{ event:, callback: }]
  @postgres_changes_callbacks = []   # [{ event:, schema:, table:, filter:, callback: }]
  @system_callbacks           = []
  @close_callbacks            = []
  @error_callbacks            = []

  @pending_pushes = {} # ref => Push, for matching phx_reply
  @push_buffer    = [] # outbound pushes queued while not yet joined

  @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params)
  @subscribe_callback = nil

  @join_push
    .receive(Types::AckStatus::OK)      { |p| on_join_ok(p) }
    .receive(Types::AckStatus::ERROR)   { |p| on_join_error(p) }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout }
end

Instance Attribute Details

#join_pushObject (readonly)

Returns the value of attribute join_push.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def join_push
  @join_push
end

#paramsObject (readonly)

Returns the value of attribute params.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def params
  @params
end

#pending_pushesObject (readonly)

Returns the value of attribute pending_pushes.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def pending_pushes
  @pending_pushes
end

#presenceObject (readonly)

Returns the value of attribute presence.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def presence
  @presence
end

#stateObject (readonly)

Returns the value of attribute state.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def state
  @state
end

#topicObject (readonly)

Returns the value of attribute topic.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def topic
  @topic
end

Instance Method Details

#closed?Boolean

—– State predicates —–

Returns:

  • (Boolean)


49
# File 'lib/supabase/realtime/channel.rb', line 49

def closed?;  @state == Types::ChannelStates::CLOSED;  end

#dispatch(message) ⇒ Object

Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/supabase/realtime/channel.rb', line 203

def dispatch(message)
  return false unless message.topic == @topic

  case message.event
  when Types::ChannelEvents::REPLY
    dispatch_reply(message)
  when Types::ChannelEvents::POSTGRES_CHANGES
    dispatch_postgres_changes(message)
  when Types::ChannelEvents::BROADCAST
    dispatch_broadcast(message)
  when Types::ChannelEvents::PRESENCE_STATE
    @presence.sync_state(message.payload)
  when Types::ChannelEvents::PRESENCE_DIFF
    @presence.sync_diff(message.payload)
  when Types::ChannelEvents::SYSTEM
    @system_callbacks.each { |cb| cb.call(message.payload) }
  when Types::ChannelEvents::CLOSE
    @state = Types::ChannelStates::CLOSED
    @close_callbacks.each { |cb| cb.call(message.payload) }
  when Types::ChannelEvents::ERROR
    @state = Types::ChannelStates::ERRORED
    @error_callbacks.each { |cb| cb.call(message.payload) }
  end

  true
end

#errored?Boolean

Returns:

  • (Boolean)


50
# File 'lib/supabase/realtime/channel.rb', line 50

def errored?; @state == Types::ChannelStates::ERRORED; end

#joined?Boolean

Returns:

  • (Boolean)


51
# File 'lib/supabase/realtime/channel.rb', line 51

def joined?;  @state == Types::ChannelStates::JOINED;  end

#joining?Boolean

Returns:

  • (Boolean)


52
# File 'lib/supabase/realtime/channel.rb', line 52

def joining?; @state == Types::ChannelStates::JOINING; end

#leaving?Boolean

Returns:

  • (Boolean)


53
# File 'lib/supabase/realtime/channel.rb', line 53

def leaving?; @state == Types::ChannelStates::LEAVING; end

#on_broadcast(event, &block) ⇒ Object



121
122
123
124
# File 'lib/supabase/realtime/channel.rb', line 121

def on_broadcast(event, &block)
  @broadcast_callbacks << { event: event, callback: block }
  self
end

#on_close(&block) ⇒ Object



131
132
133
134
# File 'lib/supabase/realtime/channel.rb', line 131

def on_close(&block)
  @close_callbacks << block
  self
end

#on_error(&block) ⇒ Object



136
137
138
139
# File 'lib/supabase/realtime/channel.rb', line 136

def on_error(&block)
  @error_callbacks << block
  self
end

#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object

Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.



110
111
112
113
114
115
116
117
118
119
# File 'lib/supabase/realtime/channel.rb', line 110

def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block)
  unless %w[INSERT UPDATE DELETE *].include?(event)
    raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*"
  end

  @postgres_changes_callbacks << {
    event: event, schema: schema, table: table, filter: filter, callback: block
  }
  self
end

#on_presence_join(&block) ⇒ Object



152
153
154
155
156
# File 'lib/supabase/realtime/channel.rb', line 152

def on_presence_join(&block)
  @presence.on_join(&block)
  resubscribe_for_presence!
  self
end

#on_presence_leave(&block) ⇒ Object



158
159
160
161
162
# File 'lib/supabase/realtime/channel.rb', line 158

def on_presence_leave(&block)
  @presence.on_leave(&block)
  resubscribe_for_presence!
  self
end

#on_presence_sync(&block) ⇒ Object

Convenience wrappers around the underlying ‘channel.presence` object, matching supabase-py’s ‘channel.on_presence_sync/join/leave` API. If called after the channel is already joined, the channel resubscribes so the server starts forwarding presence events (presence has to be enabled in the join config — see #default_params).



146
147
148
149
150
# File 'lib/supabase/realtime/channel.rb', line 146

def on_presence_sync(&block)
  @presence.on_sync(&block)
  resubscribe_for_presence!
  self
end

#on_system(&block) ⇒ Object



126
127
128
129
# File 'lib/supabase/realtime/channel.rb', line 126

def on_system(&block)
  @system_callbacks << block
  self
end

#presence_stateObject

Shortcut for ‘channel.presence.state` so callers don’t have to drill in.



165
166
167
# File 'lib/supabase/realtime/channel.rb', line 165

def presence_state
  @presence.state
end

#rejoinObject

Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions.



74
75
76
77
78
79
80
81
82
83
# File 'lib/supabase/realtime/channel.rb', line 74

def rejoin
  return unless @joined_once

  @state = Types::ChannelStates::JOINING
  inject_postgres_changes_bindings
  @join_push.instance_variable_set(:@ref, @socket&.next_ref)
  @join_push.instance_variable_set(:@received_status, nil)
  send_push(@join_push, register_pending: true)
  self
end

#send_broadcast(event, payload = {}) ⇒ Object

Send a custom broadcast message. The server will forward it to other subscribers of the same topic.



173
174
175
176
177
178
179
# File 'lib/supabase/realtime/channel.rb', line 173

def send_broadcast(event, payload = {})
  push = Push.new(self,
                  Types::ChannelEvents::BROADCAST,
                  { "type" => "broadcast", "event" => event, "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#subscribe(&block) ⇒ Object

Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/supabase/realtime/channel.rb', line 59

def subscribe(&block)
  raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once

  @joined_once = true
  @subscribe_callback = block
  @state = Types::ChannelStates::JOINING

  inject_postgres_changes_bindings
  @join_push.instance_variable_set(:@ref, @socket&.next_ref)
  send_push(@join_push, register_pending: true)
  self
end

#track(payload) ⇒ Object

Track the local user in the channel’s presence state.



182
183
184
185
186
187
188
# File 'lib/supabase/realtime/channel.rb', line 182

def track(payload)
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "track", "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#unsubscribeObject

Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/supabase/realtime/channel.rb', line 89

def unsubscribe
  return self if closed?

  @state = Types::ChannelStates::LEAVING
  ref = @socket&.next_ref
  leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref)

  leave_push
    .receive(Types::AckStatus::OK)      { |_| on_leave_ack }
    .receive(Types::AckStatus::ERROR)   { |_| on_leave_ack }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack }

  send_push(leave_push, register_pending: true)
  self
end

#untrackObject



190
191
192
193
194
195
196
# File 'lib/supabase/realtime/channel.rb', line 190

def untrack
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "untrack" })
  send_push(push, register_pending: false)
  self
end