Class: Supabase::Realtime::Push

Inherits:
Object
Defined in:
lib/supabase/realtime/push.rb

Overview

One outbound Phoenix push, awaiting a reply. The channel matches incoming phx_reply messages to pushes by `ref` and fires the appropriate handler.

`receive(:ok / :error / :timeout) { |payload| … }` registers handlers before the push is sent, mirroring phoenix.js’s Push API.

Pushes can be given a timeout via `start_timeout(seconds)`; if no reply is received within that window, the push resolves with AckStatus::TIMEOUT and is removed from the channel’s pending_pushes registry.
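The ref-matching flow described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `OverviewPush`, `OverviewChannel`, `track`, and `handle_reply` are hypothetical stand-ins, and the reply shape (`status` plus `response`) and the use of symbols for statuses are assumptions.

```ruby
# Hypothetical stand-in for Supabase::Realtime::Push; only the handler
# registry and resolution are modeled here.
class OverviewPush
  attr_reader :ref

  def initialize(ref)
    @ref = ref
    @handlers = Hash.new { |h, k| h[k] = [] }
    @status = nil
  end

  def receive(status, &block)
    @handlers[status] << block
    self
  end

  def resolve(status:, payload:)
    return if @status # first resolution wins

    @status = status
    @handlers[status].each { |h| h.call(payload) }
  end
end

# Hypothetical channel-side registry: pending pushes keyed by ref.
class OverviewChannel
  def initialize
    @pending_pushes = {}
  end

  def track(push)
    @pending_pushes[push.ref] = push
    push
  end

  # Route an incoming phx_reply to the push that carries the same ref.
  # Returns false when no pending push matches.
  def handle_reply(message)
    push = @pending_pushes.delete(message[:ref])
    return false unless push

    reply = message[:payload]
    push.resolve(status: reply[:status].to_sym, payload: reply[:response])
    true
  end
end

channel = OverviewChannel.new
channel.track(OverviewPush.new("1"))
       .receive(:ok)    { |payload| puts "acknowledged: #{payload.inspect}" }
       .receive(:error) { |payload| puts "rejected: #{payload.inspect}" }

channel.handle_reply({ ref: "1", payload: { status: "ok", response: { "topic" => "room:1" } } })
```

Removing the push from the registry as soon as its reply is routed means a duplicate reply with the same ref finds nothing to resolve.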


Constructor Details

#initialize(channel, event, payload = {}, ref: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS) ⇒ Push

Returns a new instance of Push.



# File 'lib/supabase/realtime/push.rb', line 19

def initialize(channel, event, payload = {}, ref: nil, timeout: Types::DEFAULT_TIMEOUT_SECONDS)
  @channel  = channel
  @event    = event
  @payload  = payload
  @ref      = ref
  @timeout  = timeout
  @handlers = Hash.new { |h, k| h[k] = [] }
  @received_status = nil
  @received_payload = nil
  @timeout_thread = nil
  @mutex = Mutex.new
end

Instance Attribute Details

#event ⇒ Object (readonly)

Returns the value of attribute event.



# File 'lib/supabase/realtime/push.rb', line 17

def event
  @event
end

#payload ⇒ Object (readonly)

Returns the value of attribute payload.



# File 'lib/supabase/realtime/push.rb', line 17

def payload
  @payload
end

#received_status ⇒ Object (readonly)

Returns the value of attribute received_status.



# File 'lib/supabase/realtime/push.rb', line 17

def received_status
  @received_status
end

#ref ⇒ Object (readonly)

Returns the value of attribute ref.



# File 'lib/supabase/realtime/push.rb', line 17

def ref
  @ref
end

Instance Method Details

#cancel_timeout ⇒ Object

Cancel the pending timeout (no-op if not started or already resolved).



# File 'lib/supabase/realtime/push.rb', line 81

def cancel_timeout
  @mutex.synchronize do
    thread = @timeout_thread
    @timeout_thread = nil
    thread&.kill if thread && thread != Thread.current
  end
end
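The `thread != Thread.current` guard matters because the timeout thread itself ends up calling `cancel_timeout` (via `time_out` and `resolve`). A minimal sketch of why, using a hypothetical `SelfCancelDemo` class rather than the library's Push:

```ruby
# Hypothetical stand-in, not the library class: a timer whose callback
# cancels the timer from inside the timer thread itself.
class SelfCancelDemo
  attr_reader :log

  def initialize
    @mutex = Mutex.new
    @timer = nil
    @log   = []
  end

  # Start the timer once and return its thread.
  def start(seconds)
    @mutex.synchronize do
      @timer ||= Thread.new do
        sleep(seconds)
        fire
      end
    end
  end

  # Same shape as cancel_timeout: clear the slot, kill only a foreign thread.
  def cancel
    @mutex.synchronize do
      thread = @timer
      @timer = nil
      thread.kill if thread && thread != Thread.current
    end
  end

  private

  def fire
    cancel          # runs on the timer thread, so the guard skips the kill
    @log << :fired  # reached only because the thread did not kill itself
  end
end

demo = SelfCancelDemo.new
demo.start(0.01).join
p demo.log          # => [:fired]

cancelled = SelfCancelDemo.new
thread = cancelled.start(5)
cancelled.cancel    # called from the main thread: the timer is killed
thread.join
p cancelled.log     # => []
```

Without the guard, the timer thread would terminate itself inside `cancel`, and the work that follows the cancellation would never run.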

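#receive(status, &block) ⇒ Object

Registers a handler for the given status. If the push has already resolved with that status, the block is called immediately with the received payload. Returns self, so calls can be chained.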



# File 'lib/supabase/realtime/push.rb', line 32

def receive(status, &block)
  if @received_status == status
    # Reply already arrived before this handler was attached — fire immediately.
    block.call(@received_payload)
  else
    @handlers[status] << block
  end
  self
end
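The two branches of `receive` can be exercised with a small stand-in. `ReceiveDemoPush` below is hypothetical: it copies only the `receive` and `resolve` logic from the listings on this page and leaves out the mutex and timeout, and the `:ok` / `:error` symbols are assumed status values.

```ruby
# Hypothetical stand-in mirroring #receive and a simplified #resolve.
class ReceiveDemoPush
  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] }
    @received_status = nil
    @received_payload = nil
  end

  def receive(status, &block)
    if @received_status == status
      block.call(@received_payload) # reply already in: fire right away
    else
      @handlers[status] << block    # otherwise queue until resolve
    end
    self
  end

  def resolve(status:, payload:)
    return if @received_status

    @received_status = status
    @received_payload = payload
    @handlers[status].each { |h| h.call(payload) }
  end
end

events = []
push = ReceiveDemoPush.new
push.receive(:ok) { |payload| events << [:early, payload] }
push.resolve(status: :ok, payload: { "id" => 1 })

# Attached after the reply: the :ok handler fires immediately,
# the :error handler is queued and never runs.
push.receive(:ok)    { |payload| events << [:late, payload] }
    .receive(:error) { |payload| events << [:never, payload] }

p events.map(&:first) # => [:early, :late]
```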

#resolve(status:, payload:) ⇒ Object

Called by the Channel when a phx_reply with a matching ref arrives. Idempotent: only the first resolution takes effect.



# File 'lib/supabase/realtime/push.rb', line 43

def resolve(status:, payload:)
  @mutex.synchronize do
    # Idempotent: a late timeout firing after a real ack must not fire
    # callbacks twice. First resolution wins.
    return if @received_status

    @received_status  = status
    @received_payload = payload
  end
  cancel_timeout
  @handlers[status].each { |h| h.call(payload) }
end
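The first-resolution-wins guard can be illustrated in isolation. The sketch below uses a hypothetical `OnceResolver` class (not part of the library) that keeps only the check-and-set under the mutex and counts how often the post-lock dispatch step is reached.

```ruby
# Hypothetical stand-in showing the check-and-set guard used by #resolve.
class OnceResolver
  attr_reader :status, :dispatches

  def initialize
    @mutex = Mutex.new
    @status = nil
    @dispatches = 0
  end

  # Returns true for the call that wins, false for every later call.
  def resolve(status)
    @mutex.synchronize do
      return false if @status # someone already resolved

      @status = status
    end
    # Dispatch happens outside the lock; only the winner gets here.
    @dispatches += 1
    true
  end
end

resolver = OnceResolver.new
results = %i[ok timeout error].map { |s| Thread.new { resolver.resolve(s) } }.map(&:value)
puts "winners: #{results.count(true)}, dispatches: #{resolver.dispatches}"
```

Running the handlers after the synchronized block is a deliberate shape: Ruby's Mutex is not reentrant, and `cancel_timeout` takes the same mutex, so calling it (or a handler that calls back into the push) while still holding the lock would raise a ThreadError.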

#start_timeout(seconds = @timeout) ⇒ Object

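Schedule a TIMEOUT resolution if no reply arrives within `seconds`. Safe to call multiple times: only the first call schedules a timer, and calls made after the push has resolved do nothing. The early-return paths return nil rather than self, so the result is chainable only when a timer is actually scheduled.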



# File 'lib/supabase/realtime/push.rb', line 58

def start_timeout(seconds = @timeout)
  @mutex.synchronize do
    return if @timeout_thread
    return if @received_status

    @timeout_thread = Thread.new do
      sleep(seconds)
      time_out
    end
  end
  self
end
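A minimal sketch of the schedule-once behaviour, assuming a hypothetical `TimeoutDemo` class that models only the timer and the resolved status (no handlers, no channel). Unlike the listing above, this stand-in returns self on every path.

```ruby
# Hypothetical stand-in: a sleep-based timer thread that resolves with
# :timeout unless something else resolved first.
class TimeoutDemo
  attr_reader :status, :timer

  def initialize
    @mutex  = Mutex.new
    @timer  = nil
    @status = nil
  end

  def start_timeout(seconds)
    @mutex.synchronize do
      # Already scheduled or already resolved: nothing to do.
      return self if @timer || @status

      @timer = Thread.new do
        sleep(seconds)
        resolve(:timeout)
      end
    end
    self
  end

  def resolve(status)
    @mutex.synchronize do
      return if @status # first resolution wins

      @status = status
    end
  end
end

demo = TimeoutDemo.new
demo.start_timeout(0.05)
first_timer = demo.timer
demo.start_timeout(10)               # ignored: a timer is already running
puts demo.timer.equal?(first_timer)  # => true
first_timer.join
puts demo.status                     # => timeout
```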

#time_out ⇒ Object

Called when no reply arrives in time, or as an explicit forced-timeout entry point.



# File 'lib/supabase/realtime/push.rb', line 73

def time_out
  resolve(status: Types::AckStatus::TIMEOUT, payload: {})
  # Pending registry lives on the channel — clean up so a late reply
  # doesn't reach a push we've already given up on.
  @channel.send(:remove_pending, @ref) if @channel.respond_to?(:remove_pending, true) && @ref
end
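The cleanup line relies on two pieces of Ruby behaviour: `respond_to?(name, true)` also reports private methods, and `send` bypasses method visibility. A small sketch with a hypothetical `CleanupChannel` (the `track` and `pending?` helpers and the `drop_pending` function are illustrative names, not library API):

```ruby
# Hypothetical channel whose registry cleanup is private, as the
# send/respond_to? combination in time_out suggests.
class CleanupChannel
  def initialize
    @pending_pushes = {}
  end

  def track(ref, push)
    @pending_pushes[ref] = push
  end

  def pending?(ref)
    @pending_pushes.key?(ref)
  end

  private

  def remove_pending(ref)
    @pending_pushes.delete(ref)
  end
end

# Mirrors the guard in time_out: skip quietly when there is no ref or the
# channel object has no remove_pending method at all.
def drop_pending(channel, ref)
  return false unless ref && channel.respond_to?(:remove_pending, true)

  channel.send(:remove_pending, ref)
  true
end

channel = CleanupChannel.new
channel.track("3", :some_push)

puts channel.respond_to?(:remove_pending)        # => false (private)
puts channel.respond_to?(:remove_pending, true)  # => true
puts drop_pending(channel, "3")                  # => true
puts channel.pending?("3")                       # => false
```

The guard also makes the method tolerant of test doubles that do not implement `remove_pending`.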