Module: Pgbus::Web::Streamer

Defined in:
lib/pgbus/web/streamer.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/web/streamer/connection.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb

Overview

The worker-local coordinator that owns SSE connections, one PG LISTEN session, and the dispatch/heartbeat threads. Lazily created on the first SSE connection to a Puma worker (or eagerly in tests). There is exactly one Instance per Puma worker process; the module-level accessors memoise it.

This is NOT a Singleton in the GoF sense: tests are free to construct throwaway Instances directly and dependency-inject everything. The `current` / `reset!` helpers exist purely so the Rack StreamApp can share an instance across requests within a worker without passing it through every method call.
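The "one Instance per worker, memoised behind module-level accessors" idea can be sketched in isolation. The module below is a hypothetical stand-in (`DemoStreamer` and its empty `Instance` are assumptions for illustration, not the real Pgbus classes); only the mutex-guarded `||=` mirrors the accessor documented further down.

```ruby
# Hypothetical stand-in for Pgbus::Web::Streamer, for illustration only.
module DemoStreamer
  # Placeholder for the real Instance, which owns connections and threads.
  class Instance
    def initialize(**_opts); end

    def start; end
  end

  @current_mutex = Mutex.new

  class << self
    # Same shape as the documented accessor: create once, under a lock.
    def current(**factory_opts)
      @current_mutex.synchronize do
        @current ||= Instance.new(**factory_opts).tap(&:start)
      end
    end
  end
end

# Several request threads racing on the first call still share one object.
shared = 8.times.map { Thread.new { DemoStreamer.current } }.map(&:value)
puts shared.uniq.size

# Not a GoF Singleton: nothing prevents building a separate throwaway Instance.
throwaway = DemoStreamer::Instance.new
puts throwaway.equal?(DemoStreamer.current)
```

The first `puts` reports a single distinct object across all threads; the second reports `false`, since the throwaway object never enters the memoised slot.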

Defined Under Namespace

Modules: IoWriter
Classes: Connection, FalconConnection, Heartbeat, Instance, Listener, Registry, StreamEventDispatcher

Class Method Summary

Class Method Details

.current(**factory_opts) ⇒ Object

Returns the worker-local instance, creating and starting it on first call. `factory_opts` are passed to `Instance.new` only on the call that creates the instance; later calls return the memoised instance and ignore them.



# File 'lib/pgbus/web/streamer.rb', line 28

def current(**factory_opts)
  @current_mutex.synchronize do
    @current ||= Instance.new(**factory_opts).tap(&:start)
  end
end
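
Because the accessor uses `||=`, options supplied after the first call have no effect. A minimal sketch of that behaviour, using a hypothetical stand-in module and a made-up `heartbeat_interval` option (the real `Instance.new` keywords are not shown in this page):

```ruby
# Hypothetical stand-in; FactoryOptsDemo and :heartbeat_interval are assumptions.
module FactoryOptsDemo
  class Instance
    attr_reader :opts

    def initialize(**opts)
      @opts = opts
    end

    def start; end
  end

  @current_mutex = Mutex.new

  def self.current(**factory_opts)
    @current_mutex.synchronize do
      # Instance.new only runs while @current is still nil.
      @current ||= Instance.new(**factory_opts).tap(&:start)
    end
  end
end

first  = FactoryOptsDemo.current(heartbeat_interval: 15)
second = FactoryOptsDemo.current(heartbeat_interval: 60) # options ignored here

puts second.equal?(first)   # same memoised object
puts second.opts.inspect    # still the options from the first call
```

If different options are needed, the slot has to be cleared or replaced first (see `reset!` and `current=`).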

.current=(instance) ⇒ Object

Explicitly set the current instance — used by tests and by the Puma plugin to inject a pre-built instance.



# File 'lib/pgbus/web/streamer.rb', line 36

def current=(instance)
  @current_mutex.synchronize { @current = instance }
end
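
A sketch of how a test might inject a pre-built object through the setter. The module and the `FakeInstance` double are hypothetical stand-ins; note that, as in the source above, the setter stores the object as-is and does not call `start` on it.

```ruby
# Hypothetical stand-in mirroring the getter/setter pair documented here.
module InjectDemo
  class Instance
    def initialize(**_opts); end

    def start; end
  end

  @current_mutex = Mutex.new

  class << self
    def current(**factory_opts)
      @current_mutex.synchronize do
        @current ||= Instance.new(**factory_opts).tap(&:start)
      end
    end

    def current=(instance)
      @current_mutex.synchronize { @current = instance }
    end
  end
end

# A test double that records whether anyone started it.
class FakeInstance
  attr_reader :started

  def initialize
    @started = false
  end

  def start
    @started = true
  end
end

fake = FakeInstance.new
InjectDemo.current = fake

puts InjectDemo.current.equal?(fake) # the injected object is returned
puts fake.started                    # the setter did not start it
```

Since the slot is already filled, a later `current` call returns the injected object without constructing a new `Instance`.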

.reset! ⇒ Object

Tear down the current instance and clear the slot. Called by the Puma shutdown hook (Phase 4.4) and by tests between examples.



# File 'lib/pgbus/web/streamer.rb', line 42

def reset!
  instance = nil
  @current_mutex.synchronize do
    instance = @current
    @current = nil
  end
  instance&.shutdown!
end
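
The method swaps the slot to `nil` under the mutex and calls `shutdown!` only after the lock is released, so a slow teardown presumably does not block other threads calling `current`. A minimal sketch of that sequence, using a hypothetical stand-in whose `shutdown!` merely sets a flag:

```ruby
# Hypothetical stand-in; the real shutdown! tears down threads and connections.
module ResetDemo
  class Instance
    attr_reader :shut_down

    def initialize(**_opts)
      @shut_down = false
    end

    def start; end

    def shutdown!
      @shut_down = true
    end
  end

  @current_mutex = Mutex.new

  class << self
    def current(**factory_opts)
      @current_mutex.synchronize do
        @current ||= Instance.new(**factory_opts).tap(&:start)
      end
    end

    def reset!
      instance = nil
      @current_mutex.synchronize do
        instance = @current # take the old instance out of the slot
        @current = nil
      end
      instance&.shutdown!   # teardown runs outside the lock
    end
  end
end

old = ResetDemo.current
ResetDemo.reset!
ResetDemo.reset!            # slot already empty: safe navigation makes this a no-op
fresh = ResetDemo.current   # lazily builds a new instance

puts old.shut_down
puts fresh.equal?(old)
```

In a test suite this pattern would typically sit in an after-each hook so that every example starts with an empty slot.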