Class: Supabase::Realtime::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/channel.rb

Overview

A topic subscription on a shared Socket connection. Each Channel:

  • tracks its own lifecycle state (closed/joining/joined/leaving/errored)

  • holds the listener callbacks for postgres changes / broadcast / presence / system

  • dispatches inbound messages from the Client to those callbacks

  • owns its Presence sync state

Should be constructed via Supabase::Realtime::Client#channel, not directly.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, params: nil, socket: nil) ⇒ Channel

Returns a new instance of Channel.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/supabase/realtime/channel.rb', line 21

# Sets up a channel for +topic+. The channel starts in the CLOSED state with
# empty listener registries and a join push whose OK / ERROR / TIMEOUT
# acknowledgements are routed to the channel's join handlers.
#
# @param topic the topic this channel is bound to
# @param params join payload; +default_params+ is used when nil
# @param socket object asked for push refs via +next_ref+ (may be nil)
def initialize(topic, params: nil, socket: nil)
  @topic = topic
  @params = params || default_params
  @socket = socket

  # Lifecycle bookkeeping.
  @state = Types::ChannelStates::CLOSED
  @joined_once = false
  @subscribe_callback = nil
  @presence = Presence.new

  # Listener registries, one per message category.
  @broadcast_callbacks = []        # entries: { event:, callback: }
  @postgres_changes_callbacks = [] # entries: { event:, schema:, table:, filter:, callback: }
  @system_callbacks = []
  @close_callbacks = []
  @error_callbacks = []

  # Outbound bookkeeping.
  @pending_pushes = {} # ref => Push, for matching phx_reply
  @push_buffer = []    # outbound pushes queued while not yet joined

  # The join push is built last so it sees a fully initialised channel.
  @join_push = Push.new(self, Types::ChannelEvents::JOIN, @params)
  @join_push
    .receive(Types::AckStatus::OK) { |_payload| on_join_ok }
    .receive(Types::AckStatus::ERROR) { |payload| on_join_error(payload) }
    .receive(Types::AckStatus::TIMEOUT) { |_payload| on_join_timeout }
end

Instance Attribute Details

#join_push ⇒ Object (readonly)

Returns the value of attribute join_push.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

# @return the value of the +@join_push+ instance variable
def join_push; @join_push; end

#params ⇒ Object (readonly)

Returns the value of attribute params.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

# @return the value of the +@params+ instance variable
def params; @params; end

#pending_pushes ⇒ Object (readonly)

Returns the value of attribute pending_pushes.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

# @return the value of the +@pending_pushes+ instance variable
def pending_pushes; @pending_pushes; end

#presence ⇒ Object (readonly)

Returns the value of attribute presence.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

# @return the value of the +@presence+ instance variable
def presence; @presence; end

#state ⇒ Object (readonly)

Returns the value of attribute state.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

# @return the value of the +@state+ instance variable
def state; @state; end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



19
20
21
# File 'lib/supabase/realtime/channel.rb', line 19

# @return the value of the +@topic+ instance variable
def topic; @topic; end

Instance Method Details

#closed? ⇒ Boolean

State predicate: returns true when the channel's lifecycle state is closed.

Returns:

  • (Boolean)


49
# File 'lib/supabase/realtime/channel.rb', line 49

# State predicate.
#
# @return [Boolean] true when the current state equals ChannelStates::CLOSED
def closed?
  @state == Types::ChannelStates::CLOSED
end

#dispatch(message) ⇒ Object

Route a parsed Message to the appropriate listeners. Returns true if the message belonged to this channel, false otherwise (so the Client knows whether to drop it).



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/supabase/realtime/channel.rb', line 151

# Hands an inbound message to whichever listeners match its event.
#
# Messages addressed to another topic are rejected up front. CLOSE and ERROR
# events update the channel state before their callbacks run.
#
# @param message parsed message responding to +topic+, +event+ and +payload+
# @return [Boolean] false when the message's topic differs from this
#   channel's; true otherwise, including for events with no branch below
def dispatch(message)
  return false unless message.topic == @topic

  # The payload is read once per listener, at the moment it is invoked.
  notify = ->(listeners) { listeners.each { |listener| listener.call(message.payload) } }

  case message.event
  when Types::ChannelEvents::REPLY            then dispatch_reply(message)
  when Types::ChannelEvents::POSTGRES_CHANGES then dispatch_postgres_changes(message)
  when Types::ChannelEvents::BROADCAST        then dispatch_broadcast(message)
  when Types::ChannelEvents::PRESENCE_STATE   then @presence.sync_state(message.payload)
  when Types::ChannelEvents::PRESENCE_DIFF    then @presence.sync_diff(message.payload)
  when Types::ChannelEvents::SYSTEM           then notify.call(@system_callbacks)
  when Types::ChannelEvents::CLOSE
    @state = Types::ChannelStates::CLOSED
    notify.call(@close_callbacks)
  when Types::ChannelEvents::ERROR
    @state = Types::ChannelStates::ERRORED
    notify.call(@error_callbacks)
  end

  true
end

#errored? ⇒ Boolean

Returns:

  • (Boolean)


50
# File 'lib/supabase/realtime/channel.rb', line 50

# State predicate.
#
# @return [Boolean] true when the current state equals ChannelStates::ERRORED
def errored?
  @state == Types::ChannelStates::ERRORED
end

#joined? ⇒ Boolean

Returns:

  • (Boolean)


51
# File 'lib/supabase/realtime/channel.rb', line 51

# State predicate.
#
# @return [Boolean] true when the current state equals ChannelStates::JOINED
def joined?
  @state == Types::ChannelStates::JOINED
end

#joining? ⇒ Boolean

Returns:

  • (Boolean)


52
# File 'lib/supabase/realtime/channel.rb', line 52

# State predicate.
#
# @return [Boolean] true when the current state equals ChannelStates::JOINING
def joining?
  @state == Types::ChannelStates::JOINING
end

#leaving? ⇒ Boolean

Returns:

  • (Boolean)


53
# File 'lib/supabase/realtime/channel.rb', line 53

# State predicate.
#
# @return [Boolean] true when the current state equals ChannelStates::LEAVING
def leaving?
  @state == Types::ChannelStates::LEAVING
end

#on_broadcast(event, &block) ⇒ Object



97
98
99
100
# File 'lib/supabase/realtime/channel.rb', line 97

# Registers a broadcast listener for +event+.
#
# NOTE(review): when no block is given a nil callback is stored; confirm the
# broadcast dispatcher tolerates that or add a guard here.
#
# @param event event name stored alongside the callback
# @yield the listener; invocation arguments are decided by the dispatcher
# @return [self] so registrations can be chained
def on_broadcast(event, &block)
  listener = { event: event, callback: block }
  @broadcast_callbacks.push(listener)
  self
end

#on_close(&block) ⇒ Object



107
108
109
110
# File 'lib/supabase/realtime/channel.rb', line 107

# Registers a listener for the channel's CLOSE event.
#
# @yield [payload] invoked by #dispatch with the message payload
# @raise [ArgumentError] when no block is given; #dispatch invokes every
#   stored entry with +call+, so a nil entry would fail there with a
#   NoMethodError far away from the faulty registration
# @return [self] so registrations can be chained
def on_close(&block)
  raise ArgumentError, "on_close requires a block" unless block

  @close_callbacks << block
  self
end

#on_error(&block) ⇒ Object



112
113
114
115
# File 'lib/supabase/realtime/channel.rb', line 112

# Registers a listener for the channel's ERROR event.
#
# @yield [payload] invoked by #dispatch with the message payload
# @raise [ArgumentError] when no block is given; #dispatch invokes every
#   stored entry with +call+, so a nil entry would fail there with a
#   NoMethodError far away from the faulty registration
# @return [self] so registrations can be chained
def on_error(&block)
  raise ArgumentError, "on_error requires a block" unless block

  @error_callbacks << block
  self
end

#on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block) ⇒ Object

Register a postgres-changes listener. event may be “INSERT”, “UPDATE”, “DELETE”, or “*” for all three. schema/table/filter narrow which rows fire the callback. Returns self so calls chain.



86
87
88
89
90
91
92
93
94
95
# File 'lib/supabase/realtime/channel.rb', line 86

# Registers a postgres-changes listener and returns self for chaining.
#
# NOTE(review): when no block is given a nil callback is stored; confirm the
# postgres-changes dispatcher tolerates that or add a guard here.
#
# @param event [String] one of "INSERT", "UPDATE", "DELETE" or "*"
# @param schema optional schema stored with the listener
# @param table optional table stored with the listener
# @param filter optional filter stored with the listener
# @raise [ArgumentError] when +event+ is not one of the accepted strings
# @return [self]
def on_postgres_changes(event, schema: nil, table: nil, filter: nil, &block)
  supported = %w[INSERT UPDATE DELETE *]
  raise ArgumentError, "postgres_changes event must be INSERT/UPDATE/DELETE/*" unless supported.include?(event)

  listener = { event: event, schema: schema, table: table, filter: filter, callback: block }
  @postgres_changes_callbacks << listener
  self
end

#on_system(&block) ⇒ Object



102
103
104
105
# File 'lib/supabase/realtime/channel.rb', line 102

# Registers a listener for SYSTEM events on this channel.
#
# @yield [payload] invoked by #dispatch with the message payload
# @raise [ArgumentError] when no block is given; #dispatch invokes every
#   stored entry with +call+, so a nil entry would fail there with a
#   NoMethodError far away from the faulty registration
# @return [self] so registrations can be chained
def on_system(&block)
  raise ArgumentError, "on_system requires a block" unless block

  @system_callbacks << block
  self
end

#send_broadcast(event, payload = {}) ⇒ Object

Send a custom broadcast message. The server will forward it to other subscribers of the same topic.



121
122
123
124
125
126
127
# File 'lib/supabase/realtime/channel.rb', line 121

# Sends a custom broadcast message on this channel. The push is handed to
# +send_push+ without being registered as pending.
#
# @param event broadcast event name placed in the envelope
# @param payload body placed under the envelope's "payload" key
# @return [self]
def send_broadcast(event, payload = {})
  envelope = { "type" => "broadcast", "event" => event, "payload" => payload }
  send_push(Push.new(self, Types::ChannelEvents::BROADCAST, envelope), register_pending: false)
  self
end

#subscribe(&block) ⇒ Object

Start the join handshake. Optional block fires when the join completes, receiving the SubscribeStates value (SUBSCRIBED / CHANNEL_ERROR / TIMED_OUT).



59
60
61
62
63
64
65
66
67
68
69
# File 'lib/supabase/realtime/channel.rb', line 59

# Starts the join handshake: marks the channel as JOINING, stamps the join
# push with a fresh ref from the socket (nil when there is no socket) and
# hands it to +send_push+ registered as pending.
#
# NOTE(review): the ref is written with +instance_variable_set+, bypassing
# whatever interface Push exposes; worth replacing if Push offers a setter.
#
# @yield optional callback stored for when the join completes
# @raise [Errors::AlreadyJoinedError] on any call after the first
# @return [self]
def subscribe(&block)
  if @joined_once
    raise Errors::AlreadyJoinedError, "subscribe can only be called once per channel"
  end

  @joined_once = true
  @subscribe_callback = block
  @state = Types::ChannelStates::JOINING

  join_ref = @socket&.next_ref
  @join_push.instance_variable_set(:@ref, join_ref)
  send_push(@join_push, register_pending: true)
  self
end

#track(payload) ⇒ Object

Track the local user in the channel’s presence state.



130
131
132
133
134
135
136
# File 'lib/supabase/realtime/channel.rb', line 130

# Sends a presence "track" push carrying +payload+, to track the local user
# in the channel's presence state. The push is not registered as pending.
#
# @param payload presence metadata placed under the envelope's "payload" key
# @return [self]
def track(payload)
  envelope = { "type" => "presence", "event" => "track", "payload" => payload }
  send_push(Push.new(self, Types::ChannelEvents::PRESENCE, envelope), register_pending: false)
  self
end

#unsubscribe ⇒ Object

Tear down the subscription with a phx_leave push.



72
73
74
75
76
77
78
79
# File 'lib/supabase/realtime/channel.rb', line 72

# Tears down the subscription: the channel is marked LEAVING while a leave
# push (empty payload, fresh ref from the socket when one is present) is
# handed to +send_push+, and is marked CLOSED once that call returns.
#
# @return [self]
def unsubscribe
  @state = Types::ChannelStates::LEAVING
  farewell = Push.new(self, Types::ChannelEvents::LEAVE, {}, ref: @socket&.next_ref)
  send_push(farewell, register_pending: false)
  @state = Types::ChannelStates::CLOSED
  self
end

#untrack ⇒ Object



138
139
140
141
142
143
144
# File 'lib/supabase/realtime/channel.rb', line 138

# Sends a presence "untrack" push (no payload); the counterpart to #track.
# The push is not registered as pending.
#
# @return [self]
def untrack
  envelope = { "type" => "presence", "event" => "untrack" }
  send_push(Push.new(self, Types::ChannelEvents::PRESENCE, envelope), register_pending: false)
  self
end