Class: Pgbus::Process::NotifyListener

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/process/notify_listener.rb

Overview

Owns a single dedicated PG::Connection that LISTENs on the INSERT NOTIFY channel of every queue a Worker/Consumer reads, and fires a WakeSignal the moment any of them receives a row. This converts the worker/consumer loop from “blind-read every polling_interval” into “sleep until a real insert, poll only as a fallback” — eliminating the empty-read storm that dominates DB load on idle queues.

pgmq-ruby’s ‘wait_for_notify(queue, timeout:)` is single-queue and wraps the wait in `with_connection`, which only watches one channel and holds the pooled connection for the whole wait. Neither fits a worker that reads N queues on a small shared pool. So we own ONE raw PG::Connection and hand-roll per-channel LISTEN on it.

A persistent LISTEN connection silently dies under a transaction-pool PgBouncer (LISTEN does not survive COMMIT boundaries). Point this connection at a DIRECT port via ‘config.worker_notify_*` overrides. The health-check-on-timeout catches a connection killed out from under us and re-LISTENs everything.

NOTIFY channel naming (pgmq trigger): PG_NOTIFY(‘pgmq.’ || table || ‘.’ || TG_OP). For queue ‘pgbus_default` the table is `q_pgbus_default`, so the channel is `pgmq.q_pgbus_default.INSERT`.

Thread safety: @running, @conn, and @listening_to are guarded by blocking IO call where the mutex MUST NOT be held), so wait_once reads the connection out of the mutex first and operates on a local. Reconnect publishes the new connection + channel set under the mutex.

Constant Summary collapse

CHANNEL_PREFIX =
"pgmq.q_"
CHANNEL_SUFFIX =
".INSERT"
RECONNECT_BACKOFF_SECONDS =
0.5

Instance Method Summary collapse

Constructor Details

#initialize(physical_queues:, on_wake:, connection_options:, health_check_ms: 1000, logger: Pgbus.logger) ⇒ NotifyListener

Returns a new instance of NotifyListener.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/pgbus/process/notify_listener.rb', line 39

def initialize(physical_queues:, on_wake:, connection_options:,
               health_check_ms: 1000, logger: Pgbus.logger)
  @physical_queues = Array(physical_queues)
  @on_wake = on_wake
  @connection_options = connection_options
  @health_check_ms = health_check_ms
  @logger = logger
  @state_mutex = Mutex.new
  @listening_to = Set.new
  @commands = Queue.new
  @running = false
  @thread = nil
  @conn = nil
end

Instance Method Details

#add_queue(physical_queue) ⇒ Object



90
91
92
# File 'lib/pgbus/process/notify_listener.rb', line 90

def add_queue(physical_queue)
  @commands << [:listen, physical_queue]
end

#listening_toObject



54
55
56
# File 'lib/pgbus/process/notify_listener.rb', line 54

def listening_to
  @state_mutex.synchronize { @listening_to.dup }
end

#remove_queue(physical_queue) ⇒ Object



94
95
96
# File 'lib/pgbus/process/notify_listener.rb', line 94

def remove_queue(physical_queue)
  @commands << [:unlisten, physical_queue]
end

#startObject



58
59
60
61
62
63
64
65
66
67
# File 'lib/pgbus/process/notify_listener.rb', line 58

def start
  @state_mutex.synchronize do
    return self if @running

    @running = true
  end
  @physical_queues.each { |q| @commands << [:listen, q] }
  @thread = Thread.new { run_loop }
  self
end

#stopObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/pgbus/process/notify_listener.rb', line 69

def stop
  conn_to_close = nil
  @state_mutex.synchronize do
    return self unless @running

    @running = false
    conn_to_close = @conn
  end
  @commands << [:stop]
  # Interrupt the blocking wait by closing the socket; the rescue in
  # wait_once sees @running == false and exits cleanly.
  begin
    conn_to_close&.close if conn_to_close.respond_to?(:close)
  rescue StandardError
    nil
  end
  @thread&.join(5)
  @thread = nil
  self
end