Class: Pgbus::Web::Streamer::Heartbeat

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/streamer/heartbeat.rb

Overview

Periodic maintenance loop for SSE connections. Runs three sweeps on every tick:

1. Write an SSE comment (": heartbeat <epoch>\n\n") to each
   connection. This keeps proxies and load balancers from timing
   out idle HTTP responses; most reverse proxies close HTTP
   responses that sit idle for 30-60s, which would silently drop
   SSE clients.

2. Mark connections that have been idle longer than the
   configured idle_timeout as dead. The StreamEventDispatcher's next pass
   picks them up via its disconnect path.

3. Post a DisconnectMessage for any connection already flagged
   dead (by IoWriter returning :closed / :blocked, or by the
   idle sweep above).

The heartbeat runs on its own dedicated thread because it does blocking writes (via IoWriter with a deadline) and we don’t want to delay the dispatcher. Writes are serialised per-connection by the Connection’s own mutex, so concurrent dispatcher + heartbeat writes are safe.

Instance Method Summary collapse

Constructor Details

#initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil) ⇒ Heartbeat

Returns a new instance of Heartbeat.



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 29

def initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil)
  @registry = registry
  @queue = dispatch_queue
  @interval = interval
  @idle_timeout = idle_timeout
  @logger = logger
  @clock = clock || -> { ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) }
  @running = false
  @thread = nil
  @wake = ConditionVariable.new
  @wake_mutex = Mutex.new
end

Instance Method Details

#startObject



42
43
44
45
46
47
48
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 42

def start
  return if @running

  @running = true
  @thread = Thread.new { run_loop }
  self
end

#stopObject



50
51
52
53
54
55
56
57
58
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 50

def stop
  return unless @running

  @running = false
  @wake_mutex.synchronize { @wake.broadcast }
  @thread&.join(5)
  @thread = nil
  self
end

#tickObject

Runs a single sweep synchronously. Useful for tests — production code goes through the background thread.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 62

def tick
  now = @clock.call
  presence_conns = []
  @registry.each_connection do |connection|
    if connection.dead?
      # Already dead (e.g. IoWriter returned :closed on a previous
      # dispatcher write). Post the disconnect and skip the rest.
      enqueue_disconnect(connection)
      next
    end

    if connection.idle_for > @idle_timeout
      connection.mark_dead!
      enqueue_disconnect(connection)
      next
    end

    result = connection.write_comment("heartbeat #{now.to_i}")
    if connection.dead? || result != :ok
      enqueue_disconnect(connection)
      next
    end

    presence_conns << connection if presence_member?(connection)
  end

  enqueue_presence_touch(presence_conns)
end