Class: Supabase::Realtime::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/channel.rb

Overview

A topic subscription on a shared Socket connection. Each Channel:

  • tracks its own lifecycle state (closed/joining/joined/leaving/errored)

  • holds the listener callbacks for postgres changes / broadcast / presence / system

  • dispatches inbound messages from the Client to those callbacks

  • owns its Presence sync state

Should be constructed via Supabase::Realtime::Client#channel, not directly.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, params: nil, socket: nil) ⇒ Channel

Returns a new instance of Channel.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/supabase/realtime/channel.rb', line 21

def initialize(topic, params: nil, socket: nil)
  @topic   = topic
  @params  = params || default_params
  @socket  = socket
  @state   = Types::ChannelStates::CLOSED
  @joined_once = false
  @presence = Presence.new

  @broadcast_callbacks        = []   # [{ event:, callback: }]
  @postgres_changes_callbacks = []   # [{ event:, schema:, table:, filter:, callback: }]
  @system_callbacks           = []
  @close_callbacks            = []
  @error_callbacks            = []

  @pending_pushes = {} # ref => Push, for matching phx_reply
  @push_buffer    = [] # outbound pushes queued while not yet joined

  @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params)
  @subscribe_callback = nil

  @join_push
    .receive(Types::AckStatus::OK)      { |p| on_join_ok(p) }
    .receive(Types::AckStatus::ERROR)   { |p| on_join_error(p) }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_join_timeout }
end

Instance Attribute Details

#join_pushObject (readonly)

Returns the value of attribute join_push.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def join_push
  @join_push
end

#paramsObject (readonly)

Returns the value of attribute params.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def params
  @params
end

#pending_pushesObject (readonly)

Returns the value of attribute pending_pushes.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def pending_pushes
  @pending_pushes
end

#presenceObject (readonly)

Returns the value of attribute presence.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def presence
  @presence
end

#stateObject (readonly)

Returns the value of attribute state.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def state
  @state
end

#topicObject (readonly)

Returns the value of attribute topic.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

def topic
  @topic
end

Instance Method Details

#closed?Boolean

—– State predicates —–

Returns:

  • (Boolean)


49
# File 'lib/supabase/realtime/channel.rb', line 49

def closed?;  @state == Types::ChannelStates::CLOSED;  end

#dispatch(message) ⇒ Object

Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/supabase/realtime/channel.rb', line 175

def dispatch(message)
  return false unless message.topic == @topic

  case message.event
  when Types::ChannelEvents::REPLY
    dispatch_reply(message)
  when Types::ChannelEvents::POSTGRES_CHANGES
    dispatch_postgres_changes(message)
  when Types::ChannelEvents::BROADCAST
    dispatch_broadcast(message)
  when Types::ChannelEvents::PRESENCE_STATE
    @presence.sync_state(message.payload)
  when Types::ChannelEvents::PRESENCE_DIFF
    @presence.sync_diff(message.payload)
  when Types::ChannelEvents::SYSTEM
    @system_callbacks.each { |cb| cb.call(message.payload) }
  when Types::ChannelEvents::CLOSE
    @state = Types::ChannelStates::CLOSED
    @close_callbacks.each { |cb| cb.call(message.payload) }
  when Types::ChannelEvents::ERROR
    @state = Types::ChannelStates::ERRORED
    @error_callbacks.each { |cb| cb.call(message.payload) }
  end

  true
end

#errored?Boolean

Returns:

  • (Boolean)


50
# File 'lib/supabase/realtime/channel.rb', line 50

def errored?; @state == Types::ChannelStates::ERRORED; end

#joined?Boolean

Returns:

  • (Boolean)


51
# File 'lib/supabase/realtime/channel.rb', line 51

def joined?;  @state == Types::ChannelStates::JOINED;  end

#joining?Boolean

Returns:

  • (Boolean)


52
# File 'lib/supabase/realtime/channel.rb', line 52

def joining?; @state == Types::ChannelStates::JOINING; end

#leaving?Boolean

Returns:

  • (Boolean)


53
# File 'lib/supabase/realtime/channel.rb', line 53

def leaving?; @state == Types::ChannelStates::LEAVING; end

#on_broadcast(event, &block) ⇒ Object



121
122
123
124
# File 'lib/supabase/realtime/channel.rb', line 121

def on_broadcast(event, &block)
  @broadcast_callbacks << { event: event, callback: block }
  self
end

#on_close(&block) ⇒ Object



131
132
133
134
# File 'lib/supabase/realtime/channel.rb', line 131

def on_close(&block)
  @close_callbacks << block
  self
end

#on_error(&block) ⇒ Object



136
137
138
139
# File 'lib/supabase/realtime/channel.rb', line 136

def on_error(&block)
  @error_callbacks << block
  self
end

#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object

Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.



110
111
112
113
114
115
116
117
118
119
# File 'lib/supabase/realtime/channel.rb', line 110

def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block)
  unless %w[INSERT UPDATE DELETE *].include?(event)
    raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*"
  end

  @postgres_changes_callbacks << {
    event: event, schema: schema, table: table, filter: filter, callback: block
  }
  self
end

#on_system(&block) ⇒ Object



126
127
128
129
# File 'lib/supabase/realtime/channel.rb', line 126

def on_system(&block)
  @system_callbacks << block
  self
end

#rejoinObject

Re-issue the join push without resetting @joined_once. Used by the client after a socket reconnect to restore channel subscriptions.



74
75
76
77
78
79
80
81
82
83
# File 'lib/supabase/realtime/channel.rb', line 74

def rejoin
  return unless @joined_once

  @state = Types::ChannelStates::JOINING
  inject_postgres_changes_bindings
  @join_push.instance_variable_set(:@ref, @socket&.next_ref)
  @join_push.instance_variable_set(:@received_status, nil)
  send_push(@join_push, register_pending: true)
  self
end

#send_broadcast(event, payload = {}) ⇒ Object

Send a custom broadcast message. The server will forward it to other subscribers of the same topic.



145
146
147
148
149
150
151
# File 'lib/supabase/realtime/channel.rb', line 145

def send_broadcast(event, payload = {})
  push = Push.new(self,
                  Types::ChannelEvents::BROADCAST,
                  { "type" => "broadcast", "event" => event, "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#subscribe(&block) ⇒ Object

Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/supabase/realtime/channel.rb', line 59

def subscribe(&block)
  raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel" if @joined_once

  @joined_once = true
  @subscribe_callback = block
  @state = Types::ChannelStates::JOINING

  inject_postgres_changes_bindings
  @join_push.instance_variable_set(:@ref, @socket&.next_ref)
  send_push(@join_push, register_pending: true)
  self
end

#track(payload) ⇒ Object

Track the local user in the channel’s presence state.



154
155
156
157
158
159
160
# File 'lib/supabase/realtime/channel.rb', line 154

def track(payload)
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "track", "payload" => payload })
  send_push(push, register_pending: false)
  self
end

#unsubscribeObject

Tear down the subscription with a phx_leave push. State stays in LEAVING until the server acks (or errors / times out) — mirrors phoenix.js and supabase-py so a fast unsubscribe→resubscribe cycle doesn’t race with the server’s reply for the previous join.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/supabase/realtime/channel.rb', line 89

def unsubscribe
  return self if closed?

  @state = Types::ChannelStates::LEAVING
  ref = @socket&.next_ref
  leave_push = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: ref)

  leave_push
    .receive(Types::AckStatus::OK)      { |_| on_leave_ack }
    .receive(Types::AckStatus::ERROR)   { |_| on_leave_ack }
    .receive(Types::AckStatus::TIMEOUT) { |_| on_leave_ack }

  send_push(leave_push, register_pending: true)
  self
end

#untrackObject



162
163
164
165
166
167
168
# File 'lib/supabase/realtime/channel.rb', line 162

def untrack
  push = Push.new(self,
                  Types::ChannelEvents::PRESENCE,
                  { "type" => "presence", "event" => "untrack" })
  send_push(push, register_pending: false)
  self
end