Class: ActiveJob::QueueAdapters::AsyncAdapter

Inherits:
AbstractAdapter
Defined in:
lib/active_job/queue_adapters/async_adapter.rb

Overview

Active Job Async adapter

The Async adapter runs jobs with an in-process thread pool.

This is the default queue adapter. It is well suited to development and testing because it needs no external infrastructure, but it is a poor fit for production because it drops pending jobs on restart.

To use this adapter, set the queue adapter to :async:

config.active_job.queue_adapter = :async

To configure the adapter’s thread pool, instantiate the adapter and pass your own config:

config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
  min_threads: 1,
  max_threads: 2 * Concurrent.processor_count,
  idletime: 600.seconds

The adapter uses a Concurrent Ruby thread pool to schedule and execute jobs. Since jobs share a single thread pool, long-running jobs will block short-lived jobs. Fine for dev/test; bad for production.
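The blocking effect of a shared pool can be sketched with Ruby's standard library alone. This is an illustration, not the adapter's implementation: `TinyPool` and `run_demo` are hypothetical names, and the pool is a hand-rolled stand-in for the Concurrent Ruby executor, sized to one worker so the effect is easy to see.

```ruby
# Hypothetical stand-in for a shared thread pool, built on stdlib Thread and
# Queue. It is NOT the Concurrent Ruby executor the adapter actually uses.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Each worker pulls tasks in FIFO order until it sees the :stop sentinel.
        while (task = @queue.pop) != :stop
          task.call
        end
      end
    end
  end

  # Queue a block for execution on one of the pool's workers.
  def post(&task)
    @queue << task
  end

  # Let queued tasks finish, then stop and join every worker.
  def shutdown
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end
end

# Posts a slow task followed by a quick one and returns the completion order.
def run_demo
  finished = Queue.new
  pool = TinyPool.new(1) # one worker, so every task shares it

  pool.post { sleep 0.2; finished << :slow_report }
  pool.post { finished << :quick_email } # must wait behind the slow task

  pool.shutdown
  Array.new(finished.size) { finished.pop }
end
```

Calling `run_demo` returns the tasks in completion order: the quick task finishes only after the slow one, because both compete for the same worker. A larger pool delays the problem but does not remove it once every worker is busy.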

Defined Under Namespace

Classes: JobWrapper, Scheduler

Instance Method Summary

Methods inherited from AbstractAdapter

#enqueue_after_transaction_commit?

Constructor Details

#initialize(**executor_options) ⇒ AsyncAdapter

See Concurrent::ThreadPoolExecutor for executor options.



# File 'lib/active_job/queue_adapters/async_adapter.rb', line 35

def initialize(**executor_options)
  @scheduler = Scheduler.new(**executor_options)
end

Instance Method Details

#enqueue(job) ⇒ Object

:nodoc:



# File 'lib/active_job/queue_adapters/async_adapter.rb', line 39

def enqueue(job) # :nodoc:
  @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end

#enqueue_at(job, timestamp) ⇒ Object

:nodoc:



# File 'lib/active_job/queue_adapters/async_adapter.rb', line 43

def enqueue_at(job, timestamp) # :nodoc:
  @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
end

#immediate=(immediate) ⇒ Object

Used for our test suite.



# File 'lib/active_job/queue_adapters/async_adapter.rb', line 55

def immediate=(immediate) # :nodoc:
  @scheduler.immediate = immediate
end

#shutdown(wait: true) ⇒ Object

Gracefully stop processing jobs. Finishes in-progress work and handles any new jobs following the executor’s fallback policy (`caller_runs`). Waits for termination by default. Pass `wait: false` to return without waiting for termination.



# File 'lib/active_job/queue_adapters/async_adapter.rb', line 50

def shutdown(wait: true) # :nodoc:
  @scheduler.shutdown wait: wait
end
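The shutdown behavior described above can be approximated with the standard library. This is a sketch under stated assumptions, not the adapter's code: `SketchExecutor` and `shutdown_demo` are hypothetical names, and the caller-runs fallback is modeled by hand as "once shut down, run the task in the posting thread" rather than taken from Concurrent Ruby.

```ruby
# Hypothetical single-worker executor that mimics a caller-runs fallback.
# It models the documented behavior; it is NOT the adapter's Scheduler.
class SketchExecutor
  def initialize
    @queue = Queue.new
    @lock = Mutex.new
    @running = true
    @worker = Thread.new do
      while (task = @queue.pop) != :stop
        task.call
      end
    end
  end

  # Queue the task while running; after shutdown, run it in the caller.
  def post(&task)
    accepted = @lock.synchronize do
      @queue << task if @running
      @running
    end
    task.call unless accepted # assumed caller-runs fallback
    accepted
  end

  # Stop accepting work; already queued tasks still run to completion.
  def shutdown(wait: true)
    @lock.synchronize do
      if @running
        @running = false
        @queue << :stop
      end
    end
    @worker.join if wait # wait: false returns immediately
  end
end

# Records, for each task, whether it ran on the thread that posted it.
def shutdown_demo
  caller_thread = Thread.current
  log = []
  executor = SketchExecutor.new

  executor.post { sleep 0.1; log << [:queued_before, Thread.current.equal?(caller_thread)] }
  executor.shutdown(wait: true) # blocks until the queued task has finished
  executor.post { log << [:posted_after, Thread.current.equal?(caller_thread)] }

  log
end
```

In this sketch, the task queued before shutdown completes on the worker thread, while the task posted afterward runs synchronously on the posting thread. With `wait: false`, `shutdown` would return before the queued task had necessarily finished.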