Class: Pgbus::Web::Streamer::Heartbeat
- Inherits:
-
Object
- Object
- Pgbus::Web::Streamer::Heartbeat
- Defined in:
- lib/pgbus/web/streamer/heartbeat.rb
Overview
Periodic maintenance loop for SSE connections. Runs three sweeps on every tick:
1. Write an SSE comment (": heartbeat <epoch>\n\n") to each
connection. This keeps proxies and load balancers from timing
out idle HTTP responses; most reverse proxies close HTTP
responses that sit idle for 30-60s, which would silently drop
SSE clients.
2. Mark connections that have been idle longer than the
configured idle_timeout as dead. The StreamEventDispatcher's next pass
picks them up via its disconnect path.
3. Post a DisconnectMessage for any connection already flagged
dead (by IoWriter returning :closed / :blocked, or by the
idle sweep above).
The heartbeat runs on its own dedicated thread because it does blocking writes (via IoWriter with a deadline) and we don’t want to delay the dispatcher. Writes are serialised per-connection by the Connection’s own mutex, so concurrent dispatcher + heartbeat writes are safe.
Instance Method Summary collapse
-
#initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil) ⇒ Heartbeat
constructor
A new instance of Heartbeat.
- #start ⇒ Object
- #stop ⇒ Object
-
#tick ⇒ Object
Runs a single sweep synchronously.
Constructor Details
#initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil) ⇒ Heartbeat
Returns a new instance of Heartbeat.
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 29 def initialize(registry:, dispatch_queue:, interval:, idle_timeout:, logger: Pgbus.logger, clock: nil) @registry = registry @queue = dispatch_queue @interval = interval @idle_timeout = idle_timeout @logger = logger @clock = clock || -> { ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) } @running = false @thread = nil @wake = ConditionVariable.new @wake_mutex = Mutex.new end |
Instance Method Details
#start ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 42 def start return if @running @running = true @thread = Thread.new { run_loop } self end |
#stop ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 50 def stop return unless @running @running = false @wake_mutex.synchronize { @wake.broadcast } @thread&.join(5) @thread = nil self end |
#tick ⇒ Object
Runs a single sweep synchronously. Useful for tests — production code goes through the background thread.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/pgbus/web/streamer/heartbeat.rb', line 62 def tick now = @clock.call presence_conns = [] @registry.each_connection do |connection| if connection.dead? # Already dead (e.g. IoWriter returned :closed on a previous # dispatcher write). Post the disconnect and skip the rest. enqueue_disconnect(connection) next end if connection.idle_for > @idle_timeout connection.mark_dead! enqueue_disconnect(connection) next end result = connection.write_comment("heartbeat #{now.to_i}") if connection.dead? || result != :ok enqueue_disconnect(connection) next end presence_conns << connection if presence_member?(connection) end enqueue_presence_touch(presence_conns) end |