Class: Supabase::Realtime::Presence

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/presence.rb

Overview

Tracks presence state for one channel and implements the Phoenix Presence sync algorithm. Mirrors supabase-py's AsyncRealtimePresence: raw `{ key => { "metas" => [{ "phx_ref" => …, … }] } }` wire payloads are transformed to a flat `{ key => [{ "presence_ref" => …, … }, …] }` shape before being stored or emitted, so listener callbacks receive `(key, current_presences, new_presences)` with `presence_ref` keys.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger: nil) ⇒ Presence

Returns a new instance of Presence.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/supabase/realtime/presence.rb', line 14

# Builds a tracker with no presences and no registered listeners.
#
# @param logger [Object, nil] optional logger; stored as-is in @logger
def initialize(logger: nil)
  # Listener lists, one per event kind; filled by on_sync / on_join / on_leave.
  @on_sync_callbacks = []
  @on_join_callbacks = []
  @on_leave_callbacks = []

  @state = {}
  # Every read and write of @state happens under this mutex, so a thread
  # iterating a `presence_state` snapshot cannot race with the realtime
  # read-thread applying inbound presence_state / presence_diff frames.
  # The US-007 stress spec shows an unguarded Hash raising "can't add a
  # new key into hash during iteration" under load; the mutex plus the
  # snapshot accessor keeps that scenario clean. Callbacks are dispatched
  # only AFTER the lock has been released, so user code that calls back
  # into `state` never re-enters the same lock.
  @mutex = Mutex.new

  @logger = logger
end

Class Method Details

.transform_meta(meta) ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/supabase/realtime/presence.rb', line 123

# Converts one raw Phoenix meta into the flat presence shape.
#
# Works on a copy, so the caller's hash is left untouched. "phx_ref_prev"
# is always dropped; "phx_ref", when present, is renamed to "presence_ref"
# and placed first in the result.
#
# @param meta [Hash] a single meta entry with String keys
# @return [Hash] the transformed copy
def self.transform_meta(meta)
  copy = meta.dup
  copy.delete("phx_ref_prev")
  return copy unless copy.key?("phx_ref")

  ref = copy.delete("phx_ref")
  # Remaining keys are merged on top, so a "presence_ref" already present
  # in the meta takes precedence over the renamed "phx_ref".
  { "presence_ref" => ref }.merge(copy)
end

.transform_state(state) ⇒ Object

Convert raw Phoenix wire format `{ key => { "metas" => […] } }` to flat `{ key => […, …] }`. Idempotent on already-transformed input.



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/supabase/realtime/presence.rb', line 111

# Flattens a presence map into `{ key => [meta, ...] }`.
#
# Each value may be either the raw wire form (a Hash carrying a "metas"
# list) or anything `Array()` can coerce (an already-flat list, or nil,
# which becomes an empty list). Every meta is passed through
# `transform_meta`.
#
# @param state [Hash, nil] presence map; nil is treated as empty
# @return [Hash] a new Hash; the input map itself is not modified
def self.transform_state(state)
  (state || {}).each_with_object({}) do |(key, presences), flat|
    metas = if presences.is_a?(Hash) && presences.key?("metas")
              presences["metas"]
            else
              Array(presences)
            end
    flat[key] = metas.map { |meta| transform_meta(meta) }
  end
end

Instance Method Details

#any_callbacks? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/supabase/realtime/presence.rb', line 104

# Reports whether at least one sync, join or leave listener is registered.
#
# @return [Boolean] false only when all three callback lists are empty
def any_callbacks?
  !(@on_sync_callbacks.empty? && @on_join_callbacks.empty? && @on_leave_callbacks.empty?)
end

#list ⇒ Object

Flat list of every presence currently tracked.



85
86
87
# File 'lib/supabase/realtime/presence.rb', line 85

# Collects the presences of every key into a single flat Array.
#
# The state is read while holding the mutex; the returned Array is a new
# object built from that read.
#
# @return [Array] all tracked presences, in key insertion order
def list
  @mutex.synchronize do
    per_key_presences = @state.values
    per_key_presences.flatten
  end
end

#on_join(&block) ⇒ Object



94
95
96
97
# File 'lib/supabase/realtime/presence.rb', line 94

# Registers a join listener by appending the block to @on_join_callbacks.
#
# No check is made that a block was actually given; calling without one
# appends nil.
#
# @return [self] the receiver, so registrations can be chained
def on_join(&block)
  @on_join_callbacks.push(block)
  self
end

#on_leave(&block) ⇒ Object



99
100
101
102
# File 'lib/supabase/realtime/presence.rb', line 99

# Registers a leave listener by appending the block to @on_leave_callbacks.
#
# No check is made that a block was actually given; calling without one
# appends nil.
#
# @return [self] the receiver, so registrations can be chained
def on_leave(&block)
  @on_leave_callbacks.push(block)
  self
end

#on_sync(&block) ⇒ Object



89
90
91
92
# File 'lib/supabase/realtime/presence.rb', line 89

# Registers a sync listener by appending the block to @on_sync_callbacks.
#
# No check is made that a block was actually given; calling without one
# appends nil.
#
# @return [self] the receiver, so registrations can be chained
def on_sync(&block)
  @on_sync_callbacks.push(block)
  self
end

#state ⇒ Object

Snapshot of the current presence state. Returns a shallow dup of the internal hash so callers can iterate safely while the read-thread continues to apply inbound diffs (US-007 thread safety AC).



34
35
36
# File 'lib/supabase/realtime/presence.rb', line 34

# Returns a snapshot of the presence map taken under the mutex.
#
# The copy is shallow: the outer Hash is new, so adding or removing keys
# on it does not touch the internal state, but the per-key presence
# Arrays are the same objects the tracker holds.
#
# @return [Hash] `{ key => [presence, ...] }`
def state
  @mutex.synchronize do
    @state.dup
  end
end

#sync_diff(raw_diff) ⇒ Object

Subsequent presence_diff messages: apply joins/leaves to the local state. Raw input is transformed before being applied.



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/supabase/realtime/presence.rb', line 72

# Applies one presence_diff payload to the local state.
#
# The "joins" and "leaves" maps are run through `transform_state` and then
# handed to `apply_sync_diff_locked` while the mutex is held. Listener
# dispatch (`fire_events`, then `fire_sync_callbacks`) happens only after
# the lock has been released.
#
# A nil payload is treated like an empty diff, matching `sync_state`,
# which already accepts nil; previously it raised NoMethodError.
#
# @param raw_diff [Hash, nil] payload with optional "joins" / "leaves" keys
# @return [Hash] the value of `state` after the diff has been applied
def sync_diff(raw_diff)
  diff = raw_diff || {}
  events = @mutex.synchronize do
    joins = self.class.transform_state(diff["joins"] || {})
    leaves = self.class.transform_state(diff["leaves"] || {})
    apply_sync_diff_locked(joins, leaves)
  end
  fire_events(events)
  fire_sync_callbacks
  state
end

#sync_state(raw_state) ⇒ Object

First snapshot after joining: diff against the (possibly empty) local state and apply the joins/leaves through the same code path as `sync_diff`.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/supabase/realtime/presence.rb', line 41

# Reconciles the local state with a full presence_state snapshot.
#
# While holding the mutex, the snapshot is transformed and compared with
# @state to derive joins and leaves, matching presences by their
# "presence_ref":
# - keys missing from the snapshot leave with all of their presences;
# - keys with no current presences join with everything in the snapshot;
# - keys on both sides join the refs only the snapshot has and leave the
#   refs only the local state has.
# The result goes through `apply_sync_diff_locked`. Listener dispatch
# (`fire_events`, then `fire_sync_callbacks`) runs after the lock is
# released.
#
# @param raw_state [Hash, nil] snapshot payload, passed to `transform_state`
# @return [Hash] the value of `state` after the snapshot has been applied
def sync_state(raw_state)
  events = @mutex.synchronize do
    incoming = self.class.transform_state(raw_state)
    joins = {}
    # Keys the snapshot no longer mentions have left entirely.
    leaves = @state.reject { |key, _| incoming.key?(key) }

    incoming.each do |key, presences|
      existing = @state[key] || []

      # Nothing tracked for this key yet: the whole list is a join.
      if existing.none?
        joins[key] = presences
        next
      end

      existing_refs = existing.map { |presence| presence["presence_ref"] }
      incoming_refs = presences.map { |presence| presence["presence_ref"] }
      added = presences.reject { |presence| existing_refs.include?(presence["presence_ref"]) }
      removed = existing.reject { |presence| incoming_refs.include?(presence["presence_ref"]) }
      joins[key] = added if added.any?
      leaves[key] = removed if removed.any?
    end

    apply_sync_diff_locked(joins, leaves)
  end
  fire_events(events)
  fire_sync_callbacks
  state
end