Class: Pgbus::Process::NotifyListener
- Inherits: Object
  - Object
  - Pgbus::Process::NotifyListener
- Defined in: lib/pgbus/process/notify_listener.rb
Overview
Owns a single dedicated PG::Connection that LISTENs on the INSERT NOTIFY channel of every queue a Worker/Consumer reads, and fires a WakeSignal the moment any of them receives a row. This converts the worker/consumer loop from “blind-read every polling_interval” into “sleep until a real insert, poll only as a fallback” — eliminating the empty-read storm that dominates DB load on idle queues.
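The "sleep until a real insert, poll only as a fallback" idea can be illustrated with a small wake signal built on a mutex and condition variable. This is only a sketch: `SketchWakeSignal` and its `wake`/`wait` methods are hypothetical names, not pgbus's actual WakeSignal API.

```ruby
# Hypothetical stand-in for a wake signal; NOT pgbus's real WakeSignal.
class SketchWakeSignal
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @pending = false
  end

  # Called from the listener thread when a NOTIFY arrives.
  def wake
    @mutex.synchronize do
      @pending = true
      @cond.signal
    end
  end

  # Blocks until woken or until `timeout` seconds elapse.
  # Returns :woken (read now) or :timeout (fallback poll).
  def wait(timeout)
    @mutex.synchronize do
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      until @pending
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      woken = @pending
      @pending = false
      woken ? :woken : :timeout
    end
  end
end
```

A worker loop would call `wait(polling_interval)` and read the queue in either case; on an idle queue the read then happens once per interval at most, rather than on every tick.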
pgmq-ruby's `wait_for_notify(queue, timeout:)` is single-queue and wraps the wait in `with_connection`, so it watches only one channel and holds the pooled connection for the whole wait. Neither fits a worker that reads N queues on a small shared pool. So we own ONE raw PG::Connection and hand-roll per-channel LISTEN on it.
A persistent LISTEN connection silently dies under a transaction-pool PgBouncer (LISTEN does not survive COMMIT boundaries). Point this connection at a DIRECT port via the `config.worker_notify_*` overrides. The health-check-on-timeout catches a connection killed out from under us and re-LISTENs everything.
NOTIFY channel naming (pgmq trigger): `PG_NOTIFY('pgmq.' || table || '.' || TG_OP)`. For queue `pgbus_default` the table is `q_pgbus_default`, so the channel is `pgmq.q_pgbus_default.INSERT`.
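As a minimal sketch of that naming rule, the channel can be derived from the two constants documented for this class. The helper names `channel_for` and `listen_sql` are hypothetical and not part of the class's documented API. The channel must be double-quoted in the LISTEN statement because it contains dots and upper-case letters.

```ruby
CHANNEL_PREFIX = "pgmq.q_"
CHANNEL_SUFFIX = ".INSERT"

# Hypothetical helper: physical queue name -> NOTIFY channel name.
def channel_for(physical_queue)
  "#{CHANNEL_PREFIX}#{physical_queue}#{CHANNEL_SUFFIX}"
end

# Hypothetical helper: build the LISTEN statement with the channel quoted
# as an identifier (embedded double quotes are doubled).
def listen_sql(physical_queue)
  quoted = channel_for(physical_queue).gsub('"', '""')
  %(LISTEN "#{quoted}")
end

puts channel_for("pgbus_default")
puts listen_sql("pgbus_default")
```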
Thread safety: @running, @conn, and @listening_to are guarded by @state_mutex. The wait for a notification is a blocking IO call where the mutex MUST NOT be held, so wait_once reads the connection out of the mutex first and operates on a local. Reconnect publishes the new connection + channel set under the mutex.
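The snapshot-then-block pattern described here might look roughly like the following. `SketchListener` and `FakeConn` are illustrative stand-ins, and the body of `wait_once` is an assumption about the shape of the private method, not its actual source. `FakeConn` mimics only the `wait_for_notify(timeout)` call of a PG connection.

```ruby
# Illustrative stand-in; not the real NotifyListener.
class SketchListener
  def initialize(conn)
    @state_mutex = Mutex.new
    @conn = conn
    @running = true
  end

  def wait_once(timeout)
    # Take a snapshot under the mutex, then release it before blocking.
    conn = @state_mutex.synchronize { @running ? @conn : nil }
    return nil unless conn

    conn.wait_for_notify(timeout) # blocking call; mutex is NOT held here
  end

  # Publish a replacement connection under the mutex.
  def reconnect(new_conn)
    @state_mutex.synchronize { @conn = new_conn }
  end
end

# Fake connection that returns a fixed channel name immediately.
FakeConn = Struct.new(:channel) do
  def wait_for_notify(_timeout)
    channel
  end
end
```

Because the blocking call runs on a local variable, another thread can take the mutex (for example to swap the connection or flip the running flag) while the wait is in progress.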
Constant Summary
- CHANNEL_PREFIX = "pgmq.q_"
- CHANNEL_SUFFIX = ".INSERT"
- RECONNECT_BACKOFF_SECONDS = 0.5
Instance Method Summary
- #add_queue(physical_queue) ⇒ Object
- #initialize(physical_queues:, on_wake:, connection_options:, health_check_ms: 1000, logger: Pgbus.logger) ⇒ NotifyListener (constructor): A new instance of NotifyListener.
- #listening_to ⇒ Object
- #remove_queue(physical_queue) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(physical_queues:, on_wake:, connection_options:, health_check_ms: 1000, logger: Pgbus.logger) ⇒ NotifyListener
Returns a new instance of NotifyListener.
# File 'lib/pgbus/process/notify_listener.rb', line 39
def initialize(physical_queues:, on_wake:, connection_options:, health_check_ms: 1000, logger: Pgbus.logger)
  @physical_queues = Array(physical_queues)
  @on_wake = on_wake
  @connection_options = connection_options
  @health_check_ms = health_check_ms
  @logger = logger
  @state_mutex = Mutex.new
  @listening_to = Set.new
  @commands = Queue.new
  @running = false
  @thread = nil
  @conn = nil
end
Instance Method Details
#add_queue(physical_queue) ⇒ Object
# File 'lib/pgbus/process/notify_listener.rb', line 90
def add_queue(physical_queue)
  @commands << [:listen, physical_queue]
end
#listening_to ⇒ Object
# File 'lib/pgbus/process/notify_listener.rb', line 54
def listening_to
  @state_mutex.synchronize { @listening_to.dup }
end
#remove_queue(physical_queue) ⇒ Object
# File 'lib/pgbus/process/notify_listener.rb', line 94
def remove_queue(physical_queue)
  @commands << [:unlisten, physical_queue]
end
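add_queue and remove_queue only enqueue a command; the listener thread is what acts on it. The sketch below shows one way such a queue could be drained without blocking. `drain_commands` is a hypothetical helper, and where a real implementation would presumably issue LISTEN/UNLISTEN on the connection, this version only updates a Set.

```ruby
require "set"

# Hypothetical helper: apply every pending command, then return.
# Returns :stop if a [:stop] command was seen, otherwise :drained.
def drain_commands(commands, listening_to)
  loop do
    # Non-blocking pop; raises ThreadError once the queue is empty.
    action, queue = commands.pop(true)
    case action
    when :listen   then listening_to.add(queue)
    when :unlisten then listening_to.delete(queue)
    when :stop     then return :stop
    end
  end
rescue ThreadError
  :drained
end

commands = Queue.new
commands << [:listen, "pgbus_default"]
commands << [:listen, "pgbus_mailers"]
commands << [:unlisten, "pgbus_default"]
channels = Set.new
drain_commands(commands, channels)
```

Routing changes through a queue keeps all use of the connection on the listener thread, so callers never touch the socket directly.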
#start ⇒ Object
# File 'lib/pgbus/process/notify_listener.rb', line 58
def start
  @state_mutex.synchronize do
    return self if @running

    @running = true
  end
  @physical_queues.each { |q| @commands << [:listen, q] }
  @thread = Thread.new { run_loop }
  self
end
#stop ⇒ Object
# File 'lib/pgbus/process/notify_listener.rb', line 69
def stop
  conn_to_close = nil
  @state_mutex.synchronize do
    return self unless @running

    @running = false
    conn_to_close = @conn
  end
  @commands << [:stop]
  # Interrupt the blocking wait by closing the socket; the rescue in
  # wait_once sees @running == false and exits cleanly.
  begin
    conn_to_close&.close if conn_to_close.respond_to?(:close)
  rescue StandardError
    nil
  end
  @thread&.join(5)
  @thread = nil
  self
end
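The "close the socket to interrupt a blocking wait" technique used by stop can be demonstrated with a plain pipe standing in for the PG connection. This is an analogy under that assumption, not the listener's actual code: the blocked reader gets an IOError when its IO is closed from another thread, and the running flag tells it whether the close was intentional.

```ruby
# Hypothetical demo: a pipe stands in for the database socket.
def demo_stop_by_close
  reader, writer = IO.pipe
  running = true
  outcome = Queue.new

  waiter = Thread.new do
    begin
      reader.read(1) # blocks: nothing is ever written
      outcome << :data
    rescue IOError
      # Closed underneath us; the flag says whether that was a requested stop.
      outcome << (running ? :unexpected_close : :clean_exit)
    end
  end

  sleep 0.1        # give the waiter time to block
  running = false  # flip the flag first, as stop does
  reader.close     # then interrupt the blocked read
  finished = waiter.join(5)
  writer.close
  finished ? outcome.pop : :hung
end

puts demo_stop_by_close
```

Flipping the flag before closing matters: if the order were reversed, the waiter could observe the error while the flag still says "running" and treat a deliberate stop as a failure to recover from.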