Module: Wurk::Worker::ClassMethods

Defined in:
lib/wurk/worker.rb

Overview

rubocop:disable Metrics/ModuleLength

Instance Method Summary collapse

Instance Method Details

#build_client(pool = get_sidekiq_options['pool'], client_class: nil) ⇒ Object

‘client_class` swaps the enqueue client (e.g. TransactionAwareClient via Wurk.transactional_push!). Resolution order: per-call `set(client_class:)`, then the class option, then the live process default, then Wurk::Client. The default_job_options fallback keeps a global `transactional_push!` order-independent: a class whose options memoized before the opt-in (its inherited copy is a stale dup) still routes through the new client.



133
134
135
136
137
# File 'lib/wurk/worker.rb', line 133

def build_client(pool = get_sidekiq_options['pool'], client_class: nil)
  klass = client_class || get_sidekiq_options['client_class'] ||
          Wurk.default_job_options['client_class'] || Wurk::Client
  klass.new(pool: pool)
end

#clearObject



152
153
154
# File 'lib/wurk/worker.rb', line 152

def clear
  ::Wurk::Queues.clear_class(to_s)
end

#client_push(item) ⇒ Object

Raises:

  • (ArgumentError)


117
118
119
120
121
122
123
124
125
# File 'lib/wurk/worker.rb', line 117

def client_push(item)
  raise ArgumentError, "Job arguments to #{name || self} must have string keys" if symbol_keyed?(item)

  # `pool` is a transient enqueue-time attribute: a per-call `set(pool:)`
  # overrides the class-level option, then it's deleted so it never reaches
  # the wire (normalize_item strips any class-level pool re-merged below).
  pool = item.delete('pool') || get_sidekiq_options['pool']
  build_client(pool, client_class: item.delete('client_class')).push(item)
end

#delayObject Also known as: delay_for, delay_until

Raises:

  • (ArgumentError)


193
194
195
# File 'lib/wurk/worker.rb', line 193

def delay(*)
  raise ArgumentError, "#{name || self}.delay is removed in Sidekiq 7+. Use #{name || 'klass'}.perform_async."
end

#drainObject

Run & remove every fake job for this class — including ones it enqueues mid-drain. Returns the count processed.



158
159
160
161
162
163
164
165
# File 'lib/wurk/worker.rb', line 158

def drain
  count = 0
  while (job = ::Wurk::Queues.shift_class(to_s))
    process_job(job)
    count += 1
  end
  count
end

#execute_job(worker, args) ⇒ Object



189
190
191
# File 'lib/wurk/worker.rb', line 189

def execute_job(worker, args)
  worker.perform(*args)
end

#get_sidekiq_optionsObject

Sidekiq’s public API name — wire-compat sacred. Must stay ‘get_sidekiq_options`.



75
76
77
# File 'lib/wurk/worker.rb', line 75

def get_sidekiq_options # rubocop:disable Naming/AccessorMethodName
  @sidekiq_options_hash ||= inherited_sidekiq_options # rubocop:disable Naming/MemoizedInstanceVariableName
end

#inherited(subclass) ⇒ Object



199
200
201
202
203
204
# File 'lib/wurk/worker.rb', line 199

def inherited(subclass)
  super
  subclass.instance_variable_set(:@sidekiq_options_hash, get_sidekiq_options.dup)
  inherit_block(subclass, :@sidekiq_retry_in_block)
  inherit_block(subclass, :@sidekiq_retries_exhausted_block)
end

#jobsObject

Fake jobs enqueued for this class, across every queue.



148
149
150
# File 'lib/wurk/worker.rb', line 148

def jobs
  ::Wurk::Queues.jobs_by_class[to_s] || []
end

#perform_asyncObject



95
96
97
# File 'lib/wurk/worker.rb', line 95

def perform_async(*)
  Wurk::Worker::Setter.new(self, {}).perform_async(*)
end

#perform_bulk(items) ⇒ Object



109
110
111
# File 'lib/wurk/worker.rb', line 109

def perform_bulk(items, **)
  Wurk::Worker::Setter.new(self, {}).perform_bulk(items, **)
end

#perform_in(interval) ⇒ Object Also known as: perform_at



104
105
106
# File 'lib/wurk/worker.rb', line 104

def perform_in(interval, *)
  Wurk::Worker::Setter.new(self, {}).perform_in(interval, *)
end

#perform_inlineObject Also known as: perform_sync



99
100
101
# File 'lib/wurk/worker.rb', line 99

def perform_inline(*)
  new.perform(*)
end

#perform_oneObject

Run & remove the first fake job for this class; EmptyQueueError if none.



168
169
170
171
172
173
# File 'lib/wurk/worker.rb', line 168

def perform_one
  job = ::Wurk::Queues.shift_class(to_s)
  raise ::Wurk::Testing::EmptyQueueError, "no #{self} jobs were found" if job.nil?

  process_job(job)
end

#process_job(job_hash) ⇒ Object

Execute a normalized job hash through the inline server-middleware chain (empty by default — see Wurk::Testing.server_middleware). Returns the value of the server-middleware ‘invoke` (i.e. the worker’s ‘perform` return), matching Sidekiq::Testing — so `perform_one` yields the job result.



180
181
182
183
184
185
186
187
# File 'lib/wurk/worker.rb', line 180

def process_job(job_hash)
  instance = new
  instance.jid = job_hash['jid']
  instance.bid = job_hash['bid'] if instance.respond_to?(:bid=)
  ::Wurk::Testing.server_middleware.invoke(instance, job_hash, job_hash['queue'] || queue) do
    execute_job(instance, job_hash['args'])
  end
end

#queueObject

— Sidekiq::Testing class-level helpers (spec §24.3) ————– Only meaningful in :fake / :inline mode; the in-memory store is empty otherwise.



143
144
145
# File 'lib/wurk/worker.rb', line 143

def queue
  get_sidekiq_options['queue']
end

#queue_as(queue) ⇒ Object



83
84
85
# File 'lib/wurk/worker.rb', line 83

def queue_as(queue)
  sidekiq_options('queue' => queue.to_s)
end

#set(opts) ⇒ Object



113
114
115
# File 'lib/wurk/worker.rb', line 113

def set(opts)
  Wurk::Worker::Setter.new(self, opts)
end

#sidekiq_options(opts = {}) ⇒ Object



69
70
71
72
# File 'lib/wurk/worker.rb', line 69

def sidekiq_options(opts = {})
  merged = get_sidekiq_options.merge(opts.transform_keys(&:to_s))
  @sidekiq_options_hash = merged
end

#sidekiq_options_hashObject



79
80
81
# File 'lib/wurk/worker.rb', line 79

def sidekiq_options_hash
  get_sidekiq_options
end

#sidekiq_retries_exhausted(&block) ⇒ Object



91
92
93
# File 'lib/wurk/worker.rb', line 91

def sidekiq_retries_exhausted(&block)
  self.sidekiq_retries_exhausted_block = block
end

#sidekiq_retry_in(&block) ⇒ Object



87
88
89
# File 'lib/wurk/worker.rb', line 87

def sidekiq_retry_in(&block)
  self.sidekiq_retry_in_block = block
end