Class: Pgbus::Web::Streamer::Instance
- Inherits:
- Object
- Object
- Pgbus::Web::Streamer::Instance
- Defined in:
- lib/pgbus/web/streamer/instance.rb
Overview
Composes the Streamer’s three background threads (Listener, Dispatcher, Heartbeat) with the shared Registry and dispatch_queue. One Instance per Puma worker. Owns the lifecycle of all three threads and the dedicated PG::Connection for LISTEN.
Lifecycle:
Instance.new(...) - allocates wiring, does NOT start threads
#start - spawns listener, dispatcher, and heartbeat, in that order
#register(conn) - enqueues a ConnectMessage into the dispatch queue
#shutdown! - stops heartbeat, listener, and dispatcher, in that order,
then sends the shutdown sentinel to every connection and closes it
Dependency injection: every collaborator is constructor-injected so tests can swap in fakes without touching Pgbus.configuration. In production the module-level Streamer.current(…) builds all of the defaults from the configuration.
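The injection pattern described above can be illustrated with a minimal, self-contained sketch. SketchInstance is a hypothetical stand-in, not the real class: it only mirrors the "use the injected collaborator, else build a default" wiring and the rule that construction never starts anything. The Hash used in place of Registry and the started? helper are assumptions made for the example.

```ruby
# Hypothetical stand-in for Pgbus::Web::Streamer::Instance. It only mirrors
# the "injected collaborator or default" wiring; it starts no real threads.
class SketchInstance
  attr_reader :registry, :dispatch_queue

  def initialize(registry: nil, dispatch_queue: nil)
    @registry = registry || {}                    # assumption: a Hash stands in for Registry
    @dispatch_queue = dispatch_queue || Queue.new # same fallback shape as the real constructor
    @started = false                              # construction never starts anything
  end

  # Helper added for the example only.
  def started?
    @started
  end

  def start
    return self if @started

    @started = true
    self
  end
end

# A test can hand in its own queue and inspect it afterwards.
fake_queue = Queue.new
instance = SketchInstance.new(dispatch_queue: fake_queue)

puts instance.dispatch_queue.equal?(fake_queue) # the injected queue is used as-is
puts instance.started?                          # still false until #start is called
instance.start
puts instance.started?
```

Because every collaborator arrives through a keyword argument, a test written this way never needs to touch global configuration.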
Instance Attribute Summary
- #dispatch_queue ⇒ Object (readonly): Returns the value of attribute dispatch_queue.
- #dispatcher ⇒ Object (readonly): Returns the value of attribute dispatcher.
- #heartbeat ⇒ Object (readonly): Returns the value of attribute heartbeat.
- #listener ⇒ Object (readonly): Returns the value of attribute listener.
- #registry ⇒ Object (readonly): Returns the value of attribute registry.
Instance Method Summary
- #initialize(client: Pgbus.client, config: Pgbus.configuration, pg_connection: nil, logger: Pgbus.logger, registry: nil, dispatch_queue: nil) ⇒ Instance (constructor): A new instance of Instance.
- #register(connection) ⇒ Object: Enqueue a new SSE client.
- #shutdown! ⇒ Object: Graceful shutdown for Puma worker restart.
- #start ⇒ Object: Spawns the listener, dispatcher, and heartbeat threads, in that order.
Constructor Details
#initialize(client: Pgbus.client, config: Pgbus.configuration, pg_connection: nil, logger: Pgbus.logger, registry: nil, dispatch_queue: nil) ⇒ Instance
Returns a new instance of Instance.
# File 'lib/pgbus/web/streamer/instance.rb', line 25

def initialize(
  client: Pgbus.client,
  config: Pgbus.configuration,
  pg_connection: nil,
  logger: Pgbus.logger,
  registry: nil,
  dispatch_queue: nil
)
  @client = client
  @config = config
  @logger = logger
  @registry = registry || Registry.new
  @dispatch_queue = dispatch_queue || Queue.new
  @pg_connection = pg_connection || build_pg_connection

  @listener = Listener.new(
    pg_connection: @pg_connection,
    dispatch_queue: @dispatch_queue,
    health_check_ms: @config.streams_listen_health_check_ms,
    logger: @logger
  )
  @dispatcher = StreamEventDispatcher.new(
    client: @client,
    registry: @registry,
    listener: @listener,
    dispatch_queue: @dispatch_queue,
    logger: @logger,
    config: @config
  )
  @heartbeat = Heartbeat.new(
    registry: @registry,
    dispatch_queue: @dispatch_queue,
    interval: @config.streams_heartbeat_interval,
    idle_timeout: @config.streams_idle_timeout,
    logger: @logger
  )
  @started = false
  @shutdown_mutex = Mutex.new
end
Instance Attribute Details
#dispatch_queue ⇒ Object (readonly)
Returns the value of attribute dispatch_queue.
# File 'lib/pgbus/web/streamer/instance.rb', line 23

def dispatch_queue
  @dispatch_queue
end
#dispatcher ⇒ Object (readonly)
Returns the value of attribute dispatcher.
# File 'lib/pgbus/web/streamer/instance.rb', line 23

def dispatcher
  @dispatcher
end
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
# File 'lib/pgbus/web/streamer/instance.rb', line 23

def heartbeat
  @heartbeat
end
#listener ⇒ Object (readonly)
Returns the value of attribute listener.
# File 'lib/pgbus/web/streamer/instance.rb', line 23

def listener
  @listener
end
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
# File 'lib/pgbus/web/streamer/instance.rb', line 23

def registry
  @registry
end
Instance Method Details
#register(connection) ⇒ Object
Enqueue a new SSE client. The dispatcher picks this up on its next iteration and runs the 5-step race-free replay sequence. The StreamApp calls this right after hijacking the socket.
Guarded against the worker-shutdown race: if the request thread arrives here after #shutdown! has set @started to false, we mark the connection dead and bail out instead of enqueueing a ConnectMessage. Otherwise the message would land on a dispatch queue that no one is draining, leaving the socket outside the registry and outside close_all_connections, so the client would never see the pgbus:shutdown sentinel.
# File 'lib/pgbus/web/streamer/instance.rb', line 86

def register(connection)
  return if connection.dead?

  @shutdown_mutex.synchronize do
    unless @started
      connection.mark_dead!
      return
    end

    @dispatch_queue << StreamEventDispatcher::ConnectMessage.new(connection: connection)
  end
end
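The shutdown guard in #register can be exercised in isolation. The sketch below is an illustration under assumptions, not the library's code: FakeConnection and GuardedRegistrar are hypothetical names, and the queue holds the connection itself rather than a ConnectMessage. It shows that a connection registered before shutdown is enqueued, while one arriving afterwards is marked dead and never reaches the queue.

```ruby
# Hypothetical connection double; the real connection class is not shown here.
class FakeConnection
  def initialize
    @dead = false
  end

  def dead?
    @dead
  end

  def mark_dead!
    @dead = true
  end
end

# Hypothetical reduction of Instance to just the register/shutdown guard.
class GuardedRegistrar
  def initialize(queue)
    @queue = queue
    @started = true          # pretend #start has already run
    @mutex = Mutex.new
  end

  def register(connection)
    return if connection.dead?

    # The flag check and the enqueue happen under the same lock that
    # shutdown! takes, so a late arrival cannot slip past the flag flip.
    @mutex.synchronize do
      unless @started
        connection.mark_dead!
        return
      end

      @queue << connection
    end
  end

  def shutdown!
    @mutex.synchronize { @started = false }
  end
end

queue = Queue.new
registrar = GuardedRegistrar.new(queue)

early = FakeConnection.new
registrar.register(early)    # accepted: enqueued for the dispatcher

registrar.shutdown!

late = FakeConnection.new
registrar.register(late)     # rejected: marked dead, never enqueued

puts queue.size
puts late.dead?
```

Holding one mutex for both the flag check and the enqueue is what closes the race; checking @started outside the lock would leave a window between the check and the push.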
#shutdown! ⇒ Object
Graceful shutdown for Puma worker restart. Order matters:
1. Heartbeat first (stop writing comments to connections we're
about to close)
2. Listener next (stop accepting new NOTIFYs)
3. Dispatcher next (drain the queue; it's now finite because
nothing else writes into it)
4. Send pgbus:shutdown sentinel to every connection and close
their sockets. We do this AFTER stopping the dispatcher so
no one else is writing to these IOs concurrently.
Bounded by the configured write deadline per connection; a dead client drops instantly, a slow one stalls for at most write_deadline_ms.
# File 'lib/pgbus/web/streamer/instance.rb', line 111

def shutdown!
  @shutdown_mutex.synchronize do
    return unless @started

    @started = false

    safely { @heartbeat.stop }
    safely { @listener.stop }
    safely { @dispatcher.stop }
    close_all_connections
  end
end
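The body of the private safely helper is not shown on this page. One plausible shape, sketched here purely as an assumption, is a wrapper that rescues StandardError so that a failure in one stop call does not prevent the remaining steps from running. FakeComponent, the errors array, and the string pushed in place of close_all_connections are all hypothetical.

```ruby
# Hypothetical component that records when it is stopped and can
# optionally fail, to show that one failure does not abort the sequence.
class FakeComponent
  def initialize(name, log, fail_on_stop: false)
    @name = name
    @log = log
    @fail_on_stop = fail_on_stop
  end

  def stop
    @log << "#{@name}.stop"
    raise "#{@name} failed to stop" if @fail_on_stop
  end
end

# Assumed shape of a `safely` helper: run the block and record, rather
# than propagate, any StandardError it raises.
def safely(errors)
  yield
rescue StandardError => e
  errors << e.message
end

log = []
errors = []
heartbeat  = FakeComponent.new("heartbeat", log)
listener   = FakeComponent.new("listener", log, fail_on_stop: true)
dispatcher = FakeComponent.new("dispatcher", log)

# Same order as the documented shutdown sequence.
safely(errors) { heartbeat.stop }
safely(errors) { listener.stop }
safely(errors) { dispatcher.stop }
log << "close_all_connections"   # stands in for the final sentinel-and-close step

puts log.inspect
puts errors.inspect
```

Even with the listener raising, the dispatcher is still stopped and the connections are still closed, which is the property the ordered sequence depends on.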
#start ⇒ Object
# File 'lib/pgbus/web/streamer/instance.rb', line 65

def start
  return if @started

  @started = true
  @listener.start
  @dispatcher.start
  @heartbeat.start
  self
end