Class: ActiveJob::QueueAdapters::KarafkaAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/active_job/queue_adapters/karafka_adapter.rb

Overview

Karafka adapter for enqueuing jobs This is here for ease of integration with ActiveJob.

Instance Method Summary collapse

Instance Method Details

#enqueue(job) ⇒ Object

Enqueues the job using the configured dispatcher

Parameters:

  • job (Object)

    job that should be enqueued



40
41
42
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 40

def enqueue(job)
  dispatcher.dispatch(job)
end

#enqueue_after_transaction_commit?true

Returns should we by default enqueue after the transaction and not during. Defaults to true to prevent weird issues during rollbacks, etc.

Returns:

  • (true)

    should we by default enqueue after the transaction and not during. Defaults to true to prevent weird issues during rollbacks, etc.



63
64
65
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 63

def enqueue_after_transaction_commit?
  true
end

#enqueue_all(jobs) ⇒ Integer

Enqueues multiple jobs in one go

Parameters:

  • jobs (Array<Object>)

    jobs that we want to enqueue

Returns:

  • (Integer)

    number of jobs enqueued (required by Rails)



47
48
49
50
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 47

def enqueue_all(jobs)
  dispatcher.dispatch_many(jobs)
  jobs.size
end

#enqueue_at(job, timestamp) ⇒ Object

Delegates time sensitive dispatch to the dispatcher. OSS will raise error, Pro will handle this as it supports scheduled messages.

Parameters:

  • job (Object)

    job we want to enqueue

  • timestamp (Time)

    time when job should run



57
58
59
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 57

def enqueue_at(job, timestamp)
  dispatcher.dispatch_at(job, timestamp)
end

#stopping?Boolean

Returns should we stop the job. Used by the ActiveJob continuation feature.

Returns:

  • (Boolean)

    should we stop the job. Used by the ActiveJob continuation feature



68
69
70
# File 'lib/active_job/queue_adapters/karafka_adapter.rb', line 68

def stopping?
  ::Karafka::App.done?
end