Class: Supabase::Realtime::Presence
- Inherits: Object
- Defined in: lib/supabase/realtime/presence.rb
Overview
Tracks presence state for one channel and implements the Phoenix Presence sync algorithm. Mirrors supabase-py’s AsyncRealtimePresence: raw `{ key => { "metas" => [{ "phx_ref" => …, … }] } }` wire payloads are transformed to a flat `{ key => [{ "presence_ref" => …, … }, …] }` shape before being stored or emitted, so listener callbacks receive `(key, current_presences, new_presences)` with `presence_ref` keys.
Class Method Summary
- .transform_meta(meta) ⇒ Object
- .transform_state(state) ⇒ Object
  Convert raw Phoenix wire format `{ key => { "metas" => […] } }` to flat `{ key => […, …] }`.
Instance Method Summary
- #any_callbacks? ⇒ Boolean
- #initialize(logger: nil) ⇒ Presence (constructor)
  A new instance of Presence.
- #list ⇒ Object
  Flat list of every presence currently tracked.
- #on_join(&block) ⇒ Object
- #on_leave(&block) ⇒ Object
- #on_sync(&block) ⇒ Object
- #state ⇒ Object
  Snapshot of the current presence state.
- #sync_diff(raw_diff) ⇒ Object
  Subsequent presence_diff messages: apply joins/leaves to the local state.
- #sync_state(raw_state) ⇒ Object
  First snapshot after joining: diff against the (possibly empty) local state and apply the joins/leaves through the same code path as `sync_diff`.
Constructor Details
#initialize(logger: nil) ⇒ Presence
Returns a new instance of Presence.
    # File 'lib/supabase/realtime/presence.rb', line 14
    def initialize(logger: nil)
      @state = {}
      # Guards every read/write of @state so a reader thread iterating over
      # `presence_state` cannot collide with the realtime read-thread
      # applying inbound presence_state / presence_diff frames. US-007 stress
      # spec demonstrates the bare-Hash version raises "can't add a new key
      # into hash during iteration" under load; with the mutex + snapshot
      # accessor the same scenario stays clean. Callbacks are fanned out
      # AFTER the mutex is released to avoid user code reentering `state`
      # under the same lock.
      @mutex = Mutex.new
      @on_sync_callbacks = []
      @on_join_callbacks = []
      @on_leave_callbacks = []
      @logger = logger
    end
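The locking discipline described in the constructor comment (write under the mutex, hand readers a snapshot, run callbacks only after the lock is released) can be sketched in isolation. `TinyTracker`, `on_change`, and `put` are hypothetical stand-ins invented for this illustration; they are not part of the gem.

```ruby
# Hypothetical stand-in illustrating the mutex + snapshot pattern.
# Not the gem's class; names are assumptions made for this sketch.
class TinyTracker
  def initialize
    @state = {}
    @mutex = Mutex.new
    @callbacks = []
  end

  def on_change(&block)
    @callbacks << block
    self
  end

  # Readers get a copy, so iterating it cannot race with a writer.
  def state
    @mutex.synchronize { @state.dup }
  end

  def put(key, value)
    @mutex.synchronize { @state[key] = value }
    # Callbacks run after the lock is released. Ruby's Mutex is not
    # reentrant, so a callback calling #state while the lock was still
    # held by this thread would raise ThreadError.
    @callbacks.each { |cb| cb.call(key, value) }
    self
  end
end

tracker = TinyTracker.new
seen = []
tracker.on_change { |key, _value| seen << [key, tracker.state.keys] }
tracker.put("alice", 1).put("bob", 2)
```

Here each callback reads `tracker.state` safely because the write has already completed and the lock is free by the time the callback runs.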
Class Method Details
.transform_meta(meta) ⇒ Object
    # File 'lib/supabase/realtime/presence.rb', line 123
    def self.transform_meta(meta)
      meta = meta.dup
      meta.delete("phx_ref_prev")
      if meta.key?("phx_ref")
        ref = meta.delete("phx_ref")
        { "presence_ref" => ref }.merge(meta)
      else
        meta
      end
    end
.transform_state(state) ⇒ Object
Convert raw Phoenix wire format `{ key => { "metas" => […] } }` to flat `{ key => […, …] }`. Idempotent on already transformed input.
    # File 'lib/supabase/realtime/presence.rb', line 111
    def self.transform_state(state)
      new_state = {}
      (state || {}).each do |key, presences|
        new_state[key] = if presences.is_a?(Hash) && presences.key?("metas")
                           presences["metas"].map { |meta| transform_meta(meta) }
                         else
                           Array(presences).map { |meta| transform_meta(meta) }
                         end
      end
      new_state
    end
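To make the shape change and the idempotency claim concrete, here is a minimal standalone sketch that re-states the two class methods in a throwaway module so it runs without the gem. `PresenceShape` is a hypothetical name, the local variable names are assumptions, and the sample payload is invented for illustration.

```ruby
# Hypothetical standalone re-statement of the two class methods;
# the module name and sample data are assumptions for this sketch.
module PresenceShape
  def self.transform_meta(meta)
    meta = meta.dup
    meta.delete("phx_ref_prev")
    return meta unless meta.key?("phx_ref")

    ref = meta.delete("phx_ref")
    { "presence_ref" => ref }.merge(meta)
  end

  def self.transform_state(state)
    (state || {}).to_h do |key, presences|
      metas = if presences.is_a?(Hash) && presences.key?("metas")
                presences["metas"]
              else
                Array(presences)
              end
      [key, metas.map { |meta| transform_meta(meta) }]
    end
  end
end

raw = {
  "user-1" => {
    "metas" => [{ "phx_ref" => "abc", "phx_ref_prev" => "old", "name" => "Ada" }]
  }
}

flat  = PresenceShape.transform_state(raw)   # wire format -> flat format
again = PresenceShape.transform_state(flat)  # flat input passes through unchanged
```

After the first call, `phx_ref` has been renamed to `presence_ref` and `phx_ref_prev` has been dropped; the second call yields an equal hash, and `raw` itself is left unmodified because each meta is duplicated before editing.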
Instance Method Details
#any_callbacks? ⇒ Boolean
    # File 'lib/supabase/realtime/presence.rb', line 104
    def any_callbacks?
      [@on_sync_callbacks, @on_join_callbacks, @on_leave_callbacks].any? { |list| !list.empty? }
    end
#list ⇒ Object
Flat list of every presence currently tracked.
    # File 'lib/supabase/realtime/presence.rb', line 85
    def list
      @mutex.synchronize { @state.values.flatten }
    end
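As a small illustration of what `values.flatten` does to the flat state shape, the following sketch uses a hand-written hash in place of the internal `@state`; the keys and refs are made up.

```ruby
# Sample data standing in for the internal state (assumed shape:
# key => array of presence hashes).
state = {
  "ada" => [{ "presence_ref" => "a1" }, { "presence_ref" => "a2" }],
  "bob" => [{ "presence_ref" => "b1" }]
}

# Same expression as in #list: one array containing every presence.
all = state.values.flatten
refs = all.map { |presence| presence["presence_ref"] }
```

Note that the result no longer records which key each presence belonged to; callers who need that association can read `#state` instead.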
#on_join(&block) ⇒ Object
    # File 'lib/supabase/realtime/presence.rb', line 94
    def on_join(&block)
      @on_join_callbacks << block
      self
    end
#on_leave(&block) ⇒ Object
    # File 'lib/supabase/realtime/presence.rb', line 99
    def on_leave(&block)
      @on_leave_callbacks << block
      self
    end
#on_sync(&block) ⇒ Object
    # File 'lib/supabase/realtime/presence.rb', line 89
    def on_sync(&block)
      @on_sync_callbacks << block
      self
    end
#state ⇒ Object
Snapshot of the current presence state. Returns a shallow dup of the internal hash so callers can iterate safely while the read-thread continues to apply inbound diffs (US-007 thread safety AC).
    # File 'lib/supabase/realtime/presence.rb', line 34
    def state
      @mutex.synchronize { @state.dup }
    end
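What "shallow dup" means in practice can be shown with plain hashes. The sketch below uses a local `internal` hash as a stand-in for `@state`; the data is invented.

```ruby
internal = { "user-1" => [{ "presence_ref" => "abc" }] }
snapshot = internal.dup # the same operation #state performs under the lock

# A key added to the original afterwards does not appear in the snapshot,
# so iterating the snapshot is safe while the original keeps changing.
internal["user-2"] = []
snapshot_has_new_key = snapshot.key?("user-2")

# The copy is shallow: each per-key array is the same object in both hashes.
shares_arrays = snapshot["user-1"].equal?(internal["user-1"])
```

Because the arrays are shared, a caller that mutates a snapshot's arrays in place would also affect the tracked state. Treating the snapshot as read-only, or deep-copying it first, avoids that.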
#sync_diff(raw_diff) ⇒ Object
Subsequent presence_diff messages: apply joins/leaves to the local state. Raw input is transformed before being applied.
    # File 'lib/supabase/realtime/presence.rb', line 72
    def sync_diff(raw_diff)
      events = nil
      @mutex.synchronize do
        joins = self.class.transform_state(raw_diff["joins"] || {})
        leaves = self.class.transform_state(raw_diff["leaves"] || {})
        events = apply_sync_diff_locked(joins, leaves)
      end
      fire_events(events)
      fire_sync_callbacks
      state
    end
#sync_state(raw_state) ⇒ Object
First snapshot after joining: diff against the (possibly empty) local state and apply the joins/leaves through the same code path as `sync_diff`.
    # File 'lib/supabase/realtime/presence.rb', line 41
    def sync_state(raw_state)
      events = nil
      @mutex.synchronize do
        new_state = self.class.transform_state(raw_state)
        joins = {}
        leaves = @state.reject { |k, _| new_state.key?(k) }

        new_state.each do |key, presences|
          current = @state[key] || []
          if current.any?
            current_refs = current.map { |p| p["presence_ref"] }
            new_refs = presences.map { |p| p["presence_ref"] }
            joined_presences = presences.reject { |p| current_refs.include?(p["presence_ref"]) }
            left_presences = current.reject { |p| new_refs.include?(p["presence_ref"]) }
            joins[key] = joined_presences if joined_presences.any?
            leaves[key] = left_presences if left_presences.any?
          else
            joins[key] = presences
          end
        end

        events = apply_sync_diff_locked(joins, leaves)
      end
      fire_events(events)
      fire_sync_callbacks
      state
    end
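The join/leave computation inside `sync_state` can be traced on a small example. The sketch below lifts that diff step into a free-standing function; `presence_delta` is a hypothetical name, it omits the locking and the `apply_sync_diff_locked` step (whose body is not shown on this page), and both states are assumed to be in the flat, already transformed shape.

```ruby
# Hypothetical helper mirroring the diff step of sync_state.
# Both arguments use the flat shape: key => [{ "presence_ref" => ... }, ...].
def presence_delta(current_state, new_state)
  joins = {}
  # Keys absent from the new snapshot have left entirely.
  leaves = current_state.reject { |key, _| new_state.key?(key) }

  new_state.each do |key, presences|
    current = current_state[key] || []
    if current.any?
      current_refs = current.map { |p| p["presence_ref"] }
      new_refs = presences.map { |p| p["presence_ref"] }
      joined = presences.reject { |p| current_refs.include?(p["presence_ref"]) }
      left = current.reject { |p| new_refs.include?(p["presence_ref"]) }
      joins[key] = joined if joined.any?
      leaves[key] = left if left.any?
    else
      # Key not tracked before: every presence counts as a join.
      joins[key] = presences
    end
  end

  [joins, leaves]
end

current = {
  "ada" => [{ "presence_ref" => "a1" }],
  "bob" => [{ "presence_ref" => "b1" }]
}
incoming = {
  "ada" => [{ "presence_ref" => "a2" }], # same key, different connection
  "cy"  => [{ "presence_ref" => "c1" }]  # key not seen before
}

joins, leaves = presence_delta(current, incoming)
```

With this input, `joins` holds `a2` under `"ada"` and `c1` under `"cy"`, while `leaves` holds `b1` under `"bob"` and `a1` under `"ada"`: a key can appear in both hashes when one of its connections is replaced by another.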