Class: Pgbus::Process::Worker
- Inherits:
-
Object
- Object
- Pgbus::Process::Worker
- Includes:
- SignalHandler
- Defined in:
- lib/pgbus/process/worker.rb
Constant Summary collapse
- WILDCARD_REFRESH_INTERVAL =
seconds
30
- MISSING_QUEUE_REGEX =
Matches the physical queue name inside a “relation "pgmq.q_foo" does not exist” error. Frozen module constant to avoid recompiling the regex on every queue-missing error in hot fetch/read paths.
/pgmq\.q_(\w+)/
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#execution_mode ⇒ Object
readonly
Returns the value of attribute execution_mode.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
Instance Method Summary collapse
- #graceful_shutdown ⇒ Object
- #immediate_shutdown ⇒ Object
-
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
- #stats ⇒ Object
Methods included from SignalHandler
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/pgbus/process/worker.rb', line 12
#
# Builds a worker bound to a set of queues and prepares its counters,
# execution pool, circuit breaker and (optionally) queue lock.
#
# @param queues [Object] queue name(s); coerced with Array(). A literal "*"
#   entry switches the worker into wildcard mode.
# @param threads [Integer] capacity handed to the execution pool (default 5)
# @param config [Object] configuration object; defaults to Pgbus.configuration
# @param single_active_consumer [Boolean] when truthy a QueueLock is created
# @param consumer_priority [Integer] stored on the worker (default 0)
# @param execution_mode [Symbol] passed through ExecutionPools.normalize_mode
# @param group_mode [nil, String, Symbol] Strings are converted to Symbols
# @raise [ArgumentError] if group_mode is not nil/String/Symbol, or if the
#   normalised value is not listed in Pgbus::Configuration::VALID_GROUP_MODES
def initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false,
               consumer_priority: 0, execution_mode: :threads, group_mode: nil)
  @queues = Array(queues)
  @wildcard = @queues.include?("*")
  @threads = threads
  @config = config
  @execution_mode = ExecutionPools.normalize_mode(execution_mode)

  # nil and Symbol pass through untouched; Strings are symbolised.
  @group_mode =
    case group_mode
    when nil, Symbol then group_mode
    when String then group_mode.to_sym
    else
      raise ArgumentError, "Invalid group_mode type: #{group_mode.class}. Must be nil, String, or Symbol"
    end

  valid_group_modes = Pgbus::Configuration::VALID_GROUP_MODES
  raise ArgumentError, "Invalid group_mode: #{@group_mode.inspect}. Must be nil, :fifo, or :round_robin" unless valid_group_modes.include?(@group_mode)

  @single_active_consumer = single_active_consumer
  @consumer_priority = consumer_priority
  @lifecycle = Lifecycle.new
  @last_wildcard_resolve = nil

  # Counters shared with pool threads.
  @jobs_processed = Concurrent::AtomicFixnum.new(0)
  @jobs_failed = Concurrent::AtomicFixnum.new(0)
  @in_flight = Concurrent::AtomicFixnum.new(0)
  @rate_counter = RateCounter.new(:processed, :failed, :dequeued)

  # Wall-clock start plus a monotonic reading taken at the same point.
  @started_at = Time.current
  @started_at_monotonic = monotonic_now

  # The stat buffer only exists when stats are enabled in the config.
  @stat_buffer = (Pgbus::StatBuffer.new if config.stats_enabled)
  @executor = Pgbus::ActiveJob::Executor.new(stat_buffer: @stat_buffer)

  @wake_signal = WakeSignal.new
  notify_wake = -> { @wake_signal.notify! }
  @pool = ExecutionPools.build(mode: @execution_mode, capacity: threads, on_state_change: notify_wake)

  @circuit_breaker = Pgbus::CircuitBreaker.new(config: config)
  @queue_lock = QueueLock.new if @single_active_consumer
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10
#
# Reader for the configuration object given to the constructor.
#
# @return [Object] the value of @config
def config
  @config
end
#execution_mode ⇒ Object (readonly)
Returns the value of attribute execution_mode.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10
#
# Reader for the execution mode stored on the worker.
#
# @return [Object] the value of @execution_mode
def execution_mode
  @execution_mode
end
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10
#
# Reader for the queue list held by the worker.
#
# @return [Object] the value of @queues
def queues
  @queues
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10
#
# Reader for the thread count the worker was configured with.
#
# @return [Object] the value of @threads
def threads
  @threads
end
Instance Method Details
#graceful_shutdown ⇒ Object
95 96 97 98 99 100 |
# File 'lib/pgbus/process/worker.rb', line 95
#
# Requests a graceful stop: logs at info level, sets the global
# Pgbus.stopping flag, moves the lifecycle to :draining and pokes the wake
# signal.
def graceful_shutdown
  Pgbus.logger.info do
    "[Pgbus] Worker shutting down gracefully..."
  end

  Pgbus.stopping = true
  @lifecycle.transition_to(:draining)
  @wake_signal.notify!
end
#immediate_shutdown ⇒ Object
102 103 104 105 106 107 108 |
# File 'lib/pgbus/process/worker.rb', line 102
#
# Requests an immediate stop: logs at warn level, sets the global
# Pgbus.stopping flag, forces the lifecycle to :stopped, pokes the wake
# signal and finally kills the execution pool.
def immediate_shutdown
  Pgbus.logger.warn do
    "[Pgbus] Worker shutting down immediately!"
  end

  Pgbus.stopping = true
  @lifecycle.transition_to!(:stopped)
  @wake_signal.notify!
  @pool.kill
end
#run ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/pgbus/process/worker.rb', line 69
#
# Worker entry point. Installs signal handlers, starts the heartbeat,
# resolves wildcard queues, marks the lifecycle :running and then loops
# until the lifecycle is stopped, or is draining with an idle pool.
# Calls #shutdown once the loop exits.
def run
  setup_signals
  start_heartbeat
  resolve_wildcard_queues
  @lifecycle.transition_to!(:running)

  Pgbus.logger.info do
    "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} " \
      "mode=#{@execution_mode} pid=#{::Process.pid}"
  end

  loop do
    process_signals
    check_recycle
    refresh_wildcard_queues

    # Exit when stopped outright, or when draining has emptied the pool.
    break if @lifecycle.stopped? || (@lifecycle.draining? && @pool.idle?)

    claim_and_execute if @lifecycle.can_process?
    @stat_buffer&.flush_if_due

    # Only block on the wake signal while draining or paused.
    next unless @lifecycle.draining? || @lifecycle.paused?

    @wake_signal.wait(timeout: config.polling_interval)
  end

  shutdown
end
#stats ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# Builds a Hash snapshot of the worker: the processed / failed / in-flight
# counter values, the lifecycle state, execution mode, consumer priority,
# the single-active-consumer flag, the queues held by the queue lock (an
# empty Array when there is no lock), the rate counter's rates and the start
# time. The result is then merged with a value obtained from @pool.
# NOTE(review): this listing is cut off at `.merge(@pool.)` — the method name
# after the dot is missing, so what is merged in cannot be determined from
# here. Restore it from lib/pgbus/process/worker.rb before relying on it.
# File 'lib/pgbus/process/worker.rb', line 54 def stats { jobs_processed: @jobs_processed.value, jobs_failed: @jobs_failed.value, in_flight: @in_flight.value, state: @lifecycle.state, execution_mode: @execution_mode, consumer_priority: @consumer_priority, single_active_consumer: @single_active_consumer, locked_queues: @queue_lock&.held_queues || [], rates: @rate_counter.rates, started_at: @started_at }.merge(@pool.) end |