Class: Pgbus::Web::Streamer::Instance

Inherits:
Object
Defined in:
lib/pgbus/web/streamer/instance.rb

Overview

Composes the Streamer’s three background threads (Listener, Dispatcher, Heartbeat) with the shared Registry and dispatch_queue. One Instance per Puma worker. Owns the lifecycle of all three threads and the dedicated PG::Connection for LISTEN.

Lifecycle:

Instance.new(...)  — allocates wiring, does NOT start threads
#start             — spawns listener/dispatcher/heartbeat in order
#register(conn)    — enqueues a ConnectMessage into the dispatch queue
#shutdown!         — stops heartbeat, listener, and dispatcher, then sends the
                     shutdown sentinel to every connection

Dependency injection: every collaborator is constructor-injected so tests can swap in fakes without touching Pgbus.configuration. In production the module-level Streamer.current(…) builds all of the defaults from the configuration.
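The lifecycle and the injection pattern described above can be sketched with a toy model. This is a minimal illustration, not the Pgbus implementation: MiniInstance and FakeWorker are hypothetical stand-ins, and unlike the real Instance (which builds its Listener, Dispatcher, and Heartbeat internally) the sketch injects the three workers directly so that the start and stop order is observable.

```ruby
# Toy model of the documented lifecycle. MiniInstance and FakeWorker are
# hypothetical stand-ins, not Pgbus classes.
LIFECYCLE_LOG = []

class FakeWorker
  def initialize(name, log)
    @name = name
    @log = log
  end

  def start
    @log << "#{@name}:start"
  end

  def stop
    @log << "#{@name}:stop"
  end
end

class MiniInstance
  attr_reader :registry, :dispatch_queue

  # Collaborators are keyword-injected; a nil argument falls back to a default.
  def initialize(listener:, dispatcher:, heartbeat:, registry: nil, dispatch_queue: nil)
    @listener = listener
    @dispatcher = dispatcher
    @heartbeat = heartbeat
    @registry = registry || {}
    @dispatch_queue = dispatch_queue || Queue.new
    @started = false
  end

  # Construction does not start anything; start is explicit and idempotent.
  def start
    return if @started

    @started = true
    [@listener, @dispatcher, @heartbeat].each(&:start)
    self
  end

  # Stops heartbeat, then listener, then dispatcher.
  def shutdown!
    return unless @started

    @started = false
    [@heartbeat, @listener, @dispatcher].each(&:stop)
  end
end

MINI = MiniInstance.new(
  listener: FakeWorker.new("listener", LIFECYCLE_LOG),
  dispatcher: FakeWorker.new("dispatcher", LIFECYCLE_LOG),
  heartbeat: FakeWorker.new("heartbeat", LIFECYCLE_LOG)
)
MINI.start
MINI.start      # second call is a no-op
MINI.shutdown!
MINI.shutdown!  # also a no-op once stopped
```

Because every collaborator arrives through the constructor, a test can assert on the recorded calls without any global configuration.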

Constructor Details

#initialize(client: Pgbus.client, config: Pgbus.configuration, pg_connection: nil, logger: Pgbus.logger, registry: nil, dispatch_queue: nil) ⇒ Instance

Returns a new instance of Instance.



# File 'lib/pgbus/web/streamer/instance.rb', line 25

def initialize(
  client: Pgbus.client,
  config: Pgbus.configuration,
  pg_connection: nil,
  logger: Pgbus.logger,
  registry: nil,
  dispatch_queue: nil
)
  @client = client
  @config = config
  @logger = logger
  @registry = registry || Registry.new
  @dispatch_queue = dispatch_queue || Queue.new

  @pg_connection = pg_connection || build_pg_connection
  @listener = Listener.new(
    pg_connection: @pg_connection,
    dispatch_queue: @dispatch_queue,
    health_check_ms: @config.streams_listen_health_check_ms,
    logger: @logger
  )
  @dispatcher = StreamEventDispatcher.new(
    client: @client,
    registry: @registry,
    listener: @listener,
    dispatch_queue: @dispatch_queue,
    logger: @logger,
    config: @config
  )
  @heartbeat = Heartbeat.new(
    registry: @registry,
    dispatch_queue: @dispatch_queue,
    interval: @config.streams_heartbeat_interval,
    idle_timeout: @config.streams_idle_timeout,
    logger: @logger
  )
  @started = false
  @shutdown_mutex = Mutex.new
end

Instance Attribute Details

#dispatch_queue ⇒ Object (readonly)

Returns the value of attribute dispatch_queue.



# File 'lib/pgbus/web/streamer/instance.rb', line 23

def dispatch_queue
  @dispatch_queue
end

#dispatcher ⇒ Object (readonly)

Returns the value of attribute dispatcher.



# File 'lib/pgbus/web/streamer/instance.rb', line 23

def dispatcher
  @dispatcher
end

#heartbeat ⇒ Object (readonly)

Returns the value of attribute heartbeat.



# File 'lib/pgbus/web/streamer/instance.rb', line 23

def heartbeat
  @heartbeat
end

#listener ⇒ Object (readonly)

Returns the value of attribute listener.



# File 'lib/pgbus/web/streamer/instance.rb', line 23

def listener
  @listener
end

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



# File 'lib/pgbus/web/streamer/instance.rb', line 23

def registry
  @registry
end

Instance Method Details

#register(connection) ⇒ Object

Enqueue a new SSE client. The dispatcher picks this up on its next iteration and runs the 5-step race-free replay sequence. The StreamApp calls this right after hijacking the socket.

Guarded against the worker-shutdown race: if the request thread arrives here after shutdown! has flipped @started, we mark the connection dead and bail out instead of enqueueing a ConnectMessage. Otherwise the message would land on a dispatch queue that no one is draining, leaving the socket outside the registry and outside close_all_connections, so the client would never see the pgbus:shutdown sentinel.



# File 'lib/pgbus/web/streamer/instance.rb', line 86

def register(connection)
  return if connection.dead?

  @shutdown_mutex.synchronize do
    unless @started
      connection.mark_dead!
      return
    end

    @dispatch_queue << StreamEventDispatcher::ConnectMessage.new(connection: connection)
  end
end
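The shutdown-race guard can be exercised in isolation. The sketch below is a simplified model under stated assumptions: GuardedRegistrar and FakeConnection are hypothetical names, and the queued message is a plain array rather than a StreamEventDispatcher::ConnectMessage. It shows only that a registration arriving after shutdown marks the connection dead instead of enqueueing it.

```ruby
# Hypothetical stand-in for an SSE connection object.
class FakeConnection
  def initialize
    @dead = false
  end

  def dead?
    @dead
  end

  def mark_dead!
    @dead = true
  end
end

# Hypothetical reduction of the register/shutdown! pair to its locking logic.
class GuardedRegistrar
  attr_reader :queue

  def initialize
    @queue = Queue.new
    @started = false
    @mutex = Mutex.new
  end

  def start
    @mutex.synchronize { @started = true }
    self
  end

  def shutdown!
    @mutex.synchronize { @started = false }
  end

  def register(connection)
    return if connection.dead?

    # The same mutex guards the flag check and the enqueue, so a shutdown
    # cannot slip in between the two.
    @mutex.synchronize do
      unless @started
        connection.mark_dead!
        return
      end

      @queue << [:connect, connection]
    end
  end
end

REGISTRAR = GuardedRegistrar.new.start
EARLY_CONN = FakeConnection.new
LATE_CONN = FakeConnection.new

REGISTRAR.register(EARLY_CONN)  # accepted while running
REGISTRAR.shutdown!
REGISTRAR.register(LATE_CONN)   # rejected: marked dead, never enqueued
```

Holding the mutex across both the check and the enqueue is the design choice that matters here; checking the flag outside the lock would reopen the race.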

#shutdown! ⇒ Object

Graceful shutdown for Puma worker restart. Order matters:

1. Heartbeat first (stop writing comments to connections we're
   about to close)
2. Listener next (stop accepting new NOTIFYs)
3. Dispatcher next (drain the queue; it's now finite because
   nothing else writes into it)
4. Send pgbus:shutdown sentinel to every connection and close
   their sockets. We do this AFTER stopping the dispatcher so
   no one else is writing to these IOs concurrently.

Bounded by the configured write deadline per connection; a dead client drops instantly, a slow one stalls for at most write_deadline_ms.
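One way to get the bounded-write behaviour described above is a non-blocking write loop with a monotonic deadline. This is a sketch under assumptions, not the Pgbus code: write_with_deadline is a hypothetical helper, and the payload string is only a guess at the sentinel's wire format. It uses IO#write_nonblock with exception: false and IO.select, both from Ruby's standard IO API.

```ruby
# Hypothetical helper: write payload to io, giving up after deadline_ms.
# Returns true when everything was written, false on timeout or a dead peer.
def write_with_deadline(io, payload, deadline_ms)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (deadline_ms / 1000.0)
  remaining = payload.b

  until remaining.empty?
    result = io.write_nonblock(remaining, exception: false)

    if result == :wait_writable
      left = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return false if left <= 0

      # Wait until the socket is writable again or the deadline passes.
      ready = IO.select(nil, [io], nil, left)
      return false unless ready
    else
      # Partial write: drop the bytes that went out and keep going.
      remaining = remaining.byteslice(result..-1)
    end
  end

  true
rescue Errno::EPIPE, Errno::ECONNRESET, IOError
  # A dead client fails immediately instead of consuming the deadline.
  false
end

# Healthy client: the write completes well within the deadline.
OK_READER, OK_WRITER = IO.pipe
SENT = write_with_deadline(OK_WRITER, "event: pgbus:shutdown\n\n", 100)

# Slow client: nobody reads, the pipe buffer fills, the deadline expires.
SLOW_READER, SLOW_WRITER = IO.pipe
STALLED = write_with_deadline(SLOW_WRITER, "x" * 1_000_000, 50)

# Dead client: the read end is already closed.
DEAD_READER, DEAD_WRITER = IO.pipe
DEAD_READER.close
DROPPED = write_with_deadline(DEAD_WRITER, "bye", 50)
```

Pipes stand in for sockets here so the three cases (healthy, slow, dead) can be reproduced without a network.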



# File 'lib/pgbus/web/streamer/instance.rb', line 111

def shutdown!
  @shutdown_mutex.synchronize do
    return unless @started

    @started = false
    safely { @heartbeat.stop }
    safely { @listener.stop }
    safely { @dispatcher.stop }
    close_all_connections
  end
end
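The safely and close_all_connections helpers are private and not shown on this page. A plausible reading is that safely rescues and logs a failure so that one misbehaving thread cannot abort the rest of the shutdown sequence. The sketch below illustrates that idea only; the helper body, StubWorker, and the log message are assumptions.

```ruby
require "logger"
require "stringio"

SHUTDOWN_LOG_IO = StringIO.new
SHUTDOWN_LOGGER = Logger.new(SHUTDOWN_LOG_IO)
STOP_ORDER = []

# Assumed behaviour of the private helper: swallow and log StandardError.
def safely(logger)
  yield
rescue StandardError => e
  logger.warn("stop failed: #{e.class}: #{e.message}")
end

# Hypothetical worker whose stop can be made to fail.
class StubWorker
  def initialize(name, order, raises: false)
    @name = name
    @order = order
    @raises = raises
  end

  def stop
    raise IOError, "#{@name} connection already closed" if @raises

    @order << @name
  end
end

heartbeat = StubWorker.new(:heartbeat, STOP_ORDER)
listener = StubWorker.new(:listener, STOP_ORDER, raises: true)
dispatcher = StubWorker.new(:dispatcher, STOP_ORDER)

# Same order as shutdown!: heartbeat, listener, dispatcher, then connections.
[heartbeat, listener, dispatcher].each do |worker|
  safely(SHUTDOWN_LOGGER) { worker.stop }
end
STOP_ORDER << :close_all_connections
```

Even though the listener's stop raises, the dispatcher still stops and the connections are still closed, which is the property the ordered shutdown depends on.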

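#start ⇒ Object

Spawns the listener, dispatcher, and heartbeat threads, in that order. Idempotent: returns nil without doing anything if the instance is already started, otherwise returns self.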



# File 'lib/pgbus/web/streamer/instance.rb', line 65

def start
  return if @started

  @started = true
  @listener.start
  @dispatcher.start
  @heartbeat.start
  self
end