Class: Pgbus::Web::Streamer::StreamEventDispatcher

Inherits:
Object
Defined in:
lib/pgbus/web/streamer/stream_event_dispatcher.rb

Overview

The single-threaded consumer of the shared dispatch_queue. Drains three kinds of messages:

- Listener::WakeMessage(queue_name:) — a NOTIFY fired; read_after
  the minimum cursor and fan out to every connection on the stream
  (both registered and in-flight connects).

- ConnectMessage(connection:) — a new SSE client connected. Runs
  the 5-step race-free replay sequence from design doc §6.5:
    1. ensure_listening on the stream (so future WakeMessages
       deliver to the in-flight buffer)
    2. register an in-flight buffer keyed by connection
    3. read_after(connection.since_id) + enqueue to connection
    4. drain the in-flight buffer into the connection (dedup is
       handled by Connection#enqueue's cursor check)
    5. move the connection from in-flight to the main Registry

- DisconnectMessage(connection:) — unregister and, if the stream
  now has zero subscribers, eventually unlisten (lazy GC,
  implemented in the Streamer sweep rather than here).
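
The connect-time replay described above can be modelled in a few lines. The following is a minimal sketch rather than the Pgbus implementation: ToyConnection, ToyDispatcher, Envelope and the block hook on handle_connect are hypothetical names invented for illustration. The hook exists only so the sketch can inject a wake between steps 2 and 3; how overlapping wakes reach the buffer in Pgbus itself is not shown in this section.

```ruby
# Toy model of the 5-step replay. Every name here is a hypothetical stand-in.
Envelope = Struct.new(:msg_id, :payload)

class ToyConnection
  attr_reader :since_id, :delivered

  def initialize(since_id)
    @since_id = since_id
    @last_sent = since_id
    @delivered = []
  end

  # Cursor check: anything at or below the last sent id is a duplicate.
  def enqueue(envelope)
    return false if envelope.msg_id <= @last_sent

    @last_sent = envelope.msg_id
    @delivered << envelope.msg_id
    true
  end
end

class ToyDispatcher
  attr_reader :registry, :in_flight

  def initialize(log)
    @log = log                                  # Array<Envelope>, stands in for the queue table
    @listening = {}
    @in_flight = Hash.new { |h, k| h[k] = [] }  # stream => [[connection, buffer], ...]
    @registry = Hash.new { |h, k| h[k] = [] }   # stream => [connection, ...]
  end

  def read_after(cursor)
    @log.select { |e| e.msg_id > cursor }
  end

  # Fan out to in-flight buffers and to registered connections.
  def handle_wake(stream, envelope)
    @in_flight[stream].each { |(_conn, buffer)| buffer << envelope }
    @registry[stream].each { |conn| conn.enqueue(envelope) }
  end

  def handle_connect(stream, conn)
    @listening[stream] = true                               # 1. ensure_listening
    buffer = []
    @in_flight[stream] << [conn, buffer]                    # 2. register in-flight buffer
    yield if block_given?                                   #    sketch-only hook: inject a wake here
    read_after(conn.since_id).each { |e| conn.enqueue(e) }  # 3. replay from since_id
    buffer.each { |e| conn.enqueue(e) }                     # 4. drain; enqueue drops duplicates
    @in_flight[stream].reject! { |(c, _)| c.equal?(conn) }  # 5. promote to the registry
    @in_flight.delete(stream) if @in_flight[stream].empty?
    @registry[stream] << conn
  end
end

log = [Envelope.new(1, "a"), Envelope.new(2, "b"), Envelope.new(3, "c")]
dispatcher = ToyDispatcher.new(log)
client = ToyConnection.new(1) # the client has already seen msg_id 1

dispatcher.handle_connect("chat", client) do
  late = Envelope.new(4, "d")
  log << late
  dispatcher.handle_wake("chat", late) # lands in the in-flight buffer
end
```

In this run message 4 is seen twice, once by the replay read and once in the in-flight buffer, but the cursor check in enqueue lets it through only once. That is the property the step ordering is meant to guarantee.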

All state is owned by this one thread. The registry is thread-safe (Phase 2.1); the in-flight buffers are local to the dispatcher and accessed only from this thread, so they need no locks.

Named StreamEventDispatcher (rather than just “Dispatcher”) to disambiguate from Pgbus::Process::Dispatcher, which is an unrelated worker-side pool coordinator. See issue #98 item 8.

Defined Under Namespace

Classes: ConnectMessage, DisconnectMessage, PresenceTouchMessage, StreamEnvelope

Constant Summary

WakeMessage = Listener::WakeMessage
DEFAULT_READ_LIMIT = 500

Constructor Details

#initialize(client:, registry:, listener:, dispatch_queue:, logger: Pgbus.logger, read_limit: DEFAULT_READ_LIMIT, filters: nil, config: nil, stream_counter: nil, presence_provider: nil) ⇒ StreamEventDispatcher

Returns a new instance of StreamEventDispatcher.



# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 68

def initialize(client:, registry:, listener:, dispatch_queue:,
               logger: Pgbus.logger, read_limit: DEFAULT_READ_LIMIT,
               filters: nil, config: nil, stream_counter: nil,
               presence_provider: nil)
  @client = client
  @registry = registry
  @listener = listener
  @queue = dispatch_queue
  @logger = logger
  @read_limit = read_limit
  # Vends a presence handle for a logical stream name. Injected so
  # tests can record join/leave/touch without a DB. Production
  # defaults to the real per-stream Presence via Pgbus.stream.
  @presence_provider = presence_provider || ->(name) { Pgbus.stream(name).presence }
  # Filters default to the process-wide registry so production
  # code picks up whatever was registered at boot. Tests inject
  # a fresh Filters instance to avoid cross-test pollution.
  @filters = filters || Pgbus::Streams.filters
  # Config is injected so the Dispatcher can read
  # `streams_stats_enabled` without reaching into the global
  # Pgbus.configuration at every call site. Tests pass a
  # throwaway config to flip the flag independently of the
  # process-wide setting. Falls back to the global config
  # for production call sites that don't specify one.
  @config = config || Pgbus.configuration
  @stream_counter = stream_counter || StreamCounter.new
  # stream_name → Array<[connection, Array<Envelope>]>
  @in_flight = Hash.new { |h, k| h[k] = [] }
  # PGMQ full table name (pgbus_<prefix>_<name>) → logical stream
  # name. Populated on connect so handle_wake can translate
  # Listener::WakeMessage#queue_name (a full table name, because
  # that's what PG NOTIFY channels carry) into the logical name
  # used by Registry and the in-flight buffer.
  @full_to_logical = {}
  # Per-connection "scanned" cursor — the highest msg_id this
  # Dispatcher has examined for a given connection, whether or
  # not it was actually delivered. Needed because an audience
  # filter can drop an entire read_after batch; without a
  # separate scan cursor the dispatcher would re-read the
  # same hidden window forever and starve later public
  # messages. Connection#last_msg_id_sent still drives the
  # client-visible Last-Event-ID; this cursor only feeds
  # minimum_cursor so subsequent read_after calls advance.
  @scanned_cursor = {}
  # @running is a soft hint, not the authoritative stop signal.
  # The :__stop__ sentinel pushed onto @queue is what actually
  # terminates run_loop — even if a torn read of @running ever
  # happened (it cannot under MRI's GVL for a single-word
  # boolean assignment), the sentinel break would still fire.
  @running = false
  @thread = nil
  @ephemeral_seq = 0
end
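
The comment on @scanned_cursor describes a starvation problem that is easier to see in a small simulation. The sketch below is an illustration under stated assumptions, not Pgbus code: Message, LOG, read_after and poll are hypothetical, the audience filter is reduced to a symbol comparison, and taking the larger of the two cursors is one plausible way to combine them (the section does not show how minimum_cursor does it).

```ruby
# Hypothetical sketch of why a separate "scanned" cursor is needed.
Message = Struct.new(:msg_id, :audience)

LOG = [
  Message.new(1, :admin), Message.new(2, :admin),
  Message.new(3, :admin), Message.new(4, :public)
].freeze

# Read at most `limit` messages with msg_id greater than `cursor`.
def read_after(cursor, limit)
  LOG.select { |m| m.msg_id > cursor }.first(limit)
end

# Poll a few times for a client that may only see :public messages.
def poll(track_scanned:, rounds: 3, limit: 2)
  delivered = []
  last_sent = 0 # drives the client-visible Last-Event-ID
  scanned = 0   # highest msg_id examined, delivered or not
  rounds.times do
    cursor = track_scanned ? [last_sent, scanned].max : last_sent
    batch = read_after(cursor, limit)
    break if batch.empty?

    batch.each do |m|
      scanned = m.msg_id
      next unless m.audience == :public # audience filter drops the rest

      delivered << m.msg_id
      last_sent = m.msg_id
    end
  end
  delivered
end

without_scan = poll(track_scanned: false) # re-reads messages 1 and 2 every round
with_scan = poll(track_scanned: true)     # advances past the hidden window
```

With a read limit of 2, the first batch is entirely hidden from the client. When only the delivered cursor advances, every round re-reads that same batch and message 4 is never reached. Tracking the scanned position lets the second round start at message 3.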

Instance Attribute Details

#stream_counter ⇒ Object (readonly)

Returns the value of attribute stream_counter.



# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 66

def stream_counter
  @stream_counter
end

Instance Method Details

#start ⇒ Object



# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 122

def start
  return if @running

  @running = true
  @thread = Thread.new { run_loop }
  self
end

#stop ⇒ Object



# File 'lib/pgbus/web/streamer/stream_event_dispatcher.rb', line 130

def stop
  return unless @running

  @running = false
  @queue << :__stop__
  if @thread && @thread.join(5).nil?
    # join returned nil → 5s timeout. The thread is still running
    # (probably blocked inside an unresponsive client write or a
    # slow Postgres query). We log and clear the reference rather
    # than calling Thread#kill, which leaves IO state corrupt.
    # The orphaned thread will exit on its own once the blocking
    # call returns and it sees @running == false on the next loop.
    @logger.warn { "[Pgbus::Streamer::StreamEventDispatcher] thread did not terminate within 5s" }
  end
  @thread = nil
  self
end
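
The run_loop body is not shown on this page, so the following is only a sketch of the sentinel-based shutdown that the comments in #initialize and #stop describe. ToyLoop, its << method and the join_timeout parameter are hypothetical. Queue, Thread#join with a timeout (which returns nil when the limit elapses) and Logger are standard Ruby.

```ruby
require "logger"
require "stringio"

# Hypothetical miniature of the start/stop lifecycle; not the Pgbus class.
class ToyLoop
  STOP = :__stop__

  attr_reader :handled

  def initialize(logger:, join_timeout: 5)
    @queue = Queue.new
    @logger = logger
    @join_timeout = join_timeout
    @handled = []
    @thread = nil
  end

  def <<(message)
    @queue << message
    self
  end

  def start
    @thread ||= Thread.new { run_loop }
    self
  end

  def stop
    return self unless @thread

    @queue << STOP
    # Thread#join(limit) returns nil on timeout and the thread otherwise.
    if @thread.join(@join_timeout).nil?
      @logger.warn { "toy loop did not terminate within #{@join_timeout}s" }
    end
    @thread = nil
    self
  end

  private

  def run_loop
    loop do
      message = @queue.pop     # blocks until something is pushed
      break if message == STOP # the sentinel is the authoritative stop signal

      @handled << message
    end
  end
end

log_output = StringIO.new
worker = ToyLoop.new(logger: Logger.new(log_output)).start
worker << :first << :second
worker.stop
```

Because the sentinel travels through the same FIFO queue as ordinary messages, everything pushed before stop is handled before the loop exits. The consumer thread also wakes from a blocking pop without any polling of a flag.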