Class: Supabase::Realtime::Push

Inherits:
Object
  • Object
show all
Defined in:
lib/supabase/realtime/push.rb

Overview

One outbound Phoenix push, awaiting a reply. The channel matches incoming phx_reply messages to pushes by ‘ref` and fires the appropriate handler.

‘receive(:ok / :error / :timeout) { |payload| … }` registers handlers before the push is sent, mirroring phoenix.js’s Push API.

Pushes can be given a timeout via ‘start_timeout(seconds)`; if no reply is received within that window the push resolves with AckStatus::TIMEOUT and is removed from the channel’s pending_pushes registry.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, event, payload = {}, ref: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS) ⇒ Push

Returns a new instance of Push.



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/supabase/realtime/push.rb', line 20

def initialize(channel, event, payload = {}, ref: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS)
  @channel  = channel
  @event    = event
  @payload  = payload
  @ref      = ref
  @timeout  = timeout
  @handlers = Hash.new { |h, k| h[k] = [] }
  @received_status = nil
  @received_payload = nil
  @timeout_thread = nil
  @mutex = Mutex.new
end

Instance Attribute Details

#eventObject (readonly)

Returns the value of attribute event.



18
19
20
# File 'lib/supabase/realtime/push.rb', line 18

def event
  @event
end

#payloadObject (readonly)

Returns the value of attribute payload.



18
19
20
# File 'lib/supabase/realtime/push.rb', line 18

def payload
  @payload
end

#received_statusObject (readonly)

Returns the value of attribute received_status.



18
19
20
# File 'lib/supabase/realtime/push.rb', line 18

def received_status
  @received_status
end

#refObject (readonly)

Returns the value of attribute ref.



18
19
20
# File 'lib/supabase/realtime/push.rb', line 18

def ref
  @ref
end

Instance Method Details

#cancel_timeoutObject

Cancel the pending timeout (no-op if not started or already resolved).



84
85
86
87
88
89
90
# File 'lib/supabase/realtime/push.rb', line 84

def cancel_timeout
  @mutex.synchronize do
    thread = @timeout_thread
    @timeout_thread = nil
    thread&.kill if thread && thread != Thread.current
  end
end

#receive(status, &block) ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/supabase/realtime/push.rb', line 33

def receive(status, &block)
  if @received_status == status
    # Reply already arrived before this handler was attached — fire immediately.
    CallbackSafety.safe(logger, "push_receive:#{status}") { block.call(@received_payload) }
  else
    @handlers[status] << block
  end
  self
end

#resolve(status:, payload:) ⇒ Object

Called by the Channel when a phx_reply with matching ref arrives.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/supabase/realtime/push.rb', line 44

def resolve(status:, payload:)
  @mutex.synchronize do
    # Idempotent: a late timeout firing after a real ack must not fire
    # callbacks twice. First resolution wins.
    return if @received_status

    @received_status  = status
    @received_payload = payload
  end
  cancel_timeout
  @handlers[status].each do |h|
    CallbackSafety.safe(logger, "push_receive:#{status}") { h.call(payload) }
  end
end

#start_timeout(seconds = @timeout) ⇒ Object

Schedule a TIMEOUT resolution if no reply arrives within ‘seconds`. Safe to call multiple times — only the first call schedules.



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/supabase/realtime/push.rb', line 61

def start_timeout(seconds = @timeout)
  @mutex.synchronize do
    return if @timeout_thread
    return if @received_status

    @timeout_thread = Thread.new do
      sleep(seconds)
      time_out
    end
  end
  self
end

#time_outObject

Called when no reply arrives in time, or as an explicit forced-timeout entry point.



76
77
78
79
80
81
# File 'lib/supabase/realtime/push.rb', line 76

def time_out
  resolve(status: Types::AckStatus::TIMEOUT, payload: {})
  # Pending registry lives on the channel — clean up so a late reply
  # doesn't reach a push we've already given up on.
  @channel.send(:remove_pending, @ref) if @channel.respond_to?(:remove_pending, true) && @ref
end