Class: Pgbus::Web::Streamer::Heartbeat

Inherits:
Object
Defined in:
lib/pgbus/web/streamer/heartbeat.rb

Overview

Periodic maintenance loop for SSE connections. Runs three sweeps on every tick:

1. Write an SSE comment (": heartbeat <seconds>\n\n") to each
   connection, where <seconds> is the integer reading of the
   injected clock (monotonic by default, so not a Unix epoch).
   This keeps proxies and load balancers from timing out idle
   HTTP responses; many reverse proxies close responses that sit
   idle for 30-60s, which would silently drop SSE clients.

2. Mark connections that have been idle longer than the
   configured idle_timeout as dead. The StreamEventDispatcher's next pass
   picks them up via its disconnect path.

3. Post a DisconnectMessage for any connection already flagged
   dead (by IoWriter returning :closed / :blocked, or by the
   idle sweep above).
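
As a small illustration of sweep 1, the sketch below builds the comment frame described above. The helper name `sse_comment` is hypothetical and not part of Pgbus; the real formatting lives behind `Connection#write_comment`, whose internals are not shown on this page.

```ruby
# Hypothetical helper (not part of Pgbus): formats an SSE comment frame.
# In the SSE wire format, a line starting with ":" is a comment that
# clients ignore, and a blank line terminates the frame.
def sse_comment(text)
  ": #{text}\n\n"
end

# Mirrors the default clock used by Heartbeat: monotonic seconds, truncated.
seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_i
frame = sse_comment("heartbeat #{seconds}")

puts frame.inspect
```

Because the frame is a comment, it keeps bytes flowing through intermediaries without triggering any event handler on the client.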

The heartbeat runs on its own dedicated thread because it does blocking writes (via IoWriter with a deadline) and we don’t want to delay the dispatcher. Writes are serialised per-connection by the Connection’s own mutex, so concurrent dispatcher + heartbeat writes are safe.
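
The per-connection serialisation described above can be sketched roughly as follows. `SketchConnection` and its `write_event` method are hypothetical stand-ins, not the real Connection class; only the locking pattern is the point, and a `StringIO` takes the place of the socket.

```ruby
require "stringio"

# Hypothetical stand-in for Connection: every frame goes through one mutex,
# so frames from different threads never interleave on the wire.
class SketchConnection
  def initialize(io)
    @io = io
    @write_mutex = Mutex.new
  end

  # Used by the heartbeat thread in this sketch.
  def write_comment(text)
    write_frame(": #{text}\n\n")
  end

  # Used by the dispatcher thread in this sketch (assumed method name).
  def write_event(data)
    write_frame("data: #{data}\n\n")
  end

  private

  def write_frame(frame)
    @write_mutex.synchronize do
      @io.write(frame)
      :ok
    end
  end
end

io = StringIO.new
connection = SketchConnection.new(io)

dispatcher = Thread.new { 100.times { |i| connection.write_event("msg #{i}") } }
heartbeat  = Thread.new { 100.times { |i| connection.write_comment("heartbeat #{i}") } }
[dispatcher, heartbeat].each(&:join)

frames = io.string.split("\n\n")
puts frames.size
```

Each frame in the output is either a complete `data:` line or a complete comment line, whatever order the two threads happened to run in.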

Constructor Details

#initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil) ⇒ Heartbeat

Returns a new instance of Heartbeat.



# File 'lib/pgbus/web/streamer/heartbeat.rb', line 29

def initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil)
  @registry = registry
  @queue = dispatch_queue
  @interval = interval
  @idle_timeout = idle_timeout
  @logger = logger
  @clock = clock || -> { ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) }
  @running = false
  @thread = nil
  @wake = ConditionVariable.new
  @wake_mutex = Mutex.new
end

Instance Method Details

#start ⇒ Object



# File 'lib/pgbus/web/streamer/heartbeat.rb', line 42

def start
  return if @running

  @running = true
  @thread = Thread.new { run_loop }
  self
end

#stop ⇒ Object



# File 'lib/pgbus/web/streamer/heartbeat.rb', line 50

def stop
  return unless @running

  @running = false
  @wake_mutex.synchronize { @wake.broadcast }
  @thread&.join(5)
  @thread = nil
  self
end
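
The private `run_loop` is not shown on this page. The sketch below is one plausible way the `@wake` condition variable and `@wake_mutex` could cooperate with `#stop` so that shutdown does not have to wait out a full interval. `SketchLoop` and its `run_loop` body are assumptions for illustration, not the library's implementation.

```ruby
# Hypothetical loop showing an interruptible sleep between ticks.
class SketchLoop
  def initialize(interval:, &work)
    @interval = interval
    @work = work
    @running = false
    @thread = nil
    @wake = ConditionVariable.new
    @wake_mutex = Mutex.new
  end

  def start
    return if @running

    @running = true
    @thread = Thread.new { run_loop }
    self
  end

  def stop
    return unless @running

    @running = false
    # Wake the loop if it is currently waiting for the next interval.
    @wake_mutex.synchronize { @wake.broadcast }
    @thread&.join(5)
    @thread = nil
    self
  end

  private

  # Assumed shape of the loop: do the work, then wait up to @interval
  # seconds unless #stop signals first.
  def run_loop
    while @running
      @work.call
      @wake_mutex.synchronize do
        # Re-check under the lock so a stop issued just before is not missed.
        @wake.wait(@wake_mutex, @interval) if @running
      end
    end
  end
end

ticks = 0
sketch = SketchLoop.new(interval: 60) { ticks += 1 }
sketch.start
sleep 0.1
sketch.stop # returns promptly despite the 60-second interval
puts ticks
```

Checking `@running` while holding the mutex closes the window in which a broadcast could fire between the loop's check and its wait.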

#tick ⇒ Object

Runs a single sweep synchronously. Useful for tests; production code goes through the background thread.



# File 'lib/pgbus/web/streamer/heartbeat.rb', line 62

def tick
  now = @clock.call
  @registry.each_connection do |connection|
    if connection.dead?
      # Already dead (e.g. IoWriter returned :closed on a previous
      # dispatcher write). Post the disconnect and skip the rest.
      enqueue_disconnect(connection)
      next
    end

    if connection.idle_for > @idle_timeout
      connection.mark_dead!
      enqueue_disconnect(connection)
      next
    end

    result = connection.write_comment("heartbeat #{now.to_i}")
    enqueue_disconnect(connection) if connection.dead? || result != :ok
  end
end
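
To make the three branches of `#tick` concrete, the sketch below runs the same decision logic against test doubles. `FakeConnection`, `FakeRegistry`, and `sweep` are hypothetical; `sweep` copies the branching shown above and collects connections in an array as a stand-in for the private `enqueue_disconnect`, whose behaviour is not documented here.

```ruby
# Hypothetical double exposing only the methods #tick calls.
class FakeConnection
  attr_reader :idle_for, :comments

  def initialize(idle_for:, dead: false, write_result: :ok)
    @idle_for = idle_for
    @dead = dead
    @write_result = write_result
    @comments = []
  end

  def dead?
    @dead
  end

  def mark_dead!
    @dead = true
  end

  def write_comment(text)
    @comments << text
    @write_result
  end
end

# Hypothetical double for the registry.
class FakeRegistry
  def initialize(connections)
    @connections = connections
  end

  def each_connection(&block)
    @connections.each(&block)
  end
end

# Mirrors the branching in #tick; returns the connections that would be
# handed to enqueue_disconnect.
def sweep(registry, idle_timeout:, now:)
  disconnects = []
  registry.each_connection do |connection|
    if connection.dead?
      disconnects << connection
      next
    end

    if connection.idle_for > idle_timeout
      connection.mark_dead!
      disconnects << connection
      next
    end

    result = connection.write_comment("heartbeat #{now.to_i}")
    disconnects << connection if connection.dead? || result != :ok
  end
  disconnects
end

healthy = FakeConnection.new(idle_for: 1)
stale   = FakeConnection.new(idle_for: 120)
blocked = FakeConnection.new(idle_for: 1, write_result: :blocked)
gone    = FakeConnection.new(idle_for: 1, dead: true)

registry = FakeRegistry.new([healthy, stale, blocked, gone])
dropped = sweep(registry, idle_timeout: 60, now: 1_700_000_000.4)
puts dropped.size
```

Only the healthy connection survives the sweep. The stale and already-dead connections are never written to, while the blocked one receives a write attempt before being queued for disconnect.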