Module: Sidekiq::Job::ClassMethods

Defined in:
lib/sidekiq/test_api.rb,
lib/sidekiq/job.rb

Overview

The Sidekiq testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.

This is similar to ActionMailer’s :test delivery_method and its ActionMailer::Base.deliveries array.

Example:

require 'sidekiq/testing'

assert_equal 0, HardJob.jobs.size
HardJob.perform_async(:something)
assert_equal 1, HardJob.jobs.size
assert_equal :something, HardJob.jobs[0]['args'][0]

You can also clear and drain all job types:

Sidekiq::Job.clear_all # or .drain_all

This can be useful to make sure jobs don’t linger between tests:

RSpec.configure do |config|
  config.before(:each) do
    Sidekiq::Job.clear_all
  end
end

or for acceptance testing, e.g. with Cucumber:

AfterStep do
  Sidekiq::Job.drain_all
end

When I sign up as "foo@example.com"
Then I should receive a welcome email to "foo@example.com"
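The test double described above can be pictured as a simple per-class hash of arrays. This is an illustrative sketch only (FakeQueues and HardJob are hypothetical names, not Sidekiq's real internals), showing how perform_async can append to an in-memory store instead of pushing to Redis:

```ruby
# A minimal sketch: perform_async appends a job hash to a per-class
# array instead of touching Redis. FakeQueues stands in for Sidekiq's
# internal Queues store.
module FakeQueues
  def self.jobs_by_class
    @jobs_by_class ||= Hash.new { |h, k| h[k] = [] }
  end

  def self.clear_all
    @jobs_by_class = nil
  end
end

class HardJob
  def self.perform_async(*args)
    FakeQueues.jobs_by_class[to_s] << {"class" => to_s, "args" => args}
  end

  def self.jobs
    FakeQueues.jobs_by_class[to_s]
  end
end

HardJob.perform_async("something")
p HardJob.jobs.size          # => 1
p HardJob.jobs[0]["args"][0] # => "something"
```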


Instance Method Details

#build_clientObject

:nodoc:



# File 'lib/sidekiq/job.rb', line 393

def build_client # :nodoc:
  pool = Thread.current[:sidekiq_redis_pool] || get_sidekiq_options["pool"] || Sidekiq.default_configuration.redis_pool
  client_class = Thread.current[:sidekiq_client_class] || get_sidekiq_options["client_class"] || Sidekiq::Client
  client_class.new(pool: pool)
end

#clearObject

Clear all jobs for this worker



# File 'lib/sidekiq/test_api.rb', line 261

def clear
  Queues.clear_for(queue, to_s)
end

#client_push(item) ⇒ Object

:nodoc:

Raises:

  • (ArgumentError)


# File 'lib/sidekiq/job.rb', line 378

def client_push(item) # :nodoc:
  raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) }

  # allow the user to dynamically re-target jobs to another shard using the "pool" attribute
  #   FooJob.set(pool: SOME_POOL).perform_async
  old = Thread.current[:sidekiq_redis_pool]
  pool = item.delete("pool")
  Thread.current[:sidekiq_redis_pool] = pool if pool
  begin
    build_client.push(item)
  ensure
    Thread.current[:sidekiq_redis_pool] = old
  end
end
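The Symbol check above exists because job payloads are serialized to JSON, which round-trips Symbol keys back as Strings. A standalone sketch of the same validation (validate_payload! is a hypothetical helper, not part of Sidekiq's API):

```ruby
# Reject payloads with Symbol keys, mirroring the guard in client_push:
# JSON serialization would silently turn :args into "args".
def validate_payload!(item)
  if item.any? { |k, _v| k.is_a?(::Symbol) }
    raise ArgumentError, "Job payloads should contain no Symbols: #{item}"
  end
  item
end

validate_payload!({"args" => [1]})  # passes through unchanged
begin
  validate_payload!({args: [1]})    # Symbol key: raises ArgumentError
rescue ArgumentError => e
  puts e.message
end
```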

#delay(*args) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/sidekiq/job.rb', line 278

def delay(*args)
  raise ArgumentError, "Do not call .delay on a Sidekiq::Job class, call .perform_async"
end

#delay_for(*args) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/sidekiq/job.rb', line 282

def delay_for(*args)
  raise ArgumentError, "Do not call .delay_for on a Sidekiq::Job class, call .perform_in"
end

#delay_until(*args) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/sidekiq/job.rb', line 286

def delay_until(*args)
  raise ArgumentError, "Do not call .delay_until on a Sidekiq::Job class, call .perform_at"
end

#drainObject

Drain and run all jobs for this worker



# File 'lib/sidekiq/test_api.rb', line 266

def drain
  while jobs.any?
    next_job = jobs.first
    Queues.delete_for(next_job["jid"], next_job["queue"], to_s)
    process_job(next_job)
  end
end
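Because drain loops while jobs.any?, work enqueued *by* a draining job is processed in the same call. A plain-Ruby sketch of that loop (the jobs array and process lambda are illustrative stand-ins for Sidekiq's Queues store and process_job):

```ruby
# drain keeps pulling the first job until the queue is empty, so
# follow-up jobs enqueued during processing are drained too.
jobs = [{"args" => [1]}, {"args" => [2]}]
processed = []

process = lambda do |job|
  processed << job["args"].first
  # simulate a job that enqueues follow-up work mid-drain
  jobs << {"args" => [3]} if job["args"].first == 1
end

while jobs.any?
  next_job = jobs.shift
  process.call(next_job)
end

p processed # => [1, 2, 3]
```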

#execute_job(worker, args) ⇒ Object



# File 'lib/sidekiq/test_api.rb', line 291

def execute_job(worker, args)
  worker.perform(*args)
end

#jobsObject

Jobs queued for this worker



# File 'lib/sidekiq/test_api.rb', line 256

def jobs
  Queues.jobs_by_class[to_s]
end

#perform_async(*args) ⇒ Object



# File 'lib/sidekiq/job.rb', line 298

def perform_async(*args)
  Setter.new(self, {}).perform_async(*args)
end

#perform_bulk(*args, **kwargs) ⇒ Object

Push a large number of jobs to Redis, limiting each batch to 1,000 jobs per round trip. This cuts down on the number of round trips to Redis, which can increase performance when enqueueing large numbers of jobs.

items must be an Array of Arrays.

The :at option schedules the jobs for future execution. It accepts either a single Numeric timestamp (or seconds-from-now) applied to every job, or an Array of Numeric values with the same size as items to schedule each job at its corresponding time.

For finer-grained control, use `Sidekiq::Client.push_bulk` directly.

Example (3 Redis round trips):

SomeJob.perform_async(1)
SomeJob.perform_async(2)
SomeJob.perform_async(3)

Would instead become (1 Redis round trip):

SomeJob.perform_bulk([[1], [2], [3]])

Scheduling every job 60 seconds from now (single Numeric :at):

SomeJob.perform_bulk([[1], [2], [3]], at: 60)

Scheduling each job at its own time (Array :at):

SomeJob.perform_bulk([[1], [2]], at: [Time.now.to_f + 30, Time.now.to_f + 60])


# File 'lib/sidekiq/job.rb', line 341

def perform_bulk(*args, **kwargs)
  Setter.new(self, {}).perform_bulk(*args, **kwargs)
end
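The batching arithmetic behind the 1,000-job limit can be sketched with Enumerable#each_slice; this is an illustration of the round-trip savings, not Sidekiq's actual implementation:

```ruby
# 2,500 jobs sliced into batches of 1,000 means 3 Redis round trips
# instead of 2,500 individual pushes.
items = (1..2500).map { |i| [i] }  # an Array of Arrays, one args list per job
batches = items.each_slice(1_000).to_a

p batches.size       # => 3
p batches.last.size  # => 500
```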

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric, or something that acts numeric (like an ActiveSupport time interval).


# File 'lib/sidekiq/job.rb', line 347

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = ((int < 1_000_000_000) ? now + int : int)

  item = {"class" => self, "args" => args}

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  item["at"] = ts if ts > now

  client_push(item)
end
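The method distinguishes a relative interval from an absolute timestamp with a threshold: values below 1_000_000_000 (a Unix timestamp around September 2001) are treated as seconds-from-now. A standalone sketch of that branch (resolve_timestamp is a hypothetical helper extracted for illustration):

```ruby
# Mirror perform_in's heuristic: small values are intervals in seconds,
# large values are absolute Unix timestamps.
def resolve_timestamp(interval, now = Time.now.to_f)
  int = interval.to_f
  (int < 1_000_000_000) ? now + int : int
end

now = 1_700_000_000.0
p resolve_timestamp(60, now)             # => 1700000060.0 (interval)
p resolve_timestamp(1_700_000_300, now)  # => 1700000300.0 (timestamp)
```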

#perform_inline(*args) ⇒ Object Also known as: perform_sync

Inline execution of the job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware



# File 'lib/sidekiq/job.rb', line 303

def perform_inline(*args)
  Setter.new(self, {}).perform_inline(*args)
end

#perform_oneObject

Pop out a single job and perform it

Raises:

  • (EmptyQueueError)

# File 'lib/sidekiq/test_api.rb', line 275

def perform_one
  raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
  next_job = jobs.first
  Queues.delete_for(next_job["jid"], next_job["queue"], to_s)
  process_job(next_job)
end

#process_job(job) ⇒ Object



# File 'lib/sidekiq/test_api.rb', line 282

def process_job(job)
  instance = new
  instance.jid = job["jid"]
  instance.bid = job["bid"] if instance.respond_to?(:bid=)
  Sidekiq::Testing.server_middleware.invoke(instance, job, job["queue"]) do
    execute_job(instance, job["args"])
  end
end

#queueObject

Queue for this worker



# File 'lib/sidekiq/test_api.rb', line 251

def queue
  get_sidekiq_options["queue"]
end

#queue_as(q) ⇒ Object



# File 'lib/sidekiq/job.rb', line 290

def queue_as(q)
  sidekiq_options("queue" => q.to_s)
end

#set(options) ⇒ Object



# File 'lib/sidekiq/job.rb', line 294

def set(options)
  Setter.new(self, options)
end

#sidekiq_options(opts = {}) ⇒ Object

Allows customization for this type of Job. Legal options:

queue - use a named queue for this Job, default 'default'
retry - enable the RetryJobs middleware for this Job, *true* to use the default
   or *Integer* count
backtrace - whether to save any error backtrace in the retry payload to display in web UI,
   can be true, false or an integer number of lines to save, default *false*
pool - use the given Redis connection pool to push this type of job to a given shard.

In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.



# File 'lib/sidekiq/job.rb', line 374

def sidekiq_options(opts = {})
  super
end