Module: Pgbus::Web::Streamer
- Defined in:
- lib/pgbus/web/streamer.rb,
lib/pgbus/web/streamer/instance.rb,
lib/pgbus/web/streamer/listener.rb,
lib/pgbus/web/streamer/registry.rb,
lib/pgbus/web/streamer/heartbeat.rb,
lib/pgbus/web/streamer/io_writer.rb,
lib/pgbus/web/streamer/connection.rb,
lib/pgbus/web/streamer/stream_counter.rb,
lib/pgbus/web/streamer/falcon_connection.rb,
lib/pgbus/web/streamer/stream_event_dispatcher.rb
Overview
The worker-local coordinator that owns SSE connections, one PG LISTEN session, and the dispatch/heartbeat threads. Lazily created on the first SSE connection to a Puma worker (or eagerly in tests). There is exactly one Instance per Puma worker process; the module-level accessors memoise it.
This is NOT a Singleton in the GoF sense — tests are free to construct throwaway Instances directly and dependency-inject everything. The ‘current` / `reset!` helpers exist purely so the Rack StreamApp can share an instance across requests within a worker without passing it through every method call.
Defined Under Namespace
Modules: IoWriter Classes: Connection, FalconConnection, Heartbeat, Instance, Listener, Registry, StreamCounter, StreamEventDispatcher
Class Method Summary collapse
-
.current(**factory_opts) ⇒ Object
Returns the worker-local instance, creating it on first call.
-
.current=(instance) ⇒ Object
Explicitly set the current instance — used by tests and by the Puma plugin to inject a pre-built instance.
- .reset! ⇒ Object
- .stream_counter ⇒ Object
Class Method Details
.current(**factory_opts) ⇒ Object
Returns the worker-local instance, creating it on first call. ‘factory_opts` are passed to `Instance.new` the first time.
28 29 30 31 32 |
# File 'lib/pgbus/web/streamer.rb', line 28 def current(**factory_opts) @current_mutex.synchronize do @current ||= Instance.new(**factory_opts).tap(&:start) end end |
.current=(instance) ⇒ Object
Explicitly set the current instance — used by tests and by the Puma plugin to inject a pre-built instance.
36 37 38 |
# File 'lib/pgbus/web/streamer.rb', line 36 def current=(instance) @current_mutex.synchronize { @current = instance } end |
.reset! ⇒ Object
44 45 46 47 48 49 50 51 |
# File 'lib/pgbus/web/streamer.rb', line 44 def reset! instance = nil @current_mutex.synchronize do instance = @current @current = nil end instance&.shutdown! end |
.stream_counter ⇒ Object
40 41 42 |
# File 'lib/pgbus/web/streamer.rb', line 40 def stream_counter @current_mutex.synchronize { @current&.stream_counter } end |