Class: Sidekiq::Worker::Setter

Inherits:
Object
  • Object
show all
Includes:
JobUtil
Defined in:
lib/sidekiq/worker.rb

Overview

This helper class encapsulates the set options for `set`, e.g.

SomeWorker.set(queue: 'foo').perform_async(....)

Constant Summary

Constants included from JobUtil

JobUtil::TRANSIENT_ATTRIBUTES

Instance Method Summary collapse

Methods included from JobUtil

#normalize_item, #normalized_hash, #validate, #verify_json

Constructor Details

#initialize(klass, opts) ⇒ Setter

Returns a new instance of Setter.



176
177
178
179
180
181
182
183
184
# File 'lib/sidekiq/worker.rb', line 176

def initialize(klass, opts)
  @klass = klass
  # NB: the internal hash always has stringified keys
  @opts = opts.transform_keys(&:to_s)

  # ActiveJob compatibility
  interval = @opts.delete("wait_until") || @opts.delete("wait")
  at(interval) if interval
end

Instance Method Details

#perform_async(*args) ⇒ Object



194
195
196
197
198
199
200
# File 'lib/sidekiq/worker.rb', line 194

def perform_async(*args)
  if @opts["sync"] == true
    perform_inline(*args)
  else
    @klass.client_push(@opts.merge("args" => args, "class" => @klass))
  end
end

#perform_bulk(args, batch_size: 1_000) ⇒ Object



239
240
241
242
243
244
245
246
# File 'lib/sidekiq/worker.rb', line 239

def perform_bulk(args, batch_size: 1_000)
  client = @klass.build_client
  result = args.each_slice(batch_size).flat_map do |slice|
    client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
  end

  result.is_a?(Enumerator::Lazy) ? result.force : result
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).


250
251
252
# File 'lib/sidekiq/worker.rb', line 250

def perform_in(interval, *args)
  at(interval).perform_async(*args)
end

#perform_inline(*args) ⇒ Object Also known as: perform_sync

Explicit inline execution of a job. Returns nil if the job did not execute, true otherwise.



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/sidekiq/worker.rb', line 204

def perform_inline(*args)
  raw = @opts.merge("args" => args, "class" => @klass)

  # validate and normalize payload
  item = normalize_item(raw)
  queue = item["queue"]

  # run client-side middleware
  result = Sidekiq.client_middleware.invoke(item["class"], item, queue, Sidekiq.redis_pool) do
    item
  end
  return nil unless result

  # round-trip the payload via JSON
  msg = Sidekiq.load_json(Sidekiq.dump_json(item))

  # prepare the job instance
  klass = msg["class"].constantize
  job = klass.new
  job.jid = msg["jid"]
  job.bid = msg["bid"] if job.respond_to?(:bid)

  # run the job through server-side middleware
  result = Sidekiq.server_middleware.invoke(job, msg, msg["queue"]) do
    # perform it
    job.perform(*msg["args"])
    true
  end
  return nil unless result
  # jobs do not return a result. they should store any
  # modified state.
  true
end

#set(options) ⇒ Object



186
187
188
189
190
191
192
# File 'lib/sidekiq/worker.rb', line 186

def set(options)
  hash = options.transform_keys(&:to_s)
  interval = hash.delete("wait_until") || @opts.delete("wait")
  @opts.merge!(hash)
  at(interval) if interval
  self
end