Class: Supabase::Realtime::Push
- Inherits: Object
- Defined in: lib/supabase/realtime/push.rb
Overview
One outbound Phoenix push, awaiting a reply. The channel matches incoming phx_reply messages to pushes by `ref` and fires the appropriate handler.
`receive(:ok / :error / :timeout) { |payload| … }` registers handlers before the push is sent, mirroring phoenix.js's Push API.
Pushes can be given a timeout via `start_timeout(seconds)`; if no reply is received within that window, the push resolves with AckStatus::TIMEOUT and is removed from the channel's pending_pushes registry.
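The ref-matching flow described in the overview can be sketched with two small stand-in classes. `SketchPush`, `SketchChannel`, `track`, `handle_reply`, and `pending_count` are hypothetical names used only for illustration; they are not part of the library, and the real channel's reply handling may differ.

```ruby
# Hypothetical stand-ins for illustration; not the library's classes.
class SketchPush
  attr_reader :ref, :received_status

  def initialize(ref)
    @ref = ref
    @handlers = Hash.new { |h, k| h[k] = [] }
    @received_status = nil
  end

  # Register a handler for one status; returns self so calls chain.
  def receive(status, &block)
    @handlers[status] << block
    self
  end

  # First resolution wins; later calls are ignored.
  def resolve(status:, payload:)
    return if @received_status

    @received_status = status
    @handlers[status].each { |h| h.call(payload) }
  end
end

class SketchChannel
  def initialize
    @pending = {}
  end

  # Remember an in-flight push under its ref.
  def track(push)
    @pending[push.ref] = push
    push
  end

  # Look up the push by the reply's ref and resolve it; unknown refs are ignored.
  def handle_reply(ref:, status:, payload:)
    push = @pending.delete(ref)
    push&.resolve(status: status, payload: payload)
  end

  def pending_count
    @pending.size
  end
end

channel = SketchChannel.new
log = []
push = channel.track(SketchPush.new("1"))
push.receive(:ok) { |payload| log << [:ok, payload] }
    .receive(:error) { |payload| log << [:error, payload] }

channel.handle_reply(ref: "1", status: :ok, payload: { "joined" => true })
channel.handle_reply(ref: "2", status: :ok, payload: {}) # no push with this ref
p log
```

Only the `:ok` handler runs here, because the reply for ref "1" carried that status; the reply for ref "2" matches no pending push and is dropped.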
Instance Attribute Summary
- #event ⇒ Object (readonly)
  Returns the value of attribute event.
- #payload ⇒ Object (readonly)
  Returns the value of attribute payload.
- #received_status ⇒ Object (readonly)
  Returns the value of attribute received_status.
- #ref ⇒ Object (readonly)
  Returns the value of attribute ref.
Instance Method Summary
- #cancel_timeout ⇒ Object
  Cancel the pending timeout (no-op if not started or already resolved).
- #initialize(channel, event, payload = {}, ref: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS) ⇒ Push (constructor)
  A new instance of Push.
- #receive(status, &block) ⇒ Object
  Register a handler for `status`; fires immediately if the push has already resolved with that status.
- #resolve(status:, payload:) ⇒ Object
  Called by the Channel when a phx_reply with matching ref arrives.
- #start_timeout(seconds = @timeout) ⇒ Object
  Schedule a TIMEOUT resolution if no reply arrives within `seconds`.
- #time_out ⇒ Object
  Called when no reply arrives in time, or as an explicit forced-timeout entry point.
Constructor Details
#initialize(channel, event, payload = {}, ref: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS) ⇒ Push
Returns a new instance of Push.
    # File 'lib/supabase/realtime/push.rb', line 19
    def initialize(channel, event, payload = {}, ref: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS)
      @channel = channel
      @event = event
      @payload = payload
      @ref = ref
      @timeout = timeout
      @handlers = Hash.new { |h, k| h[k] = [] }
      @received_status = nil
      @received_payload = nil
      @timeout_thread = nil
      @mutex = Mutex.new
    end
Instance Attribute Details
#event ⇒ Object (readonly)
Returns the value of attribute event.
    # File 'lib/supabase/realtime/push.rb', line 17
    def event
      @event
    end
#payload ⇒ Object (readonly)
Returns the value of attribute payload.
    # File 'lib/supabase/realtime/push.rb', line 17
    def payload
      @payload
    end
#received_status ⇒ Object (readonly)
Returns the value of attribute received_status.
    # File 'lib/supabase/realtime/push.rb', line 17
    def received_status
      @received_status
    end
#ref ⇒ Object (readonly)
Returns the value of attribute ref.
    # File 'lib/supabase/realtime/push.rb', line 17
    def ref
      @ref
    end
Instance Method Details
#cancel_timeout ⇒ Object
Cancel the pending timeout (no-op if not started or already resolved).
    # File 'lib/supabase/realtime/push.rb', line 81
    def cancel_timeout
      @mutex.synchronize do
        thread = @timeout_thread
        @timeout_thread = nil
        thread&.kill if thread && thread != Thread.current
      end
    end
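The `thread != Thread.current` guard matters because cancellation can be reached from inside the timer thread itself (a timeout resolves the push, and resolving cancels the timeout). A minimal sketch of why the guard is needed, using a hypothetical `GuardedTimer` class that is not part of the library:

```ruby
# Hypothetical illustration of the self-kill guard; not the library's code.
class GuardedTimer
  attr_reader :steps

  def initialize
    @mutex = Mutex.new
    @thread = nil
    @steps = []
  end

  # Starts the timer thread once and returns it.
  def start(seconds)
    @mutex.synchronize do
      @thread ||= Thread.new do
        sleep(seconds)
        cancel                   # reached from inside the timer thread
        @steps << :after_cancel  # only runs if cancel did not kill this thread
      end
    end
  end

  def cancel
    @mutex.synchronize do
      thread = @thread
      @thread = nil
      # Killing Thread.current here would abort the caller mid-callback.
      thread.kill if thread && thread != Thread.current
    end
    @steps << :cancelled
  end
end

# Case 1: the timer fires and cancels itself; the thread survives to finish.
fired = GuardedTimer.new
fired.start(0.01).join
p fired.steps

# Case 2: an outside caller cancels first; the sleeping thread is killed.
stopped = GuardedTimer.new
thread = stopped.start(5)
stopped.cancel
thread.join
p stopped.steps
```

Without the guard, the first case would kill the thread before it could record `:after_cancel`, which in the real class would correspond to the timeout handlers never running.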
#receive(status, &block) ⇒ Object
Registers a handler for `status` and returns self so calls can be chained. If the push has already resolved with that status, the block is called immediately with the stored payload.
    # File 'lib/supabase/realtime/push.rb', line 32
    def receive(status, &block)
      if @received_status == status
        # Reply already arrived before this handler was attached: fire immediately.
        block.call(@received_payload)
      else
        @handlers[status] << block
      end
      self
    end
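The two branches of `receive` are easiest to see with a handler attached after the reply. The sketch below uses a hypothetical `LatePush` class that copies only the receive/resolve logic shown in this page, without the mutex or timeout handling:

```ruby
# Hypothetical stand-in mirroring only the receive/resolve pair; not the library class.
class LatePush
  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] }
    @received_status = nil
    @received_payload = nil
  end

  def receive(status, &block)
    if @received_status == status
      block.call(@received_payload) # already resolved with this status
    else
      @handlers[status] << block    # wait for a future resolution
    end
    self
  end

  def resolve(status:, payload:)
    return if @received_status

    @received_status = status
    @received_payload = payload
    @handlers[status].each { |h| h.call(payload) }
  end
end

calls = []
push = LatePush.new
push.receive(:ok) { |payload| calls << [:early, payload] }
push.resolve(status: :ok, payload: { "n" => 1 })
push.receive(:ok) { |payload| calls << [:late, payload] }     # fires immediately
push.receive(:error) { |payload| calls << [:never, payload] } # stored, never fires
p calls
```

Note the last case: a handler registered for a status other than the one already received is queued but can never run, because the first resolution is final.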
#resolve(status:, payload:) ⇒ Object
Called by the Channel when a phx_reply with matching ref arrives.
    # File 'lib/supabase/realtime/push.rb', line 43
    def resolve(status:, payload:)
      @mutex.synchronize do
        # Idempotent: a late timeout firing after a real ack must not fire
        # callbacks twice. First resolution wins.
        return if @received_status

        @received_status = status
        @received_payload = payload
      end
      cancel_timeout
      @handlers[status].each { |h| h.call(payload) }
    end
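The "first resolution wins" rule can be exercised by racing several resolvers against each other. This is a sketch under stated assumptions: `OnceResolver` is a hypothetical class, and unlike the method above its `resolve` returns true or false so the winner is observable.

```ruby
# Hypothetical illustration of mutex-guarded, first-wins resolution.
class OnceResolver
  attr_reader :received_status

  def initialize
    @mutex = Mutex.new
    @received_status = nil
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  def receive(status, &block)
    @handlers[status] << block
    self
  end

  # Returns true for the single call that wins, false for every later call.
  def resolve(status:, payload:)
    @mutex.synchronize do
      return false if @received_status # the lock is released on early return

      @received_status = status
    end
    # Handlers run outside the lock so they cannot block other resolvers.
    @handlers[status].each { |h| h.call(payload) }
    true
  end
end

resolver = OnceResolver.new
fired = Queue.new
resolver.receive(:ok) { |_payload| fired << :ok }
resolver.receive(:timeout) { |_payload| fired << :timeout }

# Race an ack against a timeout from several threads.
results = 8.times.map do |i|
  Thread.new { resolver.resolve(status: i.even? ? :ok : :timeout, payload: {}) }
end.map(&:value)

puts results.count(true) # exactly one resolver wins
puts fired.size          # so exactly one handler ran
```

Checking and setting the status inside one `synchronize` block is what makes the check atomic; running the handlers after the lock is released matches the ordering in the listing above.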
#start_timeout(seconds = @timeout) ⇒ Object
Schedule a TIMEOUT resolution if no reply arrives within `seconds`. Safe to call multiple times; only the first call schedules.
    # File 'lib/supabase/realtime/push.rb', line 58
    def start_timeout(seconds = @timeout)
      @mutex.synchronize do
        return if @timeout_thread
        return if @received_status

        @timeout_thread = Thread.new do
          sleep(seconds)
          time_out
        end
      end
      self
    end
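The schedule-once behaviour can be sketched with a small timer built on `Thread.new` and `sleep`. `OneShotTimer` and its `wait` helper are hypothetical names for illustration. One deliberate difference: the guard clauses in the listing above use a bare `return`, so those paths appear to return nil rather than self, whereas this sketch always returns self so repeated calls chain.

```ruby
# Hypothetical one-shot timer; illustrates the pattern, not the library's API.
class OneShotTimer
  def initialize(&on_timeout)
    @on_timeout = on_timeout
    @mutex = Mutex.new
    @thread = nil
  end

  # Only the first call creates the thread; later calls are no-ops.
  def start(seconds)
    @mutex.synchronize do
      @thread ||= Thread.new do
        sleep(seconds)
        @on_timeout.call
      end
    end
    self
  end

  # Block until the timer thread (if any) has finished.
  def wait
    thread = @mutex.synchronize { @thread }
    thread&.join
    self
  end
end

fired = []
timer = OneShotTimer.new { fired << :timeout }
timer.start(0.01).start(0.01).start(0.01) # three calls, one scheduled thread
timer.wait
p fired
```

A thread per pending push is simple but not free; for many concurrent pushes a shared scheduler would be an alternative design, though nothing in this page indicates the library uses one.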
#time_out ⇒ Object
Called when no reply arrives in time, or as an explicit forced-timeout entry point.
    # File 'lib/supabase/realtime/push.rb', line 73
    def time_out
      resolve(status: Types::AckStatus::TIMEOUT, payload: {})
      # Pending registry lives on the channel: clean up so a late reply
      # doesn't reach a push we've already given up on.
      @channel.send(:remove_pending, @ref) if @channel.respond_to?(:remove_pending, true) && @ref
    end
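The cleanup line relies on two standard Ruby behaviours: `respond_to?(name, true)` also reports private methods, and `send` can invoke them. The sketch below shows both with a hypothetical `RegistryChannel` whose `remove_pending` is private; the real channel's method visibility and registry layout are assumptions here.

```ruby
# Hypothetical channel with a private cleanup hook; not the library's Channel.
class RegistryChannel
  def initialize
    @pending = {}
  end

  def track(ref, push)
    @pending[ref] = push
  end

  def pending?(ref)
    @pending.key?(ref)
  end

  private

  def remove_pending(ref)
    @pending.delete(ref)
  end
end

# Hypothetical helper mirroring the guard used in time_out.
def forget_pending(channel, ref)
  return false unless ref && channel.respond_to?(:remove_pending, true)

  channel.send(:remove_pending, ref) # send bypasses the private visibility check
  true
end

channel = RegistryChannel.new
channel.track("3", :some_push)

puts channel.respond_to?(:remove_pending)       # false: private methods are hidden by default
puts channel.respond_to?(:remove_pending, true) # true: include_all counts private methods
puts forget_pending(channel, "3")
puts channel.pending?("3")
```

The `respond_to?` guard also lets the push work with channel doubles that have no registry at all, and the `ref` check skips cleanup for pushes created without a ref.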