Class: GoodJob::Job

Inherits:
BaseRecord
  • Object
show all
Includes:
AdvisoryLockable, ErrorEvents, Filterable, LockTypeAccessor, Lockable, Reportable
Defined in:
app/models/good_job/job.rb,
app/models/good_job/job/lockable.rb

Overview

Active Record model that represents an ActiveJob job.

Defined Under Namespace

Modules: LockTypeAccessor, Lockable

Constant Summary collapse

PreviouslyPerformedError =

Raised if something attempts to execute a previously completed Execution again.

Class.new(StandardError)
ERROR_MESSAGE_SEPARATOR =

String separating Error Class from Error Message

": "
DEFAULT_QUEUE_NAME =

ActiveJob jobs without a queue_name attribute are placed on this queue.

'default'
DEFAULT_PRIORITY =

ActiveJob jobs without a priority attribute are given this priority.

0
ActionForStateMismatchError =

Raised when an inappropriate action is applied to a Job based on its state.

Class.new(StandardError)
AdapterNotGoodJobError =

Raised when GoodJob is not configured as the Active Job Queue Adapter

Class.new(StandardError)
DiscardJobError =

Attached to a Job’s Execution when the Job is discarded.

Class.new(StandardError)
ActiveJobDeserializationError =

Raised when Active Job data cannot be deserialized

Class.new(StandardError)

Constants included from LockTypeAccessor

LockTypeAccessor::LOCK_TYPES

Constants included from ErrorEvents

ErrorEvents::DISCARDED, ErrorEvents::HANDLED, ErrorEvents::INTERRUPTED, ErrorEvents::RETRIED, ErrorEvents::RETRY_STOPPED, ErrorEvents::UNHANDLED

Constants included from AdvisoryLockable

AdvisoryLockable::ADVISORY_LOCK_COUNTS, AdvisoryLockable::RecordAlreadyAdvisoryLockedError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from LockTypeAccessor

included, #lock_type, #lock_type=

Methods included from Reportable

#last_status_at, #status

Methods included from AdvisoryLockable

#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, hash_function, hash_function=, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock

Methods inherited from BaseRecord

bind_value, lease_connection, migrated?, migration_pending_warning!, with_logger_silenced

Class Method Details

.active_job_id(active_job_id) ⇒ ActiveRecord::Relation

Get jobs with given ActiveJob ID

Parameters:

  • active_job_id (String)

    ActiveJob ID

Returns:

  • (ActiveRecord::Relation)


110
# File 'app/models/good_job/job.rb', line 110

# Jobs whose active_job_id column equals the given ActiveJob ID.
scope(:active_job_id, lambda { |job_id| where(active_job_id: job_id) })

.build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object



325
326
327
# File 'app/models/good_job/job.rb', line 325

# Instantiate (without saving) a record from the attributes that
# enqueue_args derives for the given ActiveJob instance.
def self.build_for_enqueue(active_job, scheduled_at: nil)
  attributes = enqueue_args(active_job, scheduled_at: scheduled_at)
  new(**attributes)
end

.creation_ordered ⇒ ActiveRecord::Relation

Order jobs by created_at, for first-in first-out

Returns:

  • (ActiveRecord::Relation)


162
# File 'app/models/good_job/job.rb', line 162

# Oldest created_at first (first-in, first-out).
scope(:creation_ordered, lambda { order(created_at: :asc) })

.defer_after_commit_maybe(good_job_or_active_job_classes) ⇒ Object

When code needs to optionally handle enqueue_after_transaction_commit



551
552
553
554
555
556
557
# File 'app/models/good_job/job.rb', line 551

# Run the given block either immediately or once all open transactions commit.
#
# When enqueue_after_commit? is true for the given classes, the block is
# registered with ActiveRecord.after_all_transactions_commit and receives
# +true+; otherwise it is called right away with +false+.
def self.defer_after_commit_maybe(good_job_or_active_job_classes)
  return yield(false) unless enqueue_after_commit?(good_job_or_active_job_classes)

  ActiveRecord.after_all_transactions_commit { yield(true) }
end

.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation

Order jobs for de-queueing

Parameters:

  • parsed_queues (Hash)

    Optional output of .queue_parser. When it contains both :ordered_queues and :include, jobs are ordered by queue position first.

Returns:

  • (ActiveRecord::Relation)


171
172
173
174
175
176
177
178
179
180
181
182
# File 'app/models/good_job/job.rb', line 171

# Ordering used when picking the next job to run: optional queue ordering,
# then priority, then either scheduled_at + id or created_at depending on
# GoodJob.configuration.dequeue_query_sort.
scope :dequeueing_ordered, (lambda do |parsed_queues|
  ordered_by_queue = parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include]
  base = ordered_by_queue ? queue_ordered(parsed_queues[:include]) : self

  if GoodJob.configuration.dequeue_query_sort == :scheduled_at
    base.priority_ordered.schedule_ordered.order(:id)
  else
    base.priority_ordered.creation_ordered
  end
end)

.effective_lock_strategy(strategy = GoodJob.configuration.lock_strategy) ⇒ Object

Returns the effective lock strategy, falling back to :advisory if the lock_type column hasn’t been migrated yet.



318
319
320
321
322
# File 'app/models/good_job/job.rb', line 318

# Resolve the lock strategy to actually use: the requested strategy when the
# lock_type column exists, otherwise :advisory.
def effective_lock_strategy(strategy = GoodJob.configuration.lock_strategy)
  lock_type_column_exists? ? strategy : :advisory
end

.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false, lock_id: nil, lock_type: nil) ⇒ Execution

Places an ActiveJob job on a queue by creating a new Execution record.

Parameters:

  • active_job (ActiveJob::Base)

    The job to enqueue.

  • scheduled_at (Float) (defaults to: nil)

    Epoch timestamp when the job should be executed, if blank will delegate to the ActiveJob instance

  • create_with_advisory_lock (Boolean) (defaults to: false)

    Whether to establish a lock on the Execution record after it is created.

  • lock_id (String, nil) (defaults to: nil)

    When provided, sets locked_by_id/locked_at/lock_type on the job at creation (for inline non-advisory claiming).

  • lock_type (Symbol, nil) (defaults to: nil)

    The lock type to record when lock_id is given (e.g. :skiplocked, :hybrid).

Returns:



496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# File 'app/models/good_job/job.rb', line 496

# Persist an ActiveJob instance as a job record, instrumented as
# "enqueue_job.good_job".
#
# @param active_job [ActiveJob::Base] the job to enqueue
# @param scheduled_at [Time, nil] forwarded to enqueue_args / build_for_enqueue
# @param create_with_advisory_lock [Boolean] lock an already-persisted record now,
#   or flag a new record via create_with_advisory_lock=
# @param lock_id [String, nil] when given, locked_by_id/locked_at/lock_type are assigned before saving
# @param lock_type [Symbol, nil] value written to lock_type when lock_id is given
# @return [Job, false] the saved record, or false when a duplicate cron_key
#   caused ActiveRecord::RecordNotUnique
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false, lock_id: nil, lock_type: nil)
  ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
    current_job = CurrentThread.job

    # A retry is detected when the job running on this thread has the same
    # ActiveJob ID as the one being enqueued; its record is reused.
    retried = current_job && current_job.active_job_id == active_job.job_id
    if retried
      job = current_job
      job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at))
      job.scheduled_at ||= Time.current
      # TODO: these values ideally shouldn't be persisted until the current_job is finished
      #   which will require handling `retry_job` being called from outside the job context.
      job.performed_at = nil
      job.finished_at = nil
    else
      job = build_for_enqueue(active_job, scheduled_at: scheduled_at)
    end

    if create_with_advisory_lock
      if job.persisted?
        job.advisory_lock
      else
        job.create_with_advisory_lock = true
      end
    end

    job.assign_attributes(locked_by_id: lock_id, locked_at: Time.current, lock_type: lock_type) if lock_id

    # Expose the record to instrumentation subscribers before saving.
    instrument_payload[:job] = job
    begin
      job.save!
    rescue ActiveRecord::RecordNotUnique
      # Only a uniqueness violation on a record with a cron_key is treated as a benign duplicate.
      raise unless job.cron_key

      # Active Job doesn't have a clean way to cancel an enqueue for unexceptional reasons
      # This is a workaround to mark it as having been halted in before_enqueue
      active_job.send(:halted_callback_hook, "duplicate_cron_key", "good_job")
      # `return` inside the block exits the whole method with false.
      return false
    end

    CurrentThread.retried_job = job if retried

    active_job.provider_job_id = job.id
    # Sanity check: the record id is expected to match the ActiveJob ID.
    raise "These should be equal" if active_job.provider_job_id != active_job.job_id

    job
  end
end

.enqueue_after_commit?(good_job_or_active_job_classes) ⇒ Boolean

Returns:

  • (Boolean)


559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
# File 'app/models/good_job/job.rb', line 559

# Whether enqueueing should be deferred until after transactions commit.
#
# True only when ActiveRecord responds to after_all_transactions_commit and at
# least one given entry resolves to a class responding to
# enqueue_after_transaction_commit. Entries may be class-name Strings, Job
# records (resolved via their job_class) or the classes themselves.
#
# @return [Boolean]
def self.enqueue_after_commit?(good_job_or_active_job_classes)
  return false unless ActiveRecord.respond_to?(:after_all_transactions_commit)

  Array(good_job_or_active_job_classes).any? do |candidate|
    resolved_class = case candidate
                     when String then candidate.constantize
                     when Job then candidate.job_class.constantize
                     else candidate
                     end

    resolved_class.respond_to?(:enqueue_after_transaction_commit)
  end
end

.enqueue_args(active_job, scheduled_at: nil) ⇒ Object

Construct arguments for GoodJob::Execution from an ActiveJob instance.



330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'app/models/good_job/job.rb', line 330

# Build the attribute Hash for a job record from an ActiveJob instance.
#
# @param active_job [ActiveJob::Base] job being enqueued
# @param scheduled_at [Time, nil] explicit schedule; otherwise the ActiveJob's own
#   scheduled_at is used, and finally the created_at timestamp
# @return [Hash] attributes keyed by column name
def self.enqueue_args(active_job, scheduled_at: nil)
  attrs = {
    id: active_job.job_id,
    active_job_id: active_job.job_id,
    job_class: active_job.class.name,
    queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
    priority: active_job.priority || DEFAULT_PRIORITY,
    serialized_params: active_job.serialize,
    created_at: Time.current,
  }

  attrs[:scheduled_at] = scheduled_at || (active_job.scheduled_at ? Time.zone.at(active_job.scheduled_at) : attrs[:created_at])

  attrs[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

  # Labels are stringified, stripped, and de-duplicated; blank ones are dropped.
  if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any?
    attrs[:labels] = active_job.good_job_labels.map { |label| label.to_s.strip.presence }.compact.uniq
  end

  running_job_id = CurrentThread.active_job_id
  reenqueued = running_job_id && running_job_id == active_job.job_id
  current_job = CurrentThread.job

  # A job re-enqueueing itself keeps its own batch/cron identity; anything else
  # inherits the batch and cron context of the current thread.
  inherited =
    if reenqueued
      {
        batch_id: current_job.batch_id,
        batch_callback_id: current_job.batch_callback_id,
        cron_key: current_job.cron_key,
      }
    else
      {
        batch_id: GoodJob::Batch.current_batch_id,
        batch_callback_id: GoodJob::Batch.current_batch_callback_id,
        cron_key: CurrentThread.cron_key,
        cron_at: CurrentThread.cron_at,
      }
    end

  attrs.merge!(inherited)
end

.exclude_paused ⇒ ActiveRecord::Relation

Exclude jobs that are paused via queue_name, job_class, or labels. Only applies when the enable_pauses configuration is true.

Returns:

  • (ActiveRecord::Relation)


130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'app/models/good_job/job.rb', line 130

# Filter out jobs whose queue_name, job_class, or labels appear in the PAUSES
# setting. Returns the unfiltered relation when enable_pauses is off.
scope :exclude_paused, lambda {
  if GoodJob.configuration.enable_pauses
    pauses = GoodJob::Setting.where(key: GoodJob::Setting::PAUSES)
    paused_queues = pauses.select("jsonb_array_elements_text(value->'queues')")
    paused_job_classes = pauses.select("jsonb_array_elements_text(value->'job_classes')")
    paused_labels = pauses.select("jsonb_array_elements_text(value->'labels')")

    # labels && ARRAY(<paused labels>), coalesced to FALSE so a NULL result does not exclude the row.
    paused_labels_array = Arel::Nodes::NamedFunction.new('ARRAY', [paused_labels.arel])
    labels_overlap = Arel::Nodes::InfixOperation.new('&&', arel_table['labels'], paused_labels_array)
    overlap_or_false = Arel::Nodes::NamedFunction.new("COALESCE", [labels_overlap, Arel::Nodes::SqlLiteral.new('FALSE')])

    where.not(queue_name: paused_queues)
         .where.not(job_class: paused_job_classes)
         .where(Arel::Nodes::Not.new(overlap_or_false))
  else
    all
  end
}

.finished_before(timestamp) ⇒ ActiveRecord::Relation

Get Jobs finished before the given timestamp.

Parameters:

  • timestamp (DateTime, Time)

Returns:

  • (ActiveRecord::Relation)


80
# File 'app/models/good_job/job.rb', line 80

# Jobs whose finished_at is at or before the given timestamp (bound as a DateTime).
scope :finished_before, (lambda do |timestamp|
  cutoff = bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime)
  where(arel_table['finished_at'].lteq(cutoff))
end)

.format_error(error) ⇒ Object

Raises:

  • (ArgumentError)


544
545
546
547
548
# File 'app/models/good_job/job.rb', line 544

# Render an exception as "<ErrorClass><ERROR_MESSAGE_SEPARATOR><message>".
#
# @param error [Exception]
# @return [String]
# @raise [ArgumentError] when +error+ is not an Exception
def self.format_error(error)
  raise ArgumentError unless Exception === error

  "#{error.class}#{ERROR_MESSAGE_SEPARATOR}#{error.message}"
end

.historic_finished_at_index_migrated? ⇒ Boolean

Returns:

  • (Boolean)


288
289
290
291
292
293
# File 'app/models/good_job/job.rb', line 288

# Whether the "index_good_jobs_on_queue_name_priority_scheduled_at_unfinished"
# index exists on the good_jobs table. When it is missing,
# migration_pending_warning! is called and false is returned.
#
# The lookup borrows a connection through connection_pool.with_connection (the
# same approach lock_type_column_exists? uses) so the connection is only
# checked out for the duration of the check.
#
# @return [Boolean]
def historic_finished_at_index_migrated?
  index_name = "index_good_jobs_on_queue_name_priority_scheduled_at_unfinished"
  return true if connection_pool.with_connection { |conn| conn.index_name_exists?(:good_jobs, index_name) }

  migration_pending_warning!
  false
end

.job_class(name) ⇒ ActiveRecord::Relation

With a given class name

Parameters:

  • name (String)

    Job class name

Returns:

  • (ActiveRecord::Relation)


102
# File 'app/models/good_job/job.rb', line 102

# Jobs whose job class (see params_job_class) equals the given name.
scope(:job_class, lambda { |class_name| where(params_job_class.eq(class_name)) })

.json_string(json, attr) ⇒ Object



272
273
274
# File 'app/models/good_job/job.rb', line 272

# Arel node for the parenthesized expression `json ->> 'attr'`.
def json_string(json, attr)
  quoted_key = Arel::Nodes.build_quoted(attr)
  text_lookup = Arel::Nodes::InfixOperation.new('->>', json, quoted_key)
  Arel::Nodes::Grouping.new(text_lookup)
end

.lock_type_column_exists? ⇒ Boolean

Returns true if the lock_type column exists on the good_jobs table. Used to guard lock_type writes for zero-downtime deployment safety: if the migration hasn’t run yet, all strategies fall back to :advisory.

Returns:

  • (Boolean)


305
306
307
308
309
# File 'app/models/good_job/job.rb', line 305

# Whether good_jobs has a lock_type column. The answer (including false) is
# memoized in @_lock_type_column_exists after the first lookup.
def lock_type_column_exists?
  unless instance_variable_defined?(:@_lock_type_column_exists)
    @_lock_type_column_exists = connection_pool.with_connection do |conn|
      conn.column_exists?(:good_jobs, :lock_type)
    end
  end

  @_lock_type_column_exists
end

.lock_type_migrated? ⇒ Boolean

Returns:

  • (Boolean)


295
296
297
298
299
300
# File 'app/models/good_job/job.rb', line 295

# Whether the :index_good_jobs_for_candidate_dequeue_unlocked index exists on
# the good_jobs table. When it is missing, migration_pending_warning! is called
# and false is returned.
#
# The lookup borrows a connection through connection_pool.with_connection (the
# same approach lock_type_column_exists? uses) so the connection is only
# checked out for the duration of the check.
#
# @return [Boolean]
def lock_type_migrated?
  index_name = :index_good_jobs_for_candidate_dequeue_unlocked
  return true if connection_pool.with_connection { |conn| conn.index_name_exists?(:good_jobs, index_name) }

  migration_pending_warning!
  false
end

.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>

Fetches the scheduled execution time of the next eligible Execution(s).

Parameters:

  • after (DateTime) (defaults to: nil)
  • limit (Integer) (defaults to: 100)
  • now_limit (Integer, nil) (defaults to: nil)

Returns:

  • (Array<DateTime>)


466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# File 'app/models/good_job/job.rb', line 466

# Collect scheduled_at values of unfinished, unlocked jobs.
#
# @param after [Time, nil] lower bound (exclusive) for future jobs; defaults to now
# @param limit [Integer] maximum number of future timestamps
# @param now_limit [Integer, nil] when positive, also include up to this many
#   timestamps that are already earlier than the current time
# @return [Array] already-due timestamps (if requested) followed by future ones
def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
  candidates = unfinished.where(locked_by_id: nil).schedule_ordered
  scheduled_at_column = arel_table['scheduled_at']

  lower_bound = bind_value('scheduled_at', after || Time.current, ActiveRecord::Type::DateTime)
  upcoming = candidates.where(scheduled_at_column.gt(lower_bound)).limit(limit).pluck(:scheduled_at)

  overdue = []
  if now_limit&.positive?
    current_time = bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime)
    overdue = candidates.where(scheduled_at_column.lt(current_time)).limit(now_limit).pluck(:scheduled_at)
  end

  overdue + upcoming
end

.only_scheduled ⇒ ActiveRecord::Relation

Get jobs whose scheduled_at is at or before the current time (i.e. jobs that are due to run now).

Returns:

  • (ActiveRecord::Relation)


123
# File 'app/models/good_job/job.rb', line 123

# Jobs whose scheduled_at is at or before the current time.
scope :only_scheduled, (lambda do
  current_time = bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime)
  where(arel_table['scheduled_at'].lteq(current_time))
end)

.params_execution_count ⇒ Object



280
281
282
283
284
285
286
# File 'app/models/good_job/job.rb', line 280

# Arel node casting serialized_params ->> 'executions' to integer.
def params_execution_count
  executions_text = json_string(arel_table['serialized_params'], 'executions')
  Arel::Nodes::InfixOperation.new('::', executions_text, Arel.sql('integer'))
end

.params_job_class ⇒ Object



276
277
278
# File 'app/models/good_job/job.rb', line 276

# Arel attribute for the job_class column of this model's table.
def params_job_class
  arel_table[:job_class]
end

.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?

Finds the next eligible Execution, acquires an advisory lock related to it, and executes the job.

Yields:

  • (Execution, nil)

    The next eligible Execution, or nil if none found, before it is performed.

Returns:

  • (ExecutionResult, nil)

    If a job was executed, returns an array with the Execution record, the return value for the job’s #perform method, and the exception the job raised, if any (if the job raised, then the second array entry will be nil). If there were no jobs to execute, returns nil.



383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# File 'app/models/good_job/job.rb', line 383

# Claim the next due, unpaused, unfinished job through with_advisory_lock_claim
# and perform it.
#
# The optional block receives the job about to be performed, or nil when no
# job was claimed or the claimed job is no longer executable.
#
# @return [Object, nil] whatever Job#perform returns, or nil when nothing ran
def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil)
  candidates = unfinished.dequeueing_ordered(parsed_queues)
                         .only_scheduled
                         .exclude_paused
                         .limit(1)

  candidates.with_advisory_lock_claim(select_limit: queue_select_limit) do |job|
    runnable = job if job&.executable?
    yield(runnable) if block_given?
    runnable&.perform(lock_id: lock_id)
  end
end

.perform_with_hybrid_lock(lock_id:, parsed_queues: nil) {|Job, nil| ... } ⇒ ExecutionResult?

Claims and performs a job using SELECT FOR UPDATE SKIP LOCKED combined with a session-level advisory lock for pg_locks visibility.

Parameters:

  • lock_id (String)

    Process UUID used as the lock identifier

  • parsed_queues (Hash) (defaults to: nil)

    Optional parsed queue config

Yields:

  • (Job, nil)

    The claimed job, or nil if none found

Returns:



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# File 'app/models/good_job/job.rb', line 444

# Claim the next due, unpaused, unfinished and unlocked job through
# with_hybrid_lock_claim (recording lock_type :hybrid) and perform it.
#
# The optional block receives the claimed job, or nil when none was claimed.
#
# @return [Object, nil] whatever Job#perform returns, or nil when nothing ran
def self.perform_with_hybrid_lock(lock_id:, parsed_queues: nil)
  candidates = unfinished.where(locked_by_id: nil)
                         .dequeueing_ordered(parsed_queues)
                         .only_scheduled
                         .exclude_paused
                         .limit(1)
  claim = { locked_by_id: lock_id, locked_at: Time.current, lock_type: :hybrid }

  candidates.with_hybrid_lock_claim(**claim) do |job|
    yield(job || nil) if block_given?
    # The claim already wrote the lock columns, so perform must not rewrite them.
    job.perform(lock_id: lock_id, already_claimed: true) if job
  end
end

.perform_with_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil, lock_strategy: :advisory) {|Job, nil| ... } ⇒ ExecutionResult?

Dispatches to the appropriate perform method based on lock_strategy.

Parameters:

  • lock_id (String)

    Process UUID used as the lock identifier

  • parsed_queues (Hash) (defaults to: nil)

    Optional parsed queue config

  • queue_select_limit (Integer, nil) (defaults to: nil)

    Optional candidate selection limit

  • lock_strategy (Symbol) (defaults to: :advisory)

    One of :advisory, :skiplocked, :hybrid

Yields:

  • (Job, nil)

    The claimed job, or nil if none found

Returns:



406
407
408
409
410
411
412
413
# File 'app/models/good_job/job.rb', line 406

# Dispatch to the perform_with_* method matching the effective lock strategy.
# Any block is forwarded unchanged. Returns nil for a strategy other than
# :advisory, :skiplocked or :hybrid.
def self.perform_with_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil, lock_strategy: :advisory, &block)
  case effective_lock_strategy(lock_strategy)
  when :advisory
    perform_with_advisory_lock(lock_id: lock_id, parsed_queues: parsed_queues, queue_select_limit: queue_select_limit, &block)
  when :skiplocked
    perform_with_skip_locked(lock_id: lock_id, parsed_queues: parsed_queues, &block)
  when :hybrid
    perform_with_hybrid_lock(lock_id: lock_id, parsed_queues: parsed_queues, &block)
  end
end

.perform_with_skip_locked(lock_id:, parsed_queues: nil) {|Job, nil| ... } ⇒ ExecutionResult?

Claims and performs a job using SELECT FOR UPDATE SKIP LOCKED. No session-level advisory locks are acquired.

Parameters:

  • lock_id (String)

    Process UUID used as the lock identifier

  • parsed_queues (Hash) (defaults to: nil)

    Optional parsed queue config

Yields:

  • (Job, nil)

    The claimed job, or nil if none found

Returns:



421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
# File 'app/models/good_job/job.rb', line 421

# Claim the next due, unpaused, unfinished and unlocked job through
# with_skip_locked_claim (recording lock_type :skiplocked) and perform it.
#
# The optional block receives the claimed job, or nil when none was claimed.
#
# @return [Object, nil] whatever Job#perform returns, or nil when nothing ran
def self.perform_with_skip_locked(lock_id:, parsed_queues: nil)
  candidates = unfinished.where(locked_by_id: nil)
                         .dequeueing_ordered(parsed_queues)
                         .only_scheduled
                         .exclude_paused
                         .limit(1)
  claim = { locked_by_id: lock_id, locked_at: Time.current, lock_type: :skiplocked }

  candidates.with_skip_locked_claim(**claim) do |job|
    yield(job || nil) if block_given?
    # The claim already wrote the lock columns, so perform must not rewrite them.
    job.perform(lock_id: lock_id, already_claimed: true) if job
  end
end

.priority_ordered ⇒ ActiveRecord::Relation

Order jobs by priority (highest priority first).

Returns:

  • (ActiveRecord::Relation)


156
# File 'app/models/good_job/job.rb', line 156

# Ascending priority value, with NULL priorities last.
scope(:priority_ordered, lambda { order('priority ASC NULLS LAST') })

.queue_ordered(queues) ⇒ ActiveRecord::Relation

Order jobs in order of queues in array param

Parameters:

  • queues (Array<String>)

    Ordered names of queues

Returns:

  • (ActiveRecord::Relation)


189
190
191
192
193
194
# File 'app/models/good_job/job.rb', line 189

# Order jobs by the position of their queue_name within +queues+; jobs on any
# other queue sort after all listed queues.
#
# An empty (or nil) list leaves the relation unordered: a CASE expression needs
# at least one WHEN branch, so "(CASE  ELSE 0 END)" would be rejected by the
# database. An empty list can arrive from a queue string such as "+".
#
# @param queues [Array<String>] ordered names of queues
# @return [ActiveRecord::Relation]
scope :queue_ordered, (lambda do |queues|
  if queues.blank?
    all
  else
    clauses = queues.map.with_index do |queue_name, index|
      sanitize_sql_array(["WHEN queue_name = ? THEN ?", queue_name, index])
    end
    order(Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.size} END)"))
  end
end)

.queue_parser(string) ⇒ Hash

Parse a string representing a group of queues into a more readable data structure.

Examples:

GoodJob::Job.queue_parser('-queue1,queue2')
=> { exclude: [ 'queue1', 'queue2' ] }

Parameters:

  • string (String)

    Queue string

Returns:

  • (Hash)

    How to match a given queue. It can have the following keys and values:

    • { all: true } indicates that all queues match.

    • { exclude: Array<String> } indicates the listed queue names should not match.

    • { include: Array<String> } indicates the listed queue names should match.

    • { include: Array<String>, ordered_queues: true } indicates the listed queue names should match, and dequeue should respect queue order.



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'app/models/good_job/job.rb', line 244

# Parse a queue string into a Hash describing which queues match.
#
# A blank string is treated as "*". A leading "-" marks the list as excluded
# queues; a leading "+" marks it as an ordered include list. A "*" entry
# anywhere in the list means every queue matches.
#
# @param string [String]
# @return [Hash] one of { all: true }, { exclude: [...] },
#   { include: [...], ordered_queues: true } or { include: [...] }
def queue_parser(string)
  string = string.strip.presence || '*'

  exclude_queues = string.start_with?('-')
  ordered_queues = string.start_with?('+')
  string = string[1..] if exclude_queues || ordered_queues

  queues = string.split(',').map(&:strip)

  return { all: true } if queues.include?('*')
  return { exclude: queues } if exclude_queues
  return { include: queues, ordered_queues: true } if ordered_queues

  { include: queues }
end

.queue_string(string) ⇒ ActiveRecord::Relation

Get Jobs on queues that match the given queue string.

Parameters:

  • string (String)

    A string expression describing what queues to select. See queue_parser or README for more details on the format of the string. Note this only handles individual semicolon-separated segments of that string format.

Returns:

  • (ActiveRecord::Relation)


212
213
214
215
216
217
218
219
220
221
222
# File 'app/models/good_job/job.rb', line 212

# Restrict to the queues described by a queue string (see queue_parser).
# Excluding queues still keeps jobs whose queue_name is NULL.
scope :queue_string, (lambda do |string|
  rules = queue_parser(string)

  if rules[:all]
    all
  elsif rules[:exclude]
    not_excluded = where.not(queue_name: rules[:exclude])
    not_excluded.or(where(queue_name: nil))
  elsif rules[:include]
    where(queue_name: rules[:include])
  end
end)

.reset_column_information ⇒ Object



311
312
313
314
# File 'app/models/good_job/job.rb', line 311

# Drop the memoized lock_type column check (if any) before deferring to super,
# so the next lock_type_column_exists? call looks the column up again.
def reset_column_information
  if instance_variable_defined?(:@_lock_type_column_exists)
    remove_instance_variable(:@_lock_type_column_exists)
  end

  super
end

.schedule_ordered ⇒ ActiveRecord::Relation

Order jobs by scheduled_at (oldest first).

Returns:

  • (ActiveRecord::Relation)


200
# File 'app/models/good_job/job.rb', line 200

# Oldest scheduled_at first.
scope(:schedule_ordered, lambda { order(scheduled_at: :asc) })

.unfinished ⇒ ActiveRecord::Relation

Get jobs that have not yet finished (succeeded or discarded).

Returns:

  • (ActiveRecord::Relation)


116
# File 'app/models/good_job/job.rb', line 116

# Jobs with no finished_at timestamp.
scope(:unfinished, lambda { where(finished_at: nil) })

Instance Method Details

#active_job(ignore_deserialization_errors: false) ⇒ Object

Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.

Parameters:

  • ignore_deserialization_errors (Boolean) (defaults to: false)

    Whether to ignore ActiveJob::DeserializationError and NameError when deserializing the arguments. This is most useful if you aren’t planning to use the arguments directly.



742
743
744
745
746
747
748
749
750
# File 'app/models/good_job/job.rb', line 742

# Build an ActiveJob instance from active_job_data and eagerly deserialize its arguments.
#
# With ignore_deserialization_errors: true, an ActiveJob::DeserializationError
# during argument deserialization is swallowed and the instance is still
# returned, while a NameError anywhere in the process makes the method return nil.
# Without the flag both errors propagate.
def active_job(ignore_deserialization_errors: false)
  instance = ActiveJob::Base.deserialize(active_job_data)

  begin
    instance.send(:deserialize_arguments_if_needed)
  rescue ActiveJob::DeserializationError
    raise unless ignore_deserialization_errors
  end

  instance
rescue NameError
  raise unless ignore_deserialization_errors
end

#destroy_job ⇒ void

This method returns an undefined value.

Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.



729
730
731
732
733
734
735
# File 'app/models/good_job/job.rb', line 729

# Destroy this record while holding its advisory lock.
#
# @raise [ActionForStateMismatchError] when the job has no finished_at
def destroy_job
  with_advisory_lock do
    raise ActionForStateMismatchError unless finished_at.present?

    destroy
  end
end

#discard_job(message) ⇒ void

This method returns an undefined value.

Discard a job so that it will not be executed further. This action will add a DiscardJobError to the job’s Execution and mark it as finished.



702
703
704
705
706
# File 'app/models/good_job/job.rb', line 702

# Discard this job while holding its advisory lock; the work itself is
# delegated to _discard_job.
def discard_job(message)
  with_advisory_lock { _discard_job(message) }
end

#discarded? ⇒ Boolean

Tests whether the job has finished but with an error.

Returns:

  • (Boolean)


642
643
644
# File 'app/models/good_job/job.rb', line 642

# True when the job is finished and has an error recorded.
def discarded?
  return false unless finished?

  error.present?
end

#display_error ⇒ String

Errors for the job to be displayed in the Dashboard.

Returns:

  • (String)


597
598
599
600
601
602
603
# File 'app/models/good_job/job.rb', line 597

# Error text for the dashboard: the stored error when present, otherwise the
# 'exception_executions' counts from serialized_params rendered as
# "<exception>: <count>" pairs joined by ", ".
def display_error
  if error.present?
    error
  else
    exception_counts = serialized_params.fetch('exception_executions', {})
    exception_counts.map { |exception, count| "#{exception}: #{count}" }.join(', ')
  end
end

#display_name ⇒ String

Used when displaying this job in the GoodJob dashboard.

Returns:

  • (String)


615
616
617
# File 'app/models/good_job/job.rb', line 615

# Name shown for this job; currently just the job_class attribute.
def display_name
  job_class
end

#display_serialized_params ⇒ Hash

Return formatted serialized_params for display in the dashboard

Returns:

  • (Hash)


607
608
609
610
611
# File 'app/models/good_job/job.rb', line 607

# serialized_params plus a :_good_job entry holding the record's attributes,
# minus 'serialized_params', 'locktype' and 'owns_advisory_lock'.
def display_serialized_params
  record_attributes = attributes.except('serialized_params', 'locktype', 'owns_advisory_lock')
  serialized_params.merge({ _good_job: record_attributes })
end

#executable? ⇒ Boolean

Tests whether this job is safe to be executed by this thread.

Returns:

  • (Boolean)


907
908
909
910
911
# File 'app/models/good_job/job.rb', line 907

# Reload the record and report whether it is still unfinished. A record that
# no longer exists (ActiveRecord::RecordNotFound) counts as not executable.
def executable?
  refreshed = reload
  refreshed.finished_at.blank?
rescue ActiveRecord::RecordNotFound
  false
end

#executions_count ⇒ Object



619
620
621
# File 'app/models/good_job/job.rb', line 619

# The inherited executions_count value, falling back to 0 when super returns
# nil (or false).
def executions_count
  super || 0
end

#finished? ⇒ Boolean

Tests whether the job has finished (succeeded or discarded).

Returns:

  • (Boolean)


636
637
638
# File 'app/models/good_job/job.rb', line 636

# True when a finished_at timestamp has been recorded.
def finished?
  !finished_at.blank?
end

#force_discard_job(message) ⇒ Object

Force discard a job so that it will not be executed further. Force discard allows discarding a running job. This action will add a DiscardJobError to the job’s Execution and mark it as finished.



711
712
713
# File 'app/models/good_job/job.rb', line 711

# Discard the job by calling _discard_job directly, i.e. without first taking
# the advisory lock that discard_job wraps the same call in.
def force_discard_job(message)
  _discard_job(message)
end

#job_state ⇒ Object



931
932
933
934
935
# File 'app/models/good_job/job.rb', line 931

# Hash with the job's queue_name, plus scheduled_at when one is set.
def job_state
  { queue_name: queue_name }.tap do |state|
    state[:scheduled_at] = scheduled_at if scheduled_at
  end
end

#number ⇒ Object



913
914
915
# File 'app/models/good_job/job.rb', line 913

# One more than the 'executions' count stored in serialized_params (0 when absent).
def number
  prior_executions = serialized_params.fetch('executions', 0)
  prior_executions + 1
end

#perform(lock_id:, already_claimed: false) ⇒ ExecutionResult

Execute the ActiveJob job this Execution represents.

Parameters:

  • lock_id (String)

    Process UUID used as the lock identifier

  • already_claimed (Boolean) (defaults to: false)

    When true, skips writing lock columns (already set by CTE claim)

Returns:

  • (ExecutionResult)

    An array of the return value of the job’s #perform method and the exception raised by the job, if any. If the job completed successfully, the second array entry (the exception) will be nil and vice versa.



759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
# File 'app/models/good_job/job.rb', line 759

# Execute the ActiveJob job this record represents and persist the outcome.
#
# Creates a new execution row, runs the job through +ActiveJob::Base.execute+
# inside a +perform_job.good_job+ instrumentation block, then either saves or
# destroys this record depending on the result.
#
# @param lock_id [String] process UUID used as the lock identifier
# @param already_claimed [Boolean] when true, the lock columns
#   (+locked_by_id+, +locked_at+, +lock_type+) are not written by this method
# @return [ExecutionResult] the job's return value together with the handled
#   or unhandled error, if any
# @raise [PreviouslyPerformedError] if +finished_at+ is already set
def perform(lock_id:, already_claimed: false)
  run_callbacks(:perform) do
    raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

    job_performed_at = Time.current
    # Durations are measured on the monotonic clock; wall-clock time is only used for timestamps.
    monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
    execution = nil
    result = GoodJob::CurrentThread.within do |current_thread|
      current_thread.reset
      current_thread.job = self

      # A non-nil performed_at on an unfinished job means an earlier perform
      # started but never recorded a result: mark it as interrupted.
      existing_performed_at = performed_at
      if existing_performed_at
        current_thread.execution_interrupted = existing_performed_at

        # Use GoodJob::InterruptError, the same class the rescue clause below
        # matches when classifying an error as INTERRUPTED.
        interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'"))
        self.error = interrupt_error_string
        self.error_event = ErrorEvents::INTERRUPTED
        monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds

        execution_attrs = {
          error: interrupt_error_string,
          finished_at: job_performed_at,
          error_event: error_event,
          duration: monotonic_duration,
        }
        # Close out every execution row that started but never finished.
        executions.where(finished_at: nil).where.not(performed_at: nil).update_all(execution_attrs) # rubocop:disable Rails/SkipsModelValidations
      end

      # Create the new execution row and stamp the job atomically.
      transaction do
        execution_attrs = {
          job_class: job_class,
          queue_name: queue_name,
          serialized_params: serialized_params,
          scheduled_at: scheduled_at || created_at,
          created_at: job_performed_at,
          process_id: lock_id,
        }
        job_attrs = {
          performed_at: job_performed_at,
          executions_count: ((executions_count || 0) + 1),
        }
        # Only write the lock columns when the caller has not already claimed the job.
        unless already_claimed
          job_attrs[:locked_by_id] = lock_id
          job_attrs[:locked_at] = Time.current
          job_attrs[:lock_type] = :advisory if self.class.lock_type_column_exists?
        end

        execution = executions.create!(execution_attrs)
        update!(job_attrs)
      end

      ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
        value = ActiveJob::Base.execute(active_job_data)
        active_job = current_thread.active_job

        # An Exception returned as the value is treated as a handled error.
        if value.is_a?(Exception)
          handled_error = value
          value = nil
        end
        handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard
        # Classify the handled error by comparing it with the errors recorded on the current thread.
        error_event = if !handled_error
                        nil
                      elsif handled_error == current_thread.error_on_discard
                        ErrorEvents::DISCARDED
                      elsif handled_error == current_thread.error_on_retry
                        ErrorEvents::RETRIED
                      elsif handled_error == current_thread.error_on_retry_stopped
                        ErrorEvents::RETRY_STOPPED
                      elsif handled_error
                        ErrorEvents::HANDLED
                      end

        instrument_payload.merge!(
          value: value,
          error: handled_error,
          handled_error: handled_error,
          retried: current_thread.retried_job.present?,
          error_event: error_event
        )
        ExecutionResult.new(value: value, active_job: active_job, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job)
      rescue *GoodJob.handled_exception_classes => e
        # The error escaped ActiveJob::Base.execute: report it as unhandled.
        active_job ||= current_thread.active_job
        error_event = if e.is_a?(GoodJob::InterruptError)
                        ErrorEvents::INTERRUPTED
                      elsif e == current_thread.error_on_retry_stopped
                        ErrorEvents::RETRY_STOPPED
                      else
                        ErrorEvents::UNHANDLED
                      end

        instrument_payload.merge!(
          error: e,
          unhandled_error: e,
          error_event: error_event
        )
        ExecutionResult.new(value: nil, active_job: active_job, unhandled_error: e, error_event: error_event)
      end
    end

    # Always clear the lock columns once the job has run.
    job_attributes = { locked_by_id: nil, locked_at: nil }
    job_attributes[:lock_type] = nil if self.class.lock_type_column_exists?

    job_error = result.handled_error || result.unhandled_error
    if job_error
      error_string = self.class.format_error(job_error)

      job_attributes[:error] = error_string
      job_attributes[:error_event] = result.error_event

      execution.error = error_string
      execution.error_event = result.error_event
      execution.error_backtrace = job_error.backtrace
    else
      job_attributes[:error] = nil
      job_attributes[:error_event] = nil
    end

    job_finished_at = Time.current
    monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds
    job_attributes[:finished_at] = job_finished_at

    execution.finished_at = job_finished_at
    execution.duration = monotonic_duration

    # A job that will run again is reset to an unperformed, unfinished state.
    retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error
    reenqueued = result.retried? || retry_unhandled_error
    if reenqueued
      job_attributes[:performed_at] = nil
      job_attributes[:finished_at] = nil
    end

    assign_attributes(job_attributes)
    # Keep the record when it is unfinished, belongs to a cron entry, or
    # preserve_job_record? says so; otherwise delete it.
    if finished_at.blank? || cron_key.present? || preserve_job_record?(result)
      transaction do
        execution.save!
        save!
      end
    else
      destroy!
    end

    result
  end
end

#queue_latency ⇒ Object

Time between when this job was expected to run and when it started running



918
919
920
921
922
923
924
# File 'app/models/good_job/job.rb', line 918

# Time between when this job was expected to run and when it started running.
#
# The expected start is +scheduled_at+, falling back to +created_at+. The
# actual start is +performed_at+, then +finished_at+, then the current time.
#
# @return [Float, nil] the difference of the two times, or nil when the
#   expected start is not yet in the past
def queue_latency
  current_time = Time.zone.now
  expected_start = scheduled_at || created_at
  return if expected_start >= current_time

  (performed_at || finished_at || current_time) - expected_start
end

#recent_error ⇒ String

The most recent error message. If the job has been retried, the error will be fetched from the previous Execution record.

Returns:

  • (String)


586
587
588
589
590
591
592
593
# File 'app/models/good_job/job.rb', line 586

# Deprecated reader for the job's error.
#
# Emits a deprecation warning through +GoodJob.deprecator+ and then returns
# the value of +error+.
#
# @deprecated Use +error+ instead.
# @return [String] the value of +error+
def recent_error
  deprecation_notice = <<~DEPRECATION
    The `GoodJob::Job#recent_error` method is deprecated and will be removed in the next major release.

    Replace usage of GoodJob::Job#recent_error with `GoodJob::Job#error`.
  DEPRECATION
  GoodJob.deprecator.warn(deprecation_notice)

  error
end

#reschedule_job(scheduled_at = Time.current) ⇒ void

This method returns an undefined value.

Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.

Parameters:

  • scheduled_at (DateTime, Time) (defaults to: Time.current)

    When to reschedule the job



718
719
720
721
722
723
724
725
# File 'app/models/good_job/job.rb', line 718

# Reschedule an unfinished job to run at the given time.
#
# The record is reloaded while the advisory lock is held, so the state check
# runs against fresh attributes.
#
# @param scheduled_at [DateTime, Time] when to reschedule the job
# @return [void]
# @raise [ActionForStateMismatchError] if the job has already finished
def reschedule_job(scheduled_at = Time.current)
  with_advisory_lock do
    reload
    # finished? is the same finished_at.present? check, by name.
    raise ActionForStateMismatchError if finished?

    update(scheduled_at: scheduled_at)
  end
end

#retry_job ⇒ ActiveJob::Base

Retry a job that has errored and been discarded. This action will create a new Execution record for the job.

Returns:

  • (ActiveJob::Base)


655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
# File 'app/models/good_job/job.rb', line 655

# Retry a job that has errored and been discarded.
#
# Rebuilds an exception from the stored +error+ string and passes it to the
# Active Job's own +retry_job+, then marks this record's +error_event+ as
# retried.
#
# @return [ActiveJob::Base] the value returned by the Active Job's +retry_job+
# @raise [ActiveJobDeserializationError] if the Active Job cannot be deserialized
# @raise [AdapterNotGoodJobError] if the job class's queue adapter is not a GoodJob::Adapter
# @raise [ActionForStateMismatchError] if the job is unfinished or has no stored error
def retry_job
  Rails.application.executor.wrap do
    with_advisory_lock do
      reload
      active_job = self.active_job(ignore_deserialization_errors: true)

      raise ActiveJobDeserializationError if active_job.nil?
      raise AdapterNotGoodJobError unless active_job.class.queue_adapter.is_a? GoodJob::Adapter
      raise ActionForStateMismatchError if finished_at.blank? || error.blank?

      # Update the executions count because the previous execution will not have been preserved
      # Do not update `exception_executions` because that comes from rescue_from's arguments
      active_job.executions = (active_job.executions || 0) + 1

      # Read the attribute once into a distinctly named local. Assigning to a
      # local called `error` would shadow the attribute reader, so the rescue
      # fallback would wrap nil instead of the stored error string.
      stored_error = error
      retry_error = begin
        # Limit the split to two parts so a message containing the separator stays intact.
        error_class, error_message = stored_error.split(ERROR_MESSAGE_SEPARATOR, 2).map(&:strip)
        error_class.constantize.new(error_message)
      rescue StandardError
        # Unknown class or incompatible constructor: keep the full stored text.
        StandardError.new(stored_error)
      end

      new_active_job = nil

      transaction do
        Job.defer_after_commit_maybe(active_job.class) do
          GoodJob::CurrentThread.within do |current_thread|
            current_thread.job = self
            current_thread.retry_now = true

            # NOTE: I18n.with_locale necessary until fixed in rails https://github.com/rails/rails/pull/52121
            I18n.with_locale(active_job.locale) do
              new_active_job = active_job.retry_job(wait: 0, error: retry_error)
            end
          end
        end
        # A retry error always exists here: the guard above rejects a blank stored error.
        self.error_event = :retried
        save!
      end

      new_active_job
    end
  end
end

#running? ⇒ Boolean

Tests whether the job is being executed right now.

Returns:

  • (Boolean)


625
626
627
628
629
630
631
632
# File 'app/models/good_job/job.rb', line 625

# Tests whether the job is being executed right now.
#
# A job counts as running when it holds a lock or has +locked_by_id+ set.
#
# @return [Boolean]
def running?
  # Avoid N+1 Query: when the record carries a `locktype` attribute
  # (see `.includes_advisory_locks`), read it instead of querying the lock.
  holds_lock = has_attribute?(:locktype) ? self['locktype'].present? : advisory_locked?
  holds_lock || locked_by_id.present?
end

#runtime_latency ⇒ Object

Time between when this job started and finished



927
928
929
# File 'app/models/good_job/job.rb', line 927

# Time between when this job started and finished.
#
# For a job with no +finished_at+, the current time is used as the end point.
#
# @return [Float, nil] nil when +performed_at+ is not set
def runtime_latency
  return unless performed_at

  ended_at = finished_at || Time.zone.now
  ended_at - performed_at
end

#succeeded? ⇒ Boolean

Tests whether the job has finished without error

Returns:

  • (Boolean)


648
649
650
# File 'app/models/good_job/job.rb', line 648

# Tests whether the job has finished without being discarded.
#
# @return [Boolean] true only when +finished?+ is true and +discarded?+ is false
def succeeded?
  return false unless finished?

  !discarded?
end