Module: Pgbus::Web::Streamer
Defined in:
- lib/pgbus/web/streamer.rb
- lib/pgbus/web/streamer/instance.rb
- lib/pgbus/web/streamer/listener.rb
- lib/pgbus/web/streamer/registry.rb
- lib/pgbus/web/streamer/heartbeat.rb
- lib/pgbus/web/streamer/io_writer.rb
- lib/pgbus/web/streamer/connection.rb
- lib/pgbus/web/streamer/falcon_connection.rb
- lib/pgbus/web/streamer/stream_event_dispatcher.rb
Overview
The worker-local coordinator that owns SSE connections, one PG LISTEN session, and the dispatch/heartbeat threads. Lazily created on the first SSE connection to a Puma worker (or eagerly in tests). There is exactly one Instance per Puma worker process; the module-level accessors memoise it.
This is NOT a Singleton in the GoF sense: tests are free to construct throwaway Instances directly and dependency-inject everything. The `current` / `reset!` helpers exist purely so the Rack StreamApp can share an instance across requests within a worker without passing it through every method call.
Defined Under Namespace
Modules: IoWriter
Classes: Connection, FalconConnection, Heartbeat, Instance, Listener, Registry, StreamEventDispatcher
Class Method Summary
- .current(**factory_opts) ⇒ Object
  Returns the worker-local instance, creating it on first call.
- .current=(instance) ⇒ Object
  Explicitly set the current instance; used by tests and by the Puma plugin to inject a pre-built instance.
- .reset! ⇒ Object
  Tear down the current instance and clear the slot.
Class Method Details
.current(**factory_opts) ⇒ Object
Returns the worker-local instance, creating and starting it on first call. `factory_opts` are passed to `Instance.new` only on that first call.
# File 'lib/pgbus/web/streamer.rb', line 28
def current(**factory_opts)
  @current_mutex.synchronize do
    @current ||= Instance.new(**factory_opts).tap(&:start)
  end
end
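The `||=` in the method above means the options only matter on the call that fills the slot, and the mutex means concurrent callers all see the same object. The following is a minimal, self-contained sketch of that behaviour, not the Pgbus implementation: `StreamerSketch`, `FakeInstance`, and the `heartbeat_interval` option are hypothetical stand-ins, and the mutex is assumed to be created in the module body.

```ruby
# Hypothetical stand-in for Pgbus::Web::Streamer::Instance.
class FakeInstance
  attr_reader :opts, :start_calls

  def initialize(**opts)
    @opts = opts
    @start_calls = 0
  end

  def start
    @start_calls += 1
  end
end

# Hypothetical module mirroring the shape of the documented accessor.
module StreamerSketch
  @current_mutex = Mutex.new # assumption: initialised once at load time
  @current = nil

  class << self
    # Build and start the instance once, then keep returning it.
    def current(**factory_opts)
      @current_mutex.synchronize do
        @current ||= FakeInstance.new(**factory_opts).tap(&:start)
      end
    end
  end
end

first  = StreamerSketch.current(heartbeat_interval: 15)
second = StreamerSketch.current(heartbeat_interval: 99) # ignored: slot already filled

# Every thread receives the same memoised object.
from_threads = Array.new(8) { Thread.new { StreamerSketch.current } }.map(&:value)

puts first.equal?(second)                        # prints true
puts first.opts[:heartbeat_interval]             # prints 15
puts from_threads.all? { |i| i.equal?(first) }   # prints true
puts first.start_calls                           # prints 1
```

If a caller needs different options, the slot has to be cleared first; passing new options to an already-populated accessor has no effect in this sketch.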
.current=(instance) ⇒ Object
Explicitly set the current instance; used by tests and by the Puma plugin to inject a pre-built instance.
# File 'lib/pgbus/web/streamer.rb', line 36
def current=(instance)
  @current_mutex.synchronize { @current = instance }
end
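Read together with `.current`, the setter above appears to have two consequences worth keeping in mind: an injected instance is returned as-is (the accessor does not call `start` on it), and whatever was in the slot before is overwritten without being shut down. The sketch below illustrates both under stated assumptions; `InjectSketch` and `StubInstance` are hypothetical names, not part of Pgbus.

```ruby
# Hypothetical test double that records lifecycle calls.
class StubInstance
  attr_reader :started, :shut_down

  def initialize(**_opts)
    @started = false
    @shut_down = false
  end

  def start
    @started = true
  end

  def shutdown!
    @shut_down = true
  end
end

# Hypothetical module mirroring the documented getter and setter.
module InjectSketch
  @current_mutex = Mutex.new # assumption: initialised once at load time
  @current = nil

  class << self
    def current(**factory_opts)
      @current_mutex.synchronize do
        @current ||= StubInstance.new(**factory_opts).tap(&:start)
      end
    end

    def current=(instance)
      @current_mutex.synchronize { @current = instance }
    end
  end
end

lazy = InjectSketch.current       # built and started by the accessor
injected = StubInstance.new       # pre-built by the caller, never started
InjectSketch.current = injected   # overwrites the slot

puts InjectSketch.current.equal?(injected) # prints true
puts injected.started                      # prints false
puts lazy.shut_down                        # prints false
```

In a test suite this suggests starting the double yourself if the code under test relies on it, and shutting down or resetting any earlier instance before injecting a new one.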
.reset! ⇒ Object
Tear down the current instance and clear the slot. Called by the Puma shutdown hook (Phase 4.4) and by tests between examples.
# File 'lib/pgbus/web/streamer.rb', line 42
def reset!
  instance = nil
  @current_mutex.synchronize do
    instance = @current
    @current = nil
  end
  instance&.shutdown!
end
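Note that the method above clears the slot under the lock but calls `shutdown!` after releasing it, presumably so that a slow teardown does not block other threads asking for the current instance. The safe-navigation call also makes a reset on an empty slot a no-op. A minimal sketch of the resulting lifecycle, with hypothetical `ResetSketch` and `DisposableInstance` names standing in for the real module and class:

```ruby
# Hypothetical stand-in that counts shutdown calls.
class DisposableInstance
  attr_reader :shutdown_calls

  def initialize(**_opts)
    @shutdown_calls = 0
  end

  def start; end

  def shutdown!
    @shutdown_calls += 1
  end
end

# Hypothetical module mirroring the documented accessor and reset.
module ResetSketch
  @current_mutex = Mutex.new # assumption: initialised once at load time
  @current = nil

  class << self
    def current(**factory_opts)
      @current_mutex.synchronize do
        @current ||= DisposableInstance.new(**factory_opts).tap(&:start)
      end
    end

    def reset!
      instance = nil
      @current_mutex.synchronize do
        instance = @current # take the instance out of the slot...
        @current = nil
      end
      instance&.shutdown!   # ...and tear it down outside the lock
    end
  end
end

old = ResetSketch.current
ResetSketch.reset!
ResetSketch.reset!            # slot already empty: nothing happens
fresh = ResetSketch.current   # a new instance is built on demand

puts old.shutdown_calls       # prints 1
puts fresh.equal?(old)        # prints false
```

Calling the reset from a per-example teardown hook, as the description above mentions for tests, gives each example a fresh instance without leaking the previous one.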