Class: Pgbus::Process::Worker
- Inherits:
-
Object
- Object
- Pgbus::Process::Worker
- Includes:
- SignalHandler
- Defined in:
- lib/pgbus/process/worker.rb
Constant Summary collapse
- WILDCARD_REFRESH_INTERVAL =
seconds
30- MISSING_QUEUE_REGEX =
Matches the physical queue name inside a “relation "pgmq.q_foo" does not exist” error. Frozen module constant to avoid recompiling the regex on every queue-missing error in hot fetch/read paths.
/pgmq\.q_(\w+)/
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#execution_mode ⇒ Object
readonly
Returns the value of attribute execution_mode.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
Instance Method Summary collapse
- #graceful_shutdown ⇒ Object
- #immediate_shutdown ⇒ Object
-
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
- #stats ⇒ Object
Methods included from SignalHandler
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/pgbus/process/worker.rb', line 12 def initialize(queues:, threads: 5, config: Pgbus.configuration, single_active_consumer: false, consumer_priority: 0, execution_mode: :threads) @queues = Array(queues) @wildcard = @queues.include?("*") @threads = threads @config = config @execution_mode = ExecutionPools.normalize_mode(execution_mode) @single_active_consumer = single_active_consumer @consumer_priority = consumer_priority @lifecycle = Lifecycle.new @last_wildcard_resolve = nil @jobs_processed = Concurrent::AtomicFixnum.new(0) @jobs_failed = Concurrent::AtomicFixnum.new(0) @in_flight = Concurrent::AtomicFixnum.new(0) @rate_counter = RateCounter.new(:processed, :failed, :dequeued) @started_at = Time.current @started_at_monotonic = monotonic_now @stat_buffer = config.stats_enabled ? Pgbus::StatBuffer.new : nil @executor = Pgbus::ActiveJob::Executor.new(stat_buffer: @stat_buffer) @wake_signal = WakeSignal.new @pool = ExecutionPools.build( mode: @execution_mode, capacity: threads, on_state_change: -> { @wake_signal.notify! } ) @circuit_breaker = Pgbus::CircuitBreaker.new(config: config) @queue_lock = QueueLock.new if @single_active_consumer end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def config @config end |
#execution_mode ⇒ Object (readonly)
Returns the value of attribute execution_mode.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def execution_mode @execution_mode end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def queues @queues end |
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
10 11 12 |
# File 'lib/pgbus/process/worker.rb', line 10 def threads @threads end |
Instance Method Details
#graceful_shutdown ⇒ Object
83 84 85 86 87 88 |
# File 'lib/pgbus/process/worker.rb', line 83 def graceful_shutdown Pgbus.logger.info { "[Pgbus] Worker shutting down gracefully..." } Pgbus.stopping = true @lifecycle.transition_to(:draining) @wake_signal.notify! end |
#immediate_shutdown ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/pgbus/process/worker.rb', line 90 def immediate_shutdown Pgbus.logger.warn { "[Pgbus] Worker shutting down immediately!" } Pgbus.stopping = true @lifecycle.transition_to!(:stopped) @wake_signal.notify! @pool.kill end |
#run ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/pgbus/process/worker.rb', line 57 def run setup_signals start_heartbeat resolve_wildcard_queues @lifecycle.transition_to!(:running) Pgbus.logger.info do "[Pgbus] Worker started: queues=#{queues.join(",")} threads=#{threads} " \ "mode=#{@execution_mode} pid=#{::Process.pid}" end loop do process_signals check_recycle refresh_wildcard_queues break if @lifecycle.stopped? break if @lifecycle.draining? && @pool.idle? claim_and_execute if @lifecycle.can_process? @stat_buffer&.flush_if_due @wake_signal.wait(timeout: config.polling_interval) if @lifecycle.draining? || @lifecycle.paused? end shutdown end |
#stats ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/pgbus/process/worker.rb', line 42 def stats { jobs_processed: @jobs_processed.value, jobs_failed: @jobs_failed.value, in_flight: @in_flight.value, state: @lifecycle.state, execution_mode: @execution_mode, consumer_priority: @consumer_priority, single_active_consumer: @single_active_consumer, locked_queues: @queue_lock&.held_queues || [], rates: @rate_counter.rates, started_at: @started_at }.merge(@pool.) end |