Class: Pgbus::Web::Streamer::StreamEventDispatcher
- Inherits: Object
- Inheritance: Object > Pgbus::Web::Streamer::StreamEventDispatcher
- Defined in: lib/pgbus/web/streamer/stream_event_dispatcher.rb
Overview
The single-threaded consumer of the shared dispatch_queue. Drains three kinds of messages:
- Listener::WakeMessage(queue_name:): a NOTIFY fired; call read_after
  from the minimum cursor and fan out to every connection on the stream
  (both registered and in-flight connects).
- ConnectMessage(connection:): a new SSE client connected. Runs
  the 5-step race-free replay sequence from design doc §6.5:
  1. ensure_listening on the stream (so future WakeMessages
     deliver to the in-flight buffer)
  2. register an in-flight buffer keyed by connection
  3. read_after(connection.since_id) + enqueue to connection
  4. drain the in-flight buffer into the connection (dedup is
     handled by Connection#enqueue's cursor check)
  5. move the connection from in-flight to the main Registry
- DisconnectMessage(connection:): unregister and, if the stream
  now has zero subscribers, eventually unlisten (lazy GC,
  implemented in the Streamer sweep rather than here).
All state is owned by this one thread. The registry is thread-safe (Phase 2.1), while the in-flight buffers are local to the dispatcher and accessed only from this thread, so they need no locks.
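A rough sketch of the single-consumer pattern described here, assuming a plain Ruby Queue and a case dispatch on message type. The Sketch module and its structs are hypothetical stand-ins for the real message classes, and the handled array plays the role of dispatcher-local state.

```ruby
# Hypothetical message types for illustration only.
module Sketch
  Wake = Struct.new(:queue_name, keyword_init: true)
  Connect = Struct.new(:connection, keyword_init: true)
  Disconnect = Struct.new(:connection, keyword_init: true)
end

queue = Queue.new
handled = [] # mutated only by the consumer thread, so no mutex is needed

consumer = Thread.new do
  loop do
    message = queue.pop # blocks until a producer pushes something
    break if message == :__stop__

    case message
    when Sketch::Wake       then handled << [:wake, message.queue_name]
    when Sketch::Connect    then handled << [:connect, message.connection]
    when Sketch::Disconnect then handled << [:disconnect, message.connection]
    else handled << [:unknown, message]
    end
  end
end

# Any number of producer threads may push; only the consumer touches `handled`.
queue << Sketch::Connect.new(connection: "c1")
queue << Sketch::Wake.new(queue_name: "pgbus_demo_chat")
queue << Sketch::Disconnect.new(connection: "c1")
queue << :__stop__
consumer.join

puts handled.inspect
# prints [[:connect, "c1"], [:wake, "pgbus_demo_chat"], [:disconnect, "c1"]]
```

The queue is the only shared object; reading `handled` from the main thread is safe here only because it happens after join.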
Named StreamEventDispatcher (rather than just “Dispatcher”) to disambiguate from Pgbus::Process::Dispatcher, which is an unrelated worker-side pool coordinator. See issue #98 item 8.
Defined Under Namespace
Classes: ConnectMessage, DisconnectMessage, StreamEnvelope
Constant Summary
- WakeMessage = Listener::WakeMessage
- DEFAULT_READ_LIMIT = 500
Instance Method Summary
- #initialize(client:, registry:, listener:, dispatch_queue:, logger: Pgbus.logger, read_limit: DEFAULT_READ_LIMIT, filters: nil, config: nil) ⇒ StreamEventDispatcher
  constructor. A new instance of StreamEventDispatcher.
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(client:, registry:, listener:, dispatch_queue:, logger: Pgbus.logger, read_limit: DEFAULT_READ_LIMIT, filters: nil, config: nil) ⇒ StreamEventDispatcher
Returns a new instance of StreamEventDispatcher.
# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 50

def initialize(client:, registry:, listener:, dispatch_queue:, logger: Pgbus.logger,
               read_limit: DEFAULT_READ_LIMIT, filters: nil, config: nil)
  @client = client
  @registry = registry
  @listener = listener
  @queue = dispatch_queue
  @logger = logger
  @read_limit = read_limit
  # Filters default to the process-wide registry so production
  # code picks up whatever was registered at boot. Tests inject
  # a fresh Filters instance to avoid cross-test pollution.
  @filters = filters || Pgbus::Streams.filters
  # Config is injected so the Dispatcher can read
  # `streams_stats_enabled` without reaching into the global
  # Pgbus.configuration at every call site. Tests pass a
  # throwaway config to flip the flag independently of the
  # process-wide setting. Falls back to the global config
  # for production call sites that don't specify one.
  @config = config || Pgbus.configuration
  # stream_name → Array<[connection, Array<Envelope>]>
  @in_flight = Hash.new { |h, k| h[k] = [] }
  # PGMQ full table name (pgbus_<prefix>_<name>) → logical stream
  # name. Populated on connect so handle_wake can translate
  # Listener::WakeMessage#queue_name (a full table name, because
  # that's what PG NOTIFY channels carry) into the logical name
  # used by Registry and the in-flight buffer.
  @full_to_logical = {}
  # Per-connection "scanned" cursor — the highest msg_id this
  # Dispatcher has examined for a given connection, whether or
  # not it was actually delivered. Needed because an audience
  # filter can drop an entire read_after batch; without a
  # separate scan cursor the dispatcher would re-read the
  # same hidden window forever and starve later public
  # messages. Connection#last_msg_id_sent still drives the
  # client-visible Last-Event-ID; this cursor only feeds
  # minimum_cursor so subsequent read_after calls advance.
  @scanned_cursor = {}
  # @running is a soft hint, not the authoritative stop signal.
  # The :__stop__ sentinel pushed onto @queue is what actually
  # terminates run_loop — even if a torn read of @running ever
  # happened (it cannot under MRI's GVL for a single-word
  # boolean assignment), the sentinel break would still fire.
  @running = false
  @thread = nil
end
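The comment on @scanned_cursor describes a starvation case that is easier to see with numbers. The sketch below is an assumption-laden toy, not the dispatcher's code: Message, LOG, READ_LIMIT, read_after and poll are hypothetical, it models a single connection, and the audience filter is reduced to a symbol comparison.

```ruby
# Toy model: messages 1..4 are hidden from this client, 5..6 are visible.
Message = Struct.new(:msg_id, :audience)
LOG = (1..6).map { |id| Message.new(id, id <= 4 ? :admin : :public) }.freeze
READ_LIMIT = 2 # small on purpose; the documented default is 500

def read_after(cursor, limit)
  LOG.select { |m| m.msg_id > cursor }.first(limit)
end

# Runs up to `rounds` read cycles and returns the delivered msg_ids.
def poll(track_scanned:, rounds:)
  sent = 0     # plays the role of Connection#last_msg_id_sent
  scanned = 0  # highest msg_id examined, delivered or not
  delivered = []
  rounds.times do
    cursor = track_scanned ? [sent, scanned].max : sent
    batch = read_after(cursor, READ_LIMIT)
    break if batch.empty?

    batch.each do |m|
      scanned = m.msg_id
      next unless m.audience == :public # the filter drops hidden messages

      delivered << m.msg_id
      sent = m.msg_id
    end
  end
  delivered
end

without_scan = poll(track_scanned: false, rounds: 5)
with_scan = poll(track_scanned: true, rounds: 5)
puts without_scan.inspect # prints []
puts with_scan.inspect    # prints [5, 6]
```

Without the scan cursor every round re-reads messages 1 and 2, filters both out, and never reaches the public ones. With it, the read position advances past the hidden window while the client-visible cursor moves only on actual delivery.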
Instance Method Details
#start ⇒ Object
# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 97

def start
  return if @running

  @running = true
  @thread = Thread.new { run_loop }
  self
end
#stop ⇒ Object
# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 105

def stop
  return unless @running

  @running = false
  @queue << :__stop__
  if @thread && @thread.join(5).nil?
    # join returned nil → 5s timeout. The thread is still running
    # (probably blocked inside an unresponsive client write or a
    # slow Postgres query). We log and clear the reference rather
    # than calling Thread#kill, which leaves IO state corrupt.
    # The orphaned thread will exit on its own once the blocking
    # call returns and it sees @running == false on the next loop.
    @logger.warn { "[Pgbus::Streamer::StreamEventDispatcher] thread did not terminate within 5s" }
  end
  @thread = nil
  self
end
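The stop path combines a sentinel with a bounded Thread#join, which returns nil when the time limit expires before the thread finishes. The following is a sketch under stated assumptions: SketchWorker is a hypothetical class, its timeout is configurable and much shorter than the 5 seconds used above, and its stop returns a status symbol (the documented #stop returns self) purely so the outcome can be observed.

```ruby
# Hypothetical worker for illustration; not the pgbus class.
class SketchWorker
  def initialize(queue, join_timeout:, &handler)
    @queue = queue
    @join_timeout = join_timeout
    @handler = handler
    @running = false
    @thread = nil
  end

  def start
    return self if @running

    @running = true
    @thread = Thread.new { run_loop }
    self
  end

  def stop
    return :not_running unless @running

    @running = false
    @queue << :__stop__ # the sentinel is what actually ends run_loop
    # Thread#join(limit) returns nil if the limit expires first.
    timed_out = @thread && @thread.join(@join_timeout).nil?
    @thread = nil # drop the reference instead of calling Thread#kill
    timed_out ? :timed_out : :stopped
  end

  private

  def run_loop
    loop do
      message = @queue.pop
      break if message == :__stop__

      @handler.call(message)
    end
  end
end

# Case 1: an idle worker sees the sentinel and exits promptly.
fast = SketchWorker.new(Queue.new, join_timeout: 1) { |_message| nil }
fast.start
fast_result = fast.stop

# Case 2: the handler is blocked when stop is called.
started = Queue.new
slow_queue = Queue.new
slow = SketchWorker.new(slow_queue, join_timeout: 0.05) do |_message|
  started << true
  sleep 0.5 # stands in for a blocked client write or a slow query
end
slow.start
slow_queue << :work
started.pop # wait until the handler is really running
slow_result = slow.stop

puts fast_result.inspect
puts slow_result.inspect
```

In the second case the worker thread is left to finish on its own: once the sleep returns it pops the sentinel and leaves the loop, which mirrors the orphaned-thread behavior described in the comment above.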