Class: JobWorkflow::QueueAdapters::SolidQueueAdapter

Inherits:
Abstract
  • Object
show all
Defined in:
lib/job_workflow/queue_adapters/solid_queue_adapter.rb

Overview

rubocop:disable Naming/PredicateMethod, Metrics/ClassLength

Defined Under Namespace

Modules: ClaimedExecutionPatch, SchedulingPatch

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SolidQueueAdapter

Note:
  • Registry scope: @semaphore_registry is process-scoped (shared across fibers/threads in the same process) and lives for the lifetime of the worker process. It is not serialized to persistent storage; semaphores are transient per worker instance.

  • Cleanup: The adapter relies on SolidQueue::Worker lifecycle hooks to clean up active semaphores when the worker stops. If a worker crashes, semaphores will leak until the underlying database records expire or are manually cleaned.

: () -> void



16
17
18
19
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 16

# Builds the adapter with an empty semaphore registry and then runs the
# parent class initializer.
def initialize
  # Starts out empty; the registry is filled in later by other adapter methods.
  @semaphore_registry = {} #: Hash[Object, ^(SolidQueue::Worker) -> void]
  super()
end

Instance Method Details

#clear_queue(queue_name) ⇒ Object

: (String) -> bool



145
146
147
148
149
150
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 145

# Clears the named SolidQueue queue.
#
# Returns false without doing anything when SolidQueue::Queue is not defined;
# otherwise calls #clear on the queue looked up by name and returns true.
def clear_queue(queue_name)
  if defined?(SolidQueue::Queue)
    queue = SolidQueue::Queue.find_by_name(queue_name)
    queue.clear
    true
  else
    false
  end
end

#fetch_job_contexts(job_ids) ⇒ Object

Note:
  • Fetches job_workflow_context hashes for the given job IDs.

: (Array) -> Array[Hash[String, untyped]]



178
179
180
181
182
183
184
185
186
187
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 178

# Collects the "job_workflow_context" entry of every SolidQueue job whose
# active_job_id is in +job_ids+.
#
# Returns [] when SolidQueue::Job is not defined or when +job_ids+ is empty.
# Jobs whose stored arguments are not a Hash, or whose context is nil/false,
# are dropped from the result.
def fetch_job_contexts(job_ids)
  return [] if !defined?(SolidQueue::Job) || job_ids.empty?

  records = without_query_cache { SolidQueue::Job.where(active_job_id: job_ids).to_a }
  records.filter_map do |record|
    payload = record.arguments
    payload["job_workflow_context"] if payload.is_a?(Hash)
  end
end

#fetch_job_statuses(job_ids) ⇒ Object

: (Array) -> Hash[String, untyped]



74
75
76
77
78
79
80
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 74

# Loads the SolidQueue job records for +job_ids+ and indexes them by their
# active_job_id.
#
# Returns {} when SolidQueue::Job is not defined or when +job_ids+ is empty.
def fetch_job_statuses(job_ids)
  return {} unless defined?(SolidQueue::Job)
  # Nothing can match an empty id list, so skip the database round trip
  # (same guard as fetch_job_contexts).
  return {} if job_ids.empty?

  without_query_cache do
    SolidQueue::Job.where(active_job_id: job_ids).index_by(&:active_job_id)
  end
end

#find_job(job_id) ⇒ Object

Note:
  • SolidQueue stores the full ActiveJob serialization in job.arguments

  • We need to extract the actual arguments array for consistency

: (String) -> Hash[String, untyped]?



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 157

# Looks up a SolidQueue job by its active_job_id and returns a plain Hash
# describing it, or nil when SolidQueue::Job is not defined or no record
# matches.
#
# When the stored arguments are a Hash, the "arguments" and
# "job_workflow_context" entries are pulled out of it; otherwise the stored
# value is returned as "arguments" and the context is nil.
def find_job(job_id)
  return unless defined?(SolidQueue::Job)

  record = without_query_cache { SolidQueue::Job.find_by(active_job_id: job_id) }
  return if record.nil?

  payload = record.arguments
  arguments, context =
    if payload.is_a?(Hash)
      [payload["arguments"], payload["job_workflow_context"]]
    else
      [payload, nil]
    end

  {
    "job_id" => record.active_job_id,
    "class_name" => record.class_name,
    "queue_name" => record.queue_name,
    "arguments" => arguments,
    "job_workflow_context" => context,
    "status" => job_status(record)
  }
end

#initialize_adapter! ⇒ Object

: () -> void



22
23
24
25
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 22

# Installs the adapter's patch modules into SolidQueue by prepending them.
# Each patch is applied only when its target constant is defined.
def initialize_adapter!
  if defined?(SolidQueue::Configuration)
    SolidQueue::Configuration.prepend(SchedulingPatch)
  end

  if defined?(SolidQueue::ClaimedExecution)
    SolidQueue::ClaimedExecution.prepend(ClaimedExecutionPatch)
  end
end

#job_status(job) ⇒ Object

: (untyped) -> Symbol



83
84
85
86
87
88
89
90
91
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 83

# Maps a job record to a status symbol, checking its predicates in priority
# order: failed? → :failed, finished? → :succeeded, claimed? → :running,
# otherwise :pending. The predicates are evaluated inside
# without_query_cache.
def job_status(job)
  without_query_cache do
    if job.failed?
      :failed
    elsif job.finished?
      :succeeded
    elsif job.claimed?
      :running
    else
      :pending
    end
  end
end

#pause_queue(queue_name) ⇒ Object

: (String) -> bool



99
100
101
102
103
104
105
106
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 99

# Pauses the named SolidQueue queue.
#
# Returns false when SolidQueue::Queue is not defined, true otherwise.
# An ActiveRecord::RecordNotUnique raised while pausing is also reported as
# true (presumably the queue is already paused — confirm against SolidQueue).
def pause_queue(queue_name)
  if defined?(SolidQueue::Queue)
    SolidQueue::Queue.find_by_name(queue_name).pause
    true
  else
    false
  end
rescue ActiveRecord::RecordNotUnique
  true
end

#paused_queues ⇒ Object

: () -> Array



124
125
126
127
128
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 124

# Lists the queue_name of every SolidQueue::Pause record, or [] when
# SolidQueue::Pause is not defined.
def paused_queues
  defined?(SolidQueue::Pause) ? SolidQueue::Pause.pluck(:queue_name) : []
end

#persist_job_context(job) ⇒ Object

Note:
  • Persists the job’s updated context (including task outputs) back to the SolidQueue job record after execution completes. Without this, outputs computed during job execution would be lost because SolidQueue does not re-serialize job arguments after perform.

: (DSL) -> void



208
209
210
211
212
213
214
215
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 208

# Writes the job's current serialization (with string keys) into the
# `arguments` attribute of the matching SolidQueue job record.
#
# Does nothing and returns nil when SolidQueue::Job is not defined or when no
# record has the job's active_job_id; the job is only serialized once a record
# has been found.
def persist_job_context(job)
  return unless defined?(SolidQueue::Job)

  record = SolidQueue::Job.find_by(active_job_id: job.job_id)
  record&.update!(arguments: job.serialize.deep_stringify_keys)
end

#queue_latency(queue_name) ⇒ Object

: (String) -> Integer?



131
132
133
134
135
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 131

# Reports the latency of the named SolidQueue queue, or nil when
# SolidQueue::Queue is not defined.
def queue_latency(queue_name)
  SolidQueue::Queue.find_by_name(queue_name).latency if defined?(SolidQueue::Queue)
end

#queue_paused?(queue_name) ⇒ Boolean

: (String) -> bool

Returns:

  • (Boolean)


117
118
119
120
121
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 117

# Tells whether the named SolidQueue queue is paused. Returns false when
# SolidQueue::Queue is not defined.
def queue_paused?(queue_name)
  if defined?(SolidQueue::Queue)
    SolidQueue::Queue.find_by_name(queue_name).paused?
  else
    false
  end
end

#queue_size(queue_name) ⇒ Object

: (String) -> Integer



138
139
140
141
142
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 138

# Reports the size of the named SolidQueue queue, or 0 when SolidQueue::Queue
# is not defined.
def queue_size(queue_name)
  if defined?(SolidQueue::Queue)
    SolidQueue::Queue.find_by_name(queue_name).size
  else
    0
  end
end

#reschedule_job(job, wait) ⇒ Object

: (DSL, Numeric) -> bool



190
191
192
193
194
195
196
197
198
199
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 190

# Reschedules a currently claimed SolidQueue job to run again after +wait+.
#
# Returns false when SolidQueue::Job is not defined, when no record matches
# the job's id, when the record is not claimed, or when
# ActiveRecord::RecordNotFound is raised along the way. Otherwise returns
# whatever reschedule_solid_queue_job returns.
def reschedule_job(job, wait)
  return false unless defined?(SolidQueue::Job)

  record = without_query_cache do
    SolidQueue::Job.find_by(active_job_id: job.job_id)
  end

  if record&.claimed?
    reschedule_solid_queue_job(record, job, wait)
  else
    false
  end
rescue ActiveRecord::RecordNotFound
  false
end

#resume_queue(queue_name) ⇒ Object

: (String) -> bool



109
110
111
112
113
114
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 109

# Resumes the named SolidQueue queue.
#
# Returns false without doing anything when SolidQueue::Queue is not defined;
# otherwise calls #resume on the queue looked up by name and returns true.
def resume_queue(queue_name)
  if defined?(SolidQueue::Queue)
    queue = SolidQueue::Queue.find_by_name(queue_name)
    queue.resume
    true
  else
    false
  end
end

#semaphore_available? ⇒ Boolean

: () -> bool

Returns:

  • (Boolean)


28
29
30
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 28

# True when the SolidQueue::Semaphore constant is defined, false otherwise.
# Always returns a strict boolean rather than the raw result of `defined?`.
def semaphore_available?
  return true if defined?(SolidQueue::Semaphore)

  false
end

#semaphore_signal(semaphore) ⇒ Object

Note:
  • Lifecycle management: The adapter is responsible for removing the hook from SolidQueue::Worker.lifecycle_hooks before calling signal. The hook must be deleted from the registry and the global lifecycle_hooks to prevent redundant signal calls after the semaphore has already been signaled.

  • Hook deletion order: The hook is deleted before calling signal to ensure the hook lambda is no longer invoked even if the signal triggers a worker stop.

: (Semaphore) -> bool



63
64
65
66
67
68
69
70
71
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 63

# Signals +semaphore+ and unregisters the stop hook recorded for it.
#
# Returns true without signaling when semaphores are unavailable or when the
# semaphore has no entry in the registry. Otherwise returns the result of
# SolidQueue::Semaphore.signal.
def semaphore_signal(semaphore)
  return true unless semaphore_available? && semaphore_registry.key?(semaphore)

  registered_hook = semaphore_registry[semaphore]
  # Drop the hook from the worker's stop hooks and from the registry before
  # signaling, so it is no longer registered once the signal goes out.
  SolidQueue::Worker.lifecycle_hooks[:stop].delete(registered_hook)
  semaphore_registry.delete(semaphore)
  SolidQueue::Semaphore.signal(semaphore)
end

#semaphore_wait(semaphore) ⇒ Object

Note:
  • Thread safety: @semaphore_registry is a non-thread-safe Hash. In multi-threaded workers, concurrent calls to semaphore_wait or semaphore_signal may cause race conditions. Mitigation: SolidQueue workers typically run in single-threaded Fiber mode; verify worker configuration does not enable raw multithreading.

  • Double-wait behavior: If semaphore_wait is called twice for the same Semaphore (e.g., due to retry or requeue), the second call returns false and does not re-register the hook. This is a fail-fast contract: the semaphore is already being waited on, and the hook registered by the first call remains responsible for signaling it when the worker stops.

: (Semaphore) -> bool



43
44
45
46
47
48
49
50
51
52
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 43

# Tries to acquire +semaphore+ and, on success, registers a worker stop hook
# that signals it.
#
# Returns true when semaphores are unavailable or the wait succeeds. Returns
# false when the semaphore already has a registry entry (no second wait is
# attempted) or when SolidQueue::Semaphore.wait returns a falsy value.
def semaphore_wait(semaphore)
  return true unless semaphore_available?

  already_registered = semaphore_registry.key?(semaphore)
  return false if already_registered || !SolidQueue::Semaphore.wait(semaphore)

  # The same lambda is stored in the registry and handed to on_stop, so it
  # can later be looked up by semaphore.
  release_on_stop = ->(_worker) { SolidQueue::Semaphore.signal(semaphore) }
  semaphore_registry[semaphore] = release_on_stop
  SolidQueue::Worker.on_stop(&release_on_stop)
  true
end

#supports_concurrency_limits? ⇒ Boolean

: () -> bool

Returns:

  • (Boolean)


94
95
96
# File 'lib/job_workflow/queue_adapters/solid_queue_adapter.rb', line 94

# True when the SolidQueue constant is defined, false otherwise. Always
# returns a strict boolean rather than the raw result of `defined?`.
def supports_concurrency_limits?
  if defined?(SolidQueue)
    true
  else
    false
  end
end