Class: Pgbus::Web::Streamer::Heartbeat
- Inherits: Object
- Defined in: lib/pgbus/web/streamer/heartbeat.rb
Overview
Periodic maintenance loop for SSE connections. Runs three sweeps on every tick:
1. Write an SSE comment (": heartbeat <timestamp>\n\n") to each connection. The timestamp is the injected clock's reading truncated to an integer; the default clock is monotonic, so the value is not necessarily a Unix epoch. This keeps proxies and load balancers from timing out idle HTTP responses; many reverse proxies close HTTP responses that sit idle for 30-60s, which would silently drop SSE clients.
2. Mark connections that have been idle longer than the configured idle_timeout as dead. The StreamEventDispatcher's next pass picks them up via its disconnect path.
3. Post a DisconnectMessage for any connection already flagged dead (by IoWriter returning :closed / :blocked, or by the idle sweep above).
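The heartbeat frame from step 1 can be illustrated with a small sketch. In the SSE wire format a line that starts with a colon is a comment, and a blank line ends the frame, so the client receives bytes but dispatches no event. The helper names `sse_comment` and `data_events` are hypothetical and not part of Pgbus, and the parser is deliberately simplified (it only understands `data:` lines).

```ruby
# Builds one SSE comment frame: a ":"-prefixed line followed by a blank line.
def sse_comment(text)
  ": #{text}\n\n"
end

# Simplified client-side view: split the stream into frames, ignore comment
# lines, and return the payload of each frame that carries data.
def data_events(stream)
  stream.split("\n\n").filter_map do |block|
    payload = block.lines(chomp: true)
                   .reject { |line| line.start_with?(":") }
                   .select { |line| line.start_with?("data:") }
                   .map { |line| line.delete_prefix("data:").sub(/\A /, "") }
    payload.join("\n") unless payload.empty?
  end
end

frame  = sse_comment("heartbeat 1700000000")
stream = frame + "data: hello\n\n"
events = data_events(stream)
# frame  => ": heartbeat 1700000000\n\n"
# events => ["hello"]
```

The heartbeat keeps the response active for intermediaries while staying invisible to application code listening for events.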
The heartbeat runs on its own dedicated thread because it does blocking writes (via IoWriter with a deadline) and we don’t want to delay the dispatcher. Writes are serialised per-connection by the Connection’s own mutex, so concurrent dispatcher + heartbeat writes are safe.
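A minimal sketch of why a per-connection mutex makes concurrent dispatcher and heartbeat writes safe. `SketchConnection` and `write_frame` are hypothetical stand-ins, not the real Connection API; a `StringIO` replaces the socket, and each frame is deliberately written in two steps so that an unsynchronised writer could interleave.

```ruby
require "stringio"

# Hypothetical stand-in for Connection; only the locking idea is shown.
class SketchConnection
  def initialize(io)
    @io = io
    @mutex = Mutex.new
  end

  def write_frame(frame)
    @mutex.synchronize do
      # Two separate writes: the mutex keeps another thread from
      # slipping its own bytes in between them.
      @io.write(frame)
      @io.write("\n\n")
    end
    :ok
  end
end

io = StringIO.new
connection = SketchConnection.new(io)

dispatcher = Thread.new { 50.times { |i| connection.write_frame("data: event #{i}") } }
heartbeat  = Thread.new { 50.times { |i| connection.write_frame(": heartbeat #{i}") } }
[dispatcher, heartbeat].each(&:join)

frames = io.string.split("\n\n")
intact = frames.all? { |f| f.match?(/\A(data: event \d+|: heartbeat \d+)\z/) }
```

Every frame arrives whole regardless of how the two threads are scheduled; only the relative order of dispatcher and heartbeat frames varies.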
Instance Method Summary
- #initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil) ⇒ Heartbeat (constructor): A new instance of Heartbeat.
- #start ⇒ Object
- #stop ⇒ Object
- #tick ⇒ Object: Runs a single sweep synchronously.
Constructor Details
#initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil) ⇒ Heartbeat
Returns a new instance of Heartbeat.
    # File 'lib/pgbus/web/streamer/heartbeat.rb', line 29
    def initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil)
      @registry = registry
      @queue = dispatch_queue
      @interval = interval
      @idle_timeout = idle_timeout
      @logger = logger
      @clock = clock || -> { ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) }
      @running = false
      @thread = nil
      @wake = ConditionVariable.new
      @wake_mutex = Mutex.new
    end
Instance Method Details
#start ⇒ Object
    # File 'lib/pgbus/web/streamer/heartbeat.rb', line 42
    def start
      return if @running

      @running = true
      @thread = Thread.new { run_loop }
      self
    end
#stop ⇒ Object
    # File 'lib/pgbus/web/streamer/heartbeat.rb', line 50
    def stop
      return unless @running

      @running = false
      @wake_mutex.synchronize { @wake.broadcast }
      @thread&.join(5)
      @thread = nil
      self
    end
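The private run_loop is not shown on this page, so the following is only a sketch of how a loop could pair with the `@wake` / `@wake_mutex` fields so that #stop interrupts the sleep between ticks instead of waiting out the full interval. `SketchLoop` is a hypothetical class; unlike the listing above, it flips the running flag inside the mutex so the loop cannot miss the broadcast between its check and its wait.

```ruby
# Hypothetical loop with an interruptible sleep; not the real Heartbeat.
class SketchLoop
  def initialize(interval:, &work)
    @interval = interval
    @work = work
    @running = false
    @thread = nil
    @wake = ConditionVariable.new
    @wake_mutex = Mutex.new
  end

  def start
    return self if @running

    @running = true
    @thread = Thread.new { run_loop }
    self
  end

  def stop
    return self unless @running

    @wake_mutex.synchronize do
      @running = false
      @wake.broadcast # wake the sleeping loop immediately
    end
    @thread&.join(5)
    @thread = nil
    self
  end

  private

  def run_loop
    loop do
      @work.call
      @wake_mutex.synchronize do
        return unless @running

        # Sleeps up to @interval seconds, or until #stop broadcasts.
        @wake.wait(@wake_mutex, @interval)
        return unless @running
      end
    end
  end
end

ticks = 0
worker = SketchLoop.new(interval: 60) { ticks += 1 }
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
worker.start
sleep 0.1
worker.stop
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```

With a 60 second interval the loop still shuts down almost immediately, because the condition variable wait returns as soon as it is signalled.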
#tick ⇒ Object
Runs a single sweep synchronously. Useful for tests — production code goes through the background thread.
    # File 'lib/pgbus/web/streamer/heartbeat.rb', line 62
    def tick
      now = @clock.call
      @registry.each_connection do |connection|
        if connection.dead?
          # Already dead (e.g. IoWriter returned :closed on a previous
          # dispatcher write). Post the disconnect and skip the rest.
          enqueue_disconnect(connection)
          next
        end

        if connection.idle_for > @idle_timeout
          connection.mark_dead!
          enqueue_disconnect(connection)
          next
        end

        result = connection.write_comment("heartbeat #{now.to_i}")
        enqueue_disconnect(connection) if connection.dead? || result != :ok
      end
    end
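The branch order in #tick can be exercised without threads or sockets. The sketch below uses a hypothetical `FakeConnection` double that exposes only the four methods #tick calls, and a hypothetical `sweep` helper that re-states the same branches but returns connection names instead of posting to a dispatch queue. It does not load or call the real Heartbeat class.

```ruby
# Hypothetical double exposing only what #tick calls on a connection.
class FakeConnection
  attr_reader :name, :idle_for, :comments

  def initialize(name, idle_for: 0, dead: false, write_result: :ok)
    @name = name
    @idle_for = idle_for
    @dead = dead
    @write_result = write_result
    @comments = []
  end

  def dead?
    @dead
  end

  def mark_dead!
    @dead = true
  end

  def write_comment(text)
    @comments << text
    @write_result
  end
end

# Same branch order as #tick; collects the names that would be disconnected.
def sweep(connections, idle_timeout:, now:)
  disconnects = []
  connections.each do |connection|
    if connection.dead?
      disconnects << connection.name
      next
    end

    if connection.idle_for > idle_timeout
      connection.mark_dead!
      disconnects << connection.name
      next
    end

    result = connection.write_comment("heartbeat #{now.to_i}")
    disconnects << connection.name if connection.dead? || result != :ok
  end
  disconnects
end

healthy = FakeConnection.new("healthy", idle_for: 5)
connections = [
  healthy,
  FakeConnection.new("dead", dead: true),
  FakeConnection.new("idle", idle_for: 120),
  FakeConnection.new("blocked", write_result: :blocked)
]

dropped = sweep(connections, idle_timeout: 60, now: 1_700_000_000.7)
# dropped          => ["dead", "idle", "blocked"]
# healthy.comments => ["heartbeat 1700000000"]
```

Only the healthy connection receives a heartbeat comment and stays registered. The comparison is strict, so a connection idle for exactly idle_timeout seconds is still written to rather than dropped.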