Class: Pgbus::Process::Worker
- Inherits:
-
Object
- Object
- Pgbus::Process::Worker
- Includes:
- SignalHandler
- Defined in:
- lib/pgbus/process/worker.rb
Constant Summary collapse
- NOTIFY_FALLBACK_POLL_SECONDS =
15- WILDCARD_REFRESH_INTERVAL =
seconds
30- MISSING_QUEUE_REGEX =
Matches the physical queue name inside a “relation "pgmq.q_foo" does not exist” error. Frozen module constant to avoid recompiling the regex on every queue-missing error in hot fetch/read paths.
/pgmq\.q_(\w+)/
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#execution_mode ⇒ Object
readonly
Returns the value of attribute execution_mode.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
Instance Method Summary collapse
- #graceful_shutdown ⇒ Object
- #immediate_shutdown ⇒ Object
-
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
- #stats ⇒ Object
Methods included from SignalHandler
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pgbus/process/worker.rb', line 12 def initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) @queues = Array(queues) @initial_queues = @queues.dup.freeze @wildcard = @queues.include?("*") @threads = threads @config = config @execution_mode = ExecutionPools.normalize_mode(execution_mode) @group_mode = case group_mode when nil then nil when Symbol then group_mode when String then group_mode.to_sym else raise ArgumentError, "Invalid group_mode type: #{group_mode.class}. Must be nil, String, or Symbol" end unless Pgbus::Configuration::VALID_GROUP_MODES.include?(@group_mode) raise ArgumentError, "Invalid group_mode: #{@group_mode.inspect}. Must be nil, :fifo, or :round_robin" end @single_active_consumer = single_active_consumer @consumer_priority = consumer_priority @lifecycle = Lifecycle.new @last_wildcard_resolve = nil @jobs_processed = Concurrent::AtomicFixnum.new(0) @jobs_failed = Concurrent::AtomicFixnum.new(0) @in_flight = Concurrent::AtomicFixnum.new(0) @rate_counter = RateCounter.new(:processed, :failed, :dequeued) @started_at = Time.current @started_at_monotonic = monotonic_now @stat_buffer = config.stats_enabled ? Pgbus::StatBuffer.new : nil @executor = Pgbus::ActiveJob::Executor.new(stat_buffer: @stat_buffer) @wake_signal = WakeSignal.new @pool = ExecutionPools.build( mode: @execution_mode, capacity: threads, on_state_change: -> { @wake_signal.notify! } ) @circuit_breaker = Pgbus::CircuitBreaker.new(config: config) @queue_lock = QueueLock.new if @single_active_consumer @notify_listener = nil end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def config @config end |
#execution_mode ⇒ Object (readonly)
Returns the value of attribute execution_mode.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def execution_mode @execution_mode end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def queues @queues end |
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def threads @threads end |
Instance Method Details
#graceful_shutdown ⇒ Object
100 101 102 103 104 105 |
# File 'lib/pgbus/process/worker.rb', line 100 def graceful_shutdown Pgbus.logger.info { "[Pgbus] Worker shutting down gracefully..." } Pgbus.stopping = true @lifecycle.transition_to(:draining) @wake_signal.notify! end |
#immediate_shutdown ⇒ Object
107 108 109 110 111 112 113 |
# File 'lib/pgbus/process/worker.rb', line 107 def immediate_shutdown Pgbus.logger.warn { "[Pgbus] Worker shutting down immediately!" } Pgbus.stopping = true @lifecycle.transition_to!(:stopped) @wake_signal.notify! @pool.kill end |
#run ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/pgbus/process/worker.rb', line 73 def run setup_signals start_heartbeat resolve_wildcard_queues start_notify_listener @lifecycle.transition_to!(:running) Pgbus.logger.info do "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} " \ "mode=#{@execution_mode} notify_wakeup=#{notify_wakeup?} pid=#{::Process.pid}" end loop do process_signals check_recycle refresh_wildcard_queues break if @lifecycle.stopped? break if @lifecycle.draining? && @pool.idle? claim_and_execute if @lifecycle.can_process? @stat_buffer&.flush_if_due @wake_signal.wait(timeout: config.polling_interval) if @lifecycle.draining? || @lifecycle.paused? end shutdown end |
#stats ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/pgbus/process/worker.rb', line 56 def stats { jobs_processed: @jobs_processed.value, jobs_failed: @jobs_failed.value, in_flight: @in_flight.value, state: @lifecycle.state, execution_mode: @execution_mode, consumer_priority: @consumer_priority, single_active_consumer: @single_active_consumer, locked_queues: @queue_lock&.held_queues || [], rates: @rate_counter.rates, started_at: @started_at }.merge(@pool.) end |