Class: JobWorkflow::QueueAdapters::SolidQueueAdapter
- Defined in:
- lib/job_workflow/queue_adapters/solid_queue_adapter.rb
Overview
rubocop:disable Naming/PredicateMethod, Metrics/ClassLength
Defined Under Namespace
Modules: ClaimedExecutionPatch, SchedulingPatch
Instance Method Summary collapse
-
#clear_queue(queue_name) ⇒ Object
: (String) -> bool.
-
#fetch_job_contexts(job_ids) ⇒ Object
: (Array) -> Array[Hash[String, untyped]].
-
#fetch_job_statuses(job_ids) ⇒ Object
: (Array) -> Hash[String, untyped].
-
#find_job(job_id) ⇒ Object
: (String) -> Hash[String, untyped]?.
-
#initialize ⇒ SolidQueueAdapter
constructor
: () -> void.
-
#initialize_adapter! ⇒ Object
: () -> void.
-
#job_status(job) ⇒ Object
: (untyped) -> Symbol.
-
#pause_queue(queue_name) ⇒ Object
: (String) -> bool.
-
#paused_queues ⇒ Object
: () -> Array.
-
#persist_job_context(job) ⇒ Object
: (DSL) -> void.
-
#queue_latency(queue_name) ⇒ Object
: (String) -> Integer?.
-
#queue_paused?(queue_name) ⇒ Boolean
: (String) -> bool.
-
#queue_size(queue_name) ⇒ Object
: (String) -> Integer.
-
#reschedule_job(job, wait) ⇒ Object
: (DSL, Numeric) -> bool.
-
#resume_queue(queue_name) ⇒ Object
: (String) -> bool.
-
#semaphore_available? ⇒ Boolean
: () -> bool.
-
#semaphore_signal(semaphore) ⇒ Object
: (Semaphore) -> bool.
-
#semaphore_wait(semaphore) ⇒ Object
: (Semaphore) -> bool.
-
#supports_concurrency_limits? ⇒ Boolean
: () -> bool.
Constructor Details
#initialize ⇒ SolidQueueAdapter
-
Registry scope: @semaphore_registry is process-scoped (shared across fibers/threads in the same process) and lives for the lifetime of the worker process. It is not serialized to persistent storage; semaphores are transient per worker instance.
-
Cleanup: The adapter relies on SolidQueue::Worker lifecycle hooks to clean up active semaphores when the worker stops. If a worker crashes, semaphores will leak until the underlying database records expire or are manually cleaned.
: () -> void
16 17 18 19 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 16 def initialize @semaphore_registry = {} #: Hash[Object, ^(SolidQueue::Worker) -> void] super end |
Instance Method Details
#clear_queue(queue_name) ⇒ Object
: (String) -> bool
145 146 147 148 149 150 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 145 def clear_queue(queue_name) return false unless defined?(SolidQueue::Queue) SolidQueue::Queue.find_by_name(queue_name).clear true end |
#fetch_job_contexts(job_ids) ⇒ Object
-
Fetches job_workflow_context hashes for the given job IDs.
: (Array) -> Array[Hash[String, untyped]]
178 179 180 181 182 183 184 185 186 187 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 178 def fetch_job_contexts(job_ids) return [] unless defined?(SolidQueue::Job) return [] if job_ids.empty? jobs = without_query_cache { SolidQueue::Job.where(active_job_id: job_ids).to_a } jobs.filter_map do |job| args = job.arguments args.is_a?(Hash) ? args["job_workflow_context"] : nil end end |
#fetch_job_statuses(job_ids) ⇒ Object
: (Array) -> Hash[String, untyped]
74 75 76 77 78 79 80 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 74 def fetch_job_statuses(job_ids) return {} unless defined?(SolidQueue::Job) without_query_cache do SolidQueue::Job.where(active_job_id: job_ids).index_by(&:active_job_id) end end |
#find_job(job_id) ⇒ Object
-
SolidQueue stores the full ActiveJob serialization in job.arguments
-
We need to extract the actual arguments array for consistency
: (String) -> Hash[String, untyped]?
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 157 def find_job(job_id) return unless defined?(SolidQueue::Job) job = without_query_cache { SolidQueue::Job.find_by(active_job_id: job_id) } return if job.nil? args = job.arguments { "job_id" => job.active_job_id, "class_name" => job.class_name, "queue_name" => job.queue_name, "arguments" => args.is_a?(Hash) ? args["arguments"] : args, "job_workflow_context" => args.is_a?(Hash) ? args["job_workflow_context"] : nil, "status" => job_status(job) } end |
#initialize_adapter! ⇒ Object
: () -> void
22 23 24 25 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 22 def initialize_adapter! SolidQueue::Configuration.prepend(SchedulingPatch) if defined?(SolidQueue::Configuration) SolidQueue::ClaimedExecution.prepend(ClaimedExecutionPatch) if defined?(SolidQueue::ClaimedExecution) end |
#job_status(job) ⇒ Object
: (untyped) -> Symbol
83 84 85 86 87 88 89 90 91 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 83 def job_status(job) without_query_cache do return :failed if job.failed? return :succeeded if job.finished? return :running if job.claimed? :pending end end |
#pause_queue(queue_name) ⇒ Object
: (String) -> bool
99 100 101 102 103 104 105 106 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 99 def pause_queue(queue_name) return false unless defined?(SolidQueue::Queue) SolidQueue::Queue.find_by_name(queue_name).pause true rescue ActiveRecord::RecordNotUnique true end |
#paused_queues ⇒ Object
: () -> Array
124 125 126 127 128 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 124 def paused_queues return [] unless defined?(SolidQueue::Pause) SolidQueue::Pause.pluck(:queue_name) end |
#persist_job_context(job) ⇒ Object
-
Persists the job’s updated context (including task outputs) back to the SolidQueue job record after execution completes. Without this, outputs computed during job execution would be lost because SolidQueue does not re-serialize job arguments after perform.
: (DSL) -> void
208 209 210 211 212 213 214 215 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 208 def persist_job_context(job) return unless defined?(SolidQueue::Job) solid_queue_job = SolidQueue::Job.find_by(active_job_id: job.job_id) return if solid_queue_job.nil? solid_queue_job.update!(arguments: job.serialize.deep_stringify_keys) end |
#queue_latency(queue_name) ⇒ Object
: (String) -> Integer?
131 132 133 134 135 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 131 def queue_latency(queue_name) return nil unless defined?(SolidQueue::Queue) SolidQueue::Queue.find_by_name(queue_name).latency end |
#queue_paused?(queue_name) ⇒ Boolean
: (String) -> bool
117 118 119 120 121 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 117 def queue_paused?(queue_name) return false unless defined?(SolidQueue::Queue) SolidQueue::Queue.find_by_name(queue_name).paused? end |
#queue_size(queue_name) ⇒ Object
: (String) -> Integer
138 139 140 141 142 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 138 def queue_size(queue_name) return 0 unless defined?(SolidQueue::Queue) SolidQueue::Queue.find_by_name(queue_name).size end |
#reschedule_job(job, wait) ⇒ Object
: (DSL, Numeric) -> bool
190 191 192 193 194 195 196 197 198 199 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 190 def reschedule_job(job, wait) return false unless defined?(SolidQueue::Job) solid_queue_job = without_query_cache { SolidQueue::Job.find_by(active_job_id: job.job_id) } return false unless solid_queue_job&.claimed? reschedule_solid_queue_job(solid_queue_job, job, wait) rescue ActiveRecord::RecordNotFound false end |
#resume_queue(queue_name) ⇒ Object
: (String) -> bool
109 110 111 112 113 114 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 109 def resume_queue(queue_name) return false unless defined?(SolidQueue::Queue) SolidQueue::Queue.find_by_name(queue_name).resume true end |
#semaphore_available? ⇒ Boolean
: () -> bool
28 29 30 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 28 def semaphore_available? defined?(SolidQueue::Semaphore) ? true : false end |
#semaphore_signal(semaphore) ⇒ Object
-
Lifecycle management: The adapter is responsible for removing the hook from SolidQueue::Worker.lifecycle_hooks before calling signal. The hook must be deleted from the registry and the global lifecycle_hooks to prevent redundant signal calls after the semaphore has already been signaled.
-
Hook deletion order: The hook is deleted before calling signal to ensure the hook lambda is no longer invoked even if the signal triggers a worker stop.
: (Semaphore) -> bool
63 64 65 66 67 68 69 70 71 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 63 def semaphore_signal(semaphore) return true unless semaphore_available? return true unless semaphore_registry.key?(semaphore) hook = semaphore_registry[semaphore] SolidQueue::Worker.lifecycle_hooks[:stop].delete(hook) semaphore_registry.delete(semaphore) SolidQueue::Semaphore.signal(semaphore) end |
#semaphore_wait(semaphore) ⇒ Object
-
Thread safety: @semaphore_registry is a non-thread-safe Hash. In multi-threaded workers, concurrent calls to semaphore_wait or semaphore_signal may cause race conditions. Mitigation: SolidQueue workers typically run in single-threaded Fiber mode; verify worker configuration does not enable raw multithreading.
-
Double-wait behavior: If semaphore_wait is called twice for the same Semaphore (e.g., due to retry or requeue), the second call returns false and does not re-register the hook. This is a fail-fast contract: the semaphore is already being waited and will signal the registered hook.
: (Semaphore) -> bool
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 43 def semaphore_wait(semaphore) return true unless semaphore_available? return false if semaphore_registry.key?(semaphore) return false unless SolidQueue::Semaphore.wait(semaphore) hook = ->(_) { SolidQueue::Semaphore.signal(semaphore) } semaphore_registry[semaphore] = hook SolidQueue::Worker.on_stop(&hook) true end |
#supports_concurrency_limits? ⇒ Boolean
: () -> bool
94 95 96 |
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 94 def supports_concurrency_limits? defined?(SolidQueue) ? true : false end |