Class: Pgbus::Web::Streamer::StreamEventDispatcher

Inherits:
Object
Defined in:
lib/pgbus/web/streamer/stream_event_dispatcher.rb

Overview

The single-threaded consumer of the shared dispatch_queue. Drains three kinds of messages:

- Listener::WakeMessage(queue_name:) — a NOTIFY fired; read_after
  the minimum cursor and fan out to every connection on the stream
  (both registered and in-flight connects).

- ConnectMessage(connection:) — a new SSE client connected. Runs
  the 5-step race-free replay sequence from design doc §6.5:
    1. ensure_listening on the stream (so future WakeMessages
       deliver to the in-flight buffer)
    2. register an in-flight buffer keyed by connection
    3. read_after(connection.since_id) + enqueue to connection
    4. drain the in-flight buffer into the connection (dedup is
       handled by Connection#enqueue's cursor check)
    5. move the connection from in-flight to the main Registry

- DisconnectMessage(connection:) — unregister and, if the stream
  now has zero subscribers, eventually unlisten (lazy GC,
  implemented in the Streamer sweep rather than here).
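
The connect sequence above can be sketched in isolation. This is a minimal model, not the Pgbus implementation: `SketchConnection`, `SketchDispatcher`, and `Envelope` are hypothetical stand-ins, the stream is an in-memory array, and the block passed to `handle_connect` is an illustrative hook that simulates a wake landing between steps 2 and 3. It shows only why the cursor check in `enqueue` makes the replay-then-drain order safe against duplicates.

```ruby
# Hypothetical stand-ins for illustration; none of these are Pgbus classes.
Envelope = Struct.new(:msg_id, :payload)

class SketchConnection
  attr_reader :since_id, :delivered

  def initialize(since_id)
    @since_id = since_id
    @last_msg_id_sent = since_id
    @delivered = []
  end

  # Cursor check: anything at or below the last sent id is a duplicate.
  def enqueue(envelope)
    return false if envelope.msg_id <= @last_msg_id_sent

    @last_msg_id_sent = envelope.msg_id
    @delivered << envelope.msg_id
    true
  end
end

class SketchDispatcher
  attr_reader :registry, :in_flight

  def initialize(log)
    @log = log        # Array<Envelope>, stands in for the stream's table
    @listening = false
    @in_flight = {}   # connection => Array<Envelope>
    @registry = []
  end

  def handle_connect(conn)
    @listening = true                                       # 1. ensure_listening
    @in_flight[conn] = []                                   # 2. register buffer
    yield if block_given?                                   # simulated mid-connect wake
    read_after(conn.since_id).each { |e| conn.enqueue(e) }  # 3. replay history
    @in_flight[conn].each { |e| conn.enqueue(e) }           # 4. drain, dupes dropped
    @in_flight.delete(conn)                                 # 5. promote to registry
    @registry << conn
  end

  # Appends to the log and fans out to in-flight buffers and registered connections.
  def handle_wake(envelope)
    @log << envelope
    @in_flight.each_value { |buffer| buffer << envelope }
    @registry.each { |conn| conn.enqueue(envelope) }
  end

  def read_after(id)
    @log.select { |e| e.msg_id > id }
  end
end

log = [1, 2, 3].map { |i| Envelope.new(i, "m#{i}") }
dispatcher = SketchDispatcher.new(log)
conn = SketchConnection.new(1)
dispatcher.handle_connect(conn) { dispatcher.handle_wake(Envelope.new(4, "m4")) }
p conn.delivered
```

In this model message 4 reaches the connection twice (once through the replay read, once through the in-flight buffer), and the cursor check drops the second copy.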

All mutable state is owned by this one thread. The registry is thread-safe (Phase 2.1), and the in-flight buffers are local to the dispatcher and accessed only from this thread, so they need no locks.

Named StreamEventDispatcher (rather than just “Dispatcher”) to disambiguate from Pgbus::Process::Dispatcher, which is an unrelated worker-side pool coordinator. See issue #98 item 8.

Defined Under Namespace

Classes: ConnectMessage, DisconnectMessage, StreamEnvelope

Constant Summary

WakeMessage = Listener::WakeMessage
DEFAULT_READ_LIMIT = 500

Instance Method Summary

Constructor Details

#initialize(client:, registry:, listener:, dispatch_queue:, logger: Pgbus.logger, read_limit: DEFAULT_READ_LIMIT, filters: nil, config: nil) ⇒ StreamEventDispatcher

Returns a new instance of StreamEventDispatcher.



# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 50

def initialize(client:, registry:, listener:, dispatch_queue:,
               logger: Pgbus.logger, read_limit: DEFAULT_READ_LIMIT,
               filters: nil, config: nil)
  @client = client
  @registry = registry
  @listener = listener
  @queue = dispatch_queue
  @logger = logger
  @read_limit = read_limit
  # Filters default to the process-wide registry so production
  # code picks up whatever was registered at boot. Tests inject
  # a fresh Filters instance to avoid cross-test pollution.
  @filters = filters || Pgbus::Streams.filters
  # Config is injected so the Dispatcher can read
  # `streams_stats_enabled` without reaching into the global
  # Pgbus.configuration at every call site. Tests pass a
  # throwaway config to flip the flag independently of the
  # process-wide setting. Falls back to the global config
  # for production call sites that don't specify one.
  @config = config || Pgbus.configuration
  # stream_name → Array<[connection, Array<Envelope>]>
  @in_flight = Hash.new { |h, k| h[k] = [] }
  # PGMQ full table name (pgbus_<prefix>_<name>) → logical stream
  # name. Populated on connect so handle_wake can translate
  # Listener::WakeMessage#queue_name (a full table name, because
  # that's what PG NOTIFY channels carry) into the logical name
  # used by Registry and the in-flight buffer.
  @full_to_logical = {}
  # Per-connection "scanned" cursor — the highest msg_id this
  # Dispatcher has examined for a given connection, whether or
  # not it was actually delivered. Needed because an audience
  # filter can drop an entire read_after batch; without a
  # separate scan cursor the dispatcher would re-read the
  # same hidden window forever and starve later public
  # messages. Connection#last_msg_id_sent still drives the
  # client-visible Last-Event-ID; this cursor only feeds
  # minimum_cursor so subsequent read_after calls advance.
  @scanned_cursor = {}
  # @running is a soft hint, not the authoritative stop signal.
  # The :__stop__ sentinel pushed onto @queue is what actually
  # terminates run_loop — even if a torn read of @running ever
  # happened (it cannot under MRI's GVL for a single-word
  # boolean assignment), the sentinel break would still fire.
  @running = false
  @thread = nil
end
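
The scanned-cursor comment in the constructor describes a starvation problem that is easier to see with numbers. The sketch below is a hypothetical model, not the dispatcher's code: `SKETCH_LOG`, `SKETCH_READ_LIMIT`, `sketch_read_after`, and `dispatch_pass` are invented names, messages are `[msg_id, audience]` pairs, and the audience filter simply keeps `:public` messages. It contrasts advancing the read position by what was examined with advancing it by what was delivered.

```ruby
# Hypothetical model: three hidden messages followed by two public ones.
SKETCH_LOG = [[1, :admin], [2, :admin], [3, :admin], [4, :public], [5, :public]].freeze
SKETCH_READ_LIMIT = 3

# Returns up to `limit` messages with an id greater than the cursor.
def sketch_read_after(cursor, limit: SKETCH_READ_LIMIT)
  SKETCH_LOG.select { |id, _audience| id > cursor }.first(limit)
end

# One dispatch pass. Returns [new_scanned, new_delivered, delivered_ids].
# The scanned cursor advances over everything examined; the delivered
# cursor advances only over what passed the filter.
def dispatch_pass(scanned, delivered)
  batch = sketch_read_after(scanned)
  visible = batch.select { |_id, audience| audience == :public }.map(&:first)
  new_scanned = batch.empty? ? scanned : batch.last.first
  new_delivered = visible.empty? ? delivered : visible.last
  [new_scanned, new_delivered, visible]
end

scanned = 0
delivered = 0

# Pass 1: the whole batch (ids 1..3) is filtered out.
scanned, delivered, ids = dispatch_pass(scanned, delivered)
p [scanned, delivered, ids]

# Pass 2: reading from the scanned cursor reaches the public messages.
scanned, delivered, ids = dispatch_pass(scanned, delivered)
p [scanned, delivered, ids]
```

If the second pass read from the delivered cursor instead (still 0 after pass 1), `sketch_read_after(0)` would return ids 1 to 3 again on every pass and messages 4 and 5 would never be reached, which is the starvation the comment warns about.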

Instance Method Details

#start ⇒ Object



# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 97

def start
  return if @running

  @running = true
  @thread = Thread.new { run_loop }
  self
end

#stop ⇒ Object



# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 105

def stop
  return unless @running

  @running = false
  @queue << :__stop__
  if @thread && @thread.join(5).nil?
    # join returned nil → 5s timeout. The thread is still running
    # (probably blocked inside an unresponsive client write or a
    # slow Postgres query). We log and clear the reference rather
    # than calling Thread#kill, which can leave IO state corrupt.
    # The orphaned thread will exit on its own once the blocking
    # call returns and it pops the :__stop__ sentinel (or sees
    # @running == false) on the next loop iteration.
    @logger.warn { "[Pgbus::Streamer::StreamEventDispatcher] thread did not terminate within 5s" }
  end
  @thread = nil
  self
end
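
The start/stop pair follows a common sentinel-on-queue shutdown pattern. The following is a minimal, self-contained sketch of that pattern rather than the dispatcher itself: `SketchWorker`, its `processed` list, and the `join_timeout` parameter are hypothetical, and `run_loop` here is an assumed shape since the real method is not shown on this page. Unlike the methods above, this `stop` returns a boolean so the caller can tell whether the thread exited in time.

```ruby
# Hypothetical worker illustrating sentinel-based shutdown with a bounded join.
class SketchWorker
  STOP = :__stop__

  attr_reader :processed

  def initialize(queue: Queue.new, join_timeout: 5)
    @queue = queue
    @join_timeout = join_timeout
    @processed = []
    @running = false
    @thread = nil
  end

  def start
    return self if @running

    @running = true
    @thread = Thread.new { run_loop }
    self
  end

  def <<(message)
    @queue << message
    self
  end

  # Returns true if the thread exited within the timeout, false otherwise.
  # Thread#join(timeout) returns nil when the timeout elapses.
  def stop
    return true unless @running

    @running = false
    @queue << STOP
    joined = @thread.nil? || !@thread.join(@join_timeout).nil?
    @thread = nil
    joined
  end

  private

  def run_loop
    loop do
      message = @queue.pop       # blocks until something arrives
      break if message == STOP   # the sentinel is the authoritative stop signal

      @processed << message
    end
  end
end

worker = SketchWorker.new.start
worker << :a << :b
p worker.stop
p worker.processed
```

Because the queue is FIFO, everything enqueued before `stop` is handled before the sentinel is reached, so a clean stop does not drop pending work.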