Class: Pgbus::Process::Worker
- Inherits:
-
Object
- Object
- Pgbus::Process::Worker
- Includes:
- SignalHandler
- Defined in:
- lib/pgbus/process/worker.rb
Constant Summary collapse
- NOTIFY_FALLBACK_POLL_SECONDS =
15- WILDCARD_REFRESH_INTERVAL =
seconds
30- MISSING_QUEUE_REGEX =
Matches the physical queue name inside a “relation "pgmq.q_foo" does not exist” error. Frozen module constant to avoid recompiling the regex on every queue-missing error in hot fetch/read paths.
/pgmq\.q_(\w+)/
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#execution_mode ⇒ Object
readonly
Returns the value of attribute execution_mode.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
Instance Method Summary collapse
- #graceful_shutdown ⇒ Object
- #immediate_shutdown ⇒ Object
-
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
- #stats ⇒ Object
Methods included from SignalHandler
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/pgbus/process/worker.rb', line 12 def initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads, group_mode: nil) @queues = Array(queues) @initial_queues = @queues.dup.freeze @wildcard = @queues.include?("*") @threads = threads @config = config @execution_mode = ExecutionPools.normalize_mode(execution_mode) @group_mode = case group_mode when nil then nil when Symbol then group_mode when String then group_mode.to_sym else raise ArgumentError, "Invalid group_mode type: #{group_mode.class}. Must be nil, String, or Symbol" end unless Pgbus::Configuration::VALID_GROUP_MODES.include?(@group_mode) raise ArgumentError, "Invalid group_mode: #{@group_mode.inspect}. Must be nil, :fifo, or :round_robin" end @single_active_consumer = single_active_consumer @consumer_priority = consumer_priority @lifecycle = Lifecycle.new @last_wildcard_resolve = nil @jobs_processed = Concurrent::AtomicFixnum.new(0) @jobs_failed = Concurrent::AtomicFixnum.new(0) @in_flight = Concurrent::AtomicFixnum.new(0) @loop_tick_at = Concurrent::AtomicReference.new(nil) @rate_counter = RateCounter.new(:processed, :failed, :dequeued) @started_at = Time.current @started_at_monotonic = monotonic_now @stat_buffer = config.stats_enabled ? Pgbus::StatBuffer.new : nil @executor = Pgbus::ActiveJob::Executor.new(stat_buffer: @stat_buffer) @wake_signal = WakeSignal.new @pool = ExecutionPools.build( mode: @execution_mode, capacity: threads, on_state_change: -> { @wake_signal.notify! } ) @circuit_breaker = Pgbus::CircuitBreaker.new(config: config) @queue_lock = QueueLock.new if @single_active_consumer @notify_listener = nil end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def config @config end |
#execution_mode ⇒ Object (readonly)
Returns the value of attribute execution_mode.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def execution_mode @execution_mode end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def queues @queues end |
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def threads @threads end |
Instance Method Details
#graceful_shutdown ⇒ Object
102 103 104 105 106 107 |
# File 'lib/pgbus/process/worker.rb', line 102 def graceful_shutdown Pgbus.logger.info { "[Pgbus] Worker shutting down gracefully..." } Pgbus.stopping = true @lifecycle.transition_to(:draining) @wake_signal.notify! end |
#immediate_shutdown ⇒ Object
109 110 111 112 113 114 115 |
# File 'lib/pgbus/process/worker.rb', line 109 def immediate_shutdown Pgbus.logger.warn { "[Pgbus] Worker shutting down immediately!" } Pgbus.stopping = true @lifecycle.transition_to!(:stopped) @wake_signal.notify! @pool.kill end |
#run ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/pgbus/process/worker.rb', line 74 def run setup_signals start_heartbeat resolve_wildcard_queues start_notify_listener @lifecycle.transition_to!(:running) Pgbus.logger.info do "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} " \ "mode=#{@execution_mode} notify_wakeup=#{notify_wakeup?} pid=#{::Process.pid}" end loop do stamp_loop_tick process_signals check_recycle refresh_wildcard_queues break if @lifecycle.stopped? break if @lifecycle.draining? && @pool.idle? claim_and_execute if @lifecycle.can_process? @stat_buffer&.flush_if_due @wake_signal.wait(timeout: config.polling_interval) if @lifecycle.draining? || @lifecycle.paused? end shutdown end |
#stats ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/pgbus/process/worker.rb', line 57 def stats { jobs_processed: @jobs_processed.value, jobs_failed: @jobs_failed.value, in_flight: @in_flight.value, state: @lifecycle.state, execution_mode: @execution_mode, consumer_priority: @consumer_priority, single_active_consumer: @single_active_consumer, locked_queues: @queue_lock&.held_queues || [], rates: @rate_counter.rates, started_at: @started_at }.merge(@pool.) end |