Module: Pgbus::Web::Streamer

Defined in:
lib/pgbus/web/streamer.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/web/streamer/connection.rb,
lib/pgbus/web/streamer/stream_counter.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb

Overview

The worker-local coordinator that owns SSE connections, one PG LISTEN session, and the dispatch/heartbeat threads. Lazily created on the first SSE connection to a Puma worker (or eagerly in tests). There is exactly one Instance per Puma worker process; the module-level accessors memoise it.

This is NOT a Singleton in the GoF sense — tests are free to construct throwaway Instances directly and dependency-inject everything. The ‘current` / `reset!` helpers exist purely so the Rack StreamApp can share an instance across requests within a worker without passing it through every method call.

Defined Under Namespace

Modules: IoWriter Classes: Connection, FalconConnection, Heartbeat, Instance, Listener, Registry, StreamCounter, StreamEventDispatcher

Class Method Summary collapse

Class Method Details

.current(**factory_opts) ⇒ Object

Returns the worker-local instance, creating it on first call. ‘factory_opts` are passed to `Instance.new` the first time.



28
29
30
31
32
# File 'lib/pgbus/web/streamer.rb', line 28

def current(**factory_opts)
  @current_mutex.synchronize do
    @current ||= Instance.new(**factory_opts).tap(&:start)
  end
end

.current=(instance) ⇒ Object

Explicitly set the current instance — used by tests and by the Puma plugin to inject a pre-built instance.



36
37
38
# File 'lib/pgbus/web/streamer.rb', line 36

def current=(instance)
  @current_mutex.synchronize { @current = instance }
end

.reset!Object



44
45
46
47
48
49
50
51
# File 'lib/pgbus/web/streamer.rb', line 44

def reset!
  instance = nil
  @current_mutex.synchronize do
    instance = @current
    @current = nil
  end
  instance&.shutdown!
end

.stream_counterObject



40
41
42
# File 'lib/pgbus/web/streamer.rb', line 40

def stream_counter
  @current_mutex.synchronize { @current&.stream_counter }
end