Class: Pgbus::ActiveJob::Executor
- Inherits:
-
Object
- Object
- Pgbus::ActiveJob::Executor
- Defined in:
- lib/pgbus/active_job/executor.rb
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
- #execute(message, queue_name, source_queue: nil) ⇒ Object
-
#initialize(client: Pgbus.client, config: Pgbus.configuration, stat_buffer: nil) ⇒ Executor
constructor
A new instance of Executor.
Constructor Details
#initialize(client: Pgbus.client, config: Pgbus.configuration, stat_buffer: nil) ⇒ Executor
Returns a new instance of Executor.
10 11 12 13 14 |
# File 'lib/pgbus/active_job/executor.rb', line 10 def initialize(client: Pgbus.client, config: Pgbus.configuration, stat_buffer: nil) @client = client @config = config @stat_buffer = stat_buffer end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
8 9 10 |
# File 'lib/pgbus/active_job/executor.rb', line 8 def client @client end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
8 9 10 |
# File 'lib/pgbus/active_job/executor.rb', line 8 def config @config end |
Instance Method Details
#execute(message, queue_name, source_queue: nil) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/pgbus/active_job/executor.rb', line 20 def execute(, queue_name, source_queue: nil) execution_start = monotonic_now tag = "msg_id=#{.msg_id} queue=#{queue_name} read_ct=#{.read_ct}" Pgbus.logger.debug { "[Pgbus::Executor] start #{tag}" } payload = JSON.parse(.) job_class = payload["job_class"] read_count = .read_ct.to_i if read_count > config.max_retries handle_dead_letter(, queue_name, payload, source_queue: source_queue) FailedEventRecorder.clear!(queue_name: queue_name, msg_id: .msg_id.to_i) signal_concurrency(payload) signal_batch_discarded(payload) Uniqueness.release_lock(Uniqueness.extract_key(payload)) record_stat(payload, queue_name, "dead_lettered", execution_start, message: ) Pgbus.logger.debug { "[Pgbus::Executor] dead_lettered #{tag} job_class=#{job_class}" } return :dead_lettered end uniqueness_key = Uniqueness.extract_key(payload) uniqueness_strategy = Uniqueness.extract_strategy(payload) if uniqueness_key case uniqueness_strategy when :until_executed # No claim step needed — PGMQ's visibility timeout is the execution lock. # The uniqueness key row was inserted at enqueue time and will be # released on completion or DLQ. nil when :while_executing # Acquire the lock now. If another worker is already executing # this job, skip it — VT will expire and it'll be retried. unless Uniqueness.acquire_execution_lock(uniqueness_key, payload) Pgbus.logger.info { "[Pgbus] Skipping duplicate execution for #{job_class}" } return :skipped end end end Pgbus.logger.debug { "[Pgbus::Executor] deserialized #{tag} job_class=#{job_class}" } job_succeeded = false msg_id = .msg_id.to_i Instrumentation.instrument("pgbus.executor.execute", queue: queue_name, job_class: job_class) do job = ::ActiveJob::Base.deserialize(payload) Pgbus.logger.debug { "[Pgbus::Executor] running #{tag} job_class=#{job_class}" } execute_job(job) Pgbus.logger.debug { "[Pgbus::Executor] perform_returned #{tag} job_class=#{job_class}" } archive_from(queue_name, msg_id, source_queue: source_queue) Pgbus.logger.debug { "[Pgbus::Executor] archived #{tag} job_class=#{job_class}" } FailedEventRecorder.clear!(queue_name: queue_name, msg_id: msg_id) job_succeeded = true end instrument("pgbus.job_completed", queue: queue_name, job_class: job_class) record_stat(payload, queue_name, "success", execution_start, message: ) Pgbus.logger.debug { "[Pgbus::Executor] done #{tag} job_class=#{job_class}" } :success rescue *FATAL_EXCEPTIONS # Process-fatal: propagate so the supervisor/OS can react. raise rescue Exception => e # rubocop:disable Lint/RescueException # Widened from StandardError to catch Async::Stop / Async::Cancel # (both inherit from Exception, not StandardError) under execution_mode: :async. # Before this, a fiber interruption between perform_now and archive_from # silently lost control flow — no failed event row, no job_failed # notification, uniqueness lock held until VT expired. See issue #126. handle_failure(, queue_name, e, payload: payload) instrument("pgbus.job_failed", queue: queue_name, job_class: payload&.dig("job_class"), error: e.class.name) record_stat(payload, queue_name, "failed", execution_start, message: ) Pgbus.logger.debug { "[Pgbus::Executor] failed #{tag} job_class=#{payload&.dig("job_class")} error=#{e.class}" } # Don't signal concurrency on transient failure — the job will be retried. # Semaphore is released only on success or dead-lettering. :failed ensure # Signal concurrency and batch only when the job was archived successfully. # job_succeeded is set AFTER archive_message, so if archive fails the # semaphore slot stays held until VT expires and the job is retried. if job_succeeded signal_concurrency(payload) signal_batch_completed(payload) # Release uniqueness lock on successful completion (both strategies) Uniqueness.release_lock(uniqueness_key) if uniqueness_key end end |