Module: Pgbus::Process::ConsumerPriority

Defined in:
lib/pgbus/process/consumer_priority.rb

Overview

Implements consumer priority by checking whether higher-priority workers are active for the same queues. When a higher-priority worker is healthy and not at its prefetch limit, lower-priority workers yield by using a longer polling interval.

Inspired by LavinMQ’s consumer priority where higher-priority consumers are served first and lower-priority consumers wait until all higher-priority consumers are at their prefetch limit.

Class Method Summary collapse

Class Method Details

.effective_polling_interval(base_interval:, my_priority:, max_priority:) ⇒ Object

Calculate the effective polling interval for this worker. Higher-priority workers use the base interval. Lower-priority workers multiply by a backoff factor.



56
57
58
59
60
61
# File 'lib/pgbus/process/consumer_priority.rb', line 56

def self.effective_polling_interval(base_interval:, my_priority:, max_priority:)
  return base_interval if my_priority >= max_priority

  # Lower-priority workers back off: 3x the base interval
  base_interval * 3
end

.max_active_priority(queues, my_pid) ⇒ Object

Returns the highest consumer_priority among healthy workers that share at least one queue with the given queue list, excluding the current worker (by PID).



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pgbus/process/consumer_priority.rb', line 29

def self.max_active_priority(queues, my_pid)
  conn = Pgbus.configuration.connects_to ? Pgbus::BusRecord.connection : ActiveRecord::Base.connection
  rows = conn.select_all(
    "SELECT metadata FROM pgbus_processes WHERE kind = 'worker' AND pid != $1 AND last_heartbeat_at > $2",
    "Pgbus ConsumerPriority",
    [my_pid, Time.now.utc - Heartbeat::ALIVE_THRESHOLD]
  )

  max_priority = 0
  rows.each do |row|
     = row["metadata"]
     = JSON.parse() if .is_a?(String)
    next unless 

    other_queues = ["queues"] || []
    next unless queues.intersect?(other_queues)

    other_priority = ["consumer_priority"] || 0
    max_priority = other_priority if other_priority > max_priority
  end

  max_priority
end

.should_yield?(queues:, my_priority:, my_pid:) ⇒ Boolean

Check if this worker should yield to a higher-priority worker. Returns true if a higher-priority healthy worker exists for any of the given queues.

Returns:

  • (Boolean)


17
18
19
20
21
22
23
24
# File 'lib/pgbus/process/consumer_priority.rb', line 17

def self.should_yield?(queues:, my_priority:, my_pid:)
  return false if my_priority >= max_active_priority(queues, my_pid)

  true
rescue StandardError => e
  Pgbus.logger.debug { "[Pgbus] Consumer priority check failed: #{e.message}" }
  false
end