Class: GoodJob::Job
- Inherits: BaseRecord (Object → ActiveRecord::Base → BaseRecord → GoodJob::Job)
- Includes: AdvisoryLockable, ErrorEvents, Filterable, LockTypeAccessor, Lockable, Reportable
- Defined in: app/models/good_job/job.rb, app/models/good_job/job/lockable.rb
Overview
Active Record model that represents an ActiveJob job.
Defined Under Namespace
Modules: LockTypeAccessor, Lockable
Constant Summary
- PreviouslyPerformedError = Class.new(StandardError)
  Raised if something attempts to execute a previously completed Execution again.
- ERROR_MESSAGE_SEPARATOR = ": "
  String separating Error Class from Error Message.
- DEFAULT_QUEUE_NAME = 'default'
  ActiveJob jobs without a queue_name attribute are placed on this queue.
- DEFAULT_PRIORITY = 0
  ActiveJob jobs without a priority attribute are given this priority.
- ActionForStateMismatchError = Class.new(StandardError)
  Raised when an inappropriate action is applied to a Job based on its state.
- AdapterNotGoodJobError = Class.new(StandardError)
  Raised when GoodJob is not configured as the Active Job Queue Adapter.
- DiscardJobError = Class.new(StandardError)
  Attached to a Job’s Execution when the Job is discarded.
- ActiveJobDeserializationError = Class.new(StandardError)
  Raised when Active Job data cannot be deserialized.
Constants included from LockTypeAccessor
Constants included from ErrorEvents
ErrorEvents::DISCARDED, ErrorEvents::HANDLED, ErrorEvents::INTERRUPTED, ErrorEvents::RETRIED, ErrorEvents::RETRY_STOPPED, ErrorEvents::UNHANDLED
Constants included from AdvisoryLockable
AdvisoryLockable::ADVISORY_LOCK_COUNTS, AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary
-
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID.
- .build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
-
.creation_ordered ⇒ ActiveRecord::Relation
Order jobs by created_at, for first-in first-out.
-
.defer_after_commit_maybe(good_job_or_active_job_classes) ⇒ Object
When code needs to optionally handle enqueue_after_transaction_commit.
-
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing.
-
.effective_lock_strategy(strategy = GoodJob.configuration.lock_strategy) ⇒ Object
Returns the effective lock strategy, falling back to :advisory if the lock_type column hasn’t been migrated yet.
-
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false, lock_id: nil, lock_type: nil) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
- .enqueue_after_commit?(good_job_or_active_job_classes) ⇒ Boolean
-
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
-
.exclude_paused ⇒ ActiveRecord::Relation
Exclude jobs that are paused via queue_name or job_class.
-
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
- .format_error(error) ⇒ Object
- .historic_finished_at_index_migrated? ⇒ Boolean
-
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name.
- .json_string(json, attr) ⇒ Object
-
.lock_type_column_exists? ⇒ Boolean
Returns true if the lock_type column exists on the good_jobs table.
- .lock_type_migrated? ⇒ Boolean
-
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
-
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
- .params_execution_count ⇒ Object
- .params_job_class ⇒ Object
-
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquires an advisory lock related to it, and executes the job.
-
.perform_with_hybrid_lock(lock_id:, parsed_queues: nil) {|Job, nil| ... } ⇒ ExecutionResult?
Claims and performs a job using SELECT FOR UPDATE SKIP LOCKED combined with a session-level advisory lock for pg_locks visibility.
-
.perform_with_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil, lock_strategy: :advisory) {|Job, nil| ... } ⇒ ExecutionResult?
Dispatches to the appropriate perform method based on lock_strategy.
-
.perform_with_skip_locked(lock_id:, parsed_queues: nil) {|Job, nil| ... } ⇒ ExecutionResult?
Claims and performs a job using SELECT FOR UPDATE SKIP LOCKED.
-
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
-
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param.
-
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
-
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
- .reset_column_information ⇒ Object
-
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
-
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
Instance Method Summary
-
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
-
#destroy_job ⇒ void
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
-
#discard_job(message) ⇒ void
Discard a job so that it will not be executed further.
-
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
-
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
-
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
-
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard.
-
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
- #executions_count ⇒ Object
-
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
-
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further.
- #job_state ⇒ Object
- #number ⇒ Object
-
#perform(lock_id:, already_claimed: false) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
-
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running.
-
#recent_error ⇒ String
The most recent error message.
-
#reschedule_job(scheduled_at = Time.current) ⇒ void
Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.
-
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded.
-
#running? ⇒ Boolean
Tests whether the job is being executed right now.
-
#runtime_latency ⇒ Object
Time between when this job started and finished.
-
#succeeded? ⇒ Boolean
Tests whether the job has finished without error.
Methods included from LockTypeAccessor
included, #lock_type, #lock_type=
Methods included from Reportable
Methods included from AdvisoryLockable
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, hash_function, hash_function=, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, lease_connection, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID
# File 'app/models/good_job/job.rb', line 110
scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) }
.build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
# File 'app/models/good_job/job.rb', line 325
def self.build_for_enqueue(active_job, scheduled_at: nil)
  new(**enqueue_args(active_job, scheduled_at: scheduled_at))
end
.creation_ordered ⇒ ActiveRecord::Relation
Order jobs by created_at, for first-in first-out
# File 'app/models/good_job/job.rb', line 162
scope :creation_ordered, -> { order(created_at: :asc) }
.defer_after_commit_maybe(good_job_or_active_job_classes) ⇒ Object
When code needs to optionally handle enqueue_after_transaction_commit
# File 'app/models/good_job/job.rb', line 551
def self.defer_after_commit_maybe(good_job_or_active_job_classes)
  if enqueue_after_commit?(good_job_or_active_job_classes)
    ActiveRecord.after_all_transactions_commit { yield(true) }
  else
    yield(false)
  end
end
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing
# File 'app/models/good_job/job.rb', line 171
scope :dequeueing_ordered, (lambda do |parsed_queues|
  relation = self
  relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include]
  relation = if GoodJob.configuration.dequeue_query_sort == :scheduled_at
               relation.priority_ordered.schedule_ordered.order(:id)
             else
               relation.priority_ordered.creation_ordered
             end

  relation
end)
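To make the default ordering concrete, here is a minimal in-memory sketch of "priority ascending with NULLs last, then oldest first". It only covers the default branch (not the :scheduled_at sort or the queue ordering), and `QueuedJob` and `dequeue_order` are hypothetical names used for illustration; GoodJob performs this ordering in SQL.

```ruby
# Hypothetical stand-in for a job row; created_at is an integer for brevity.
QueuedJob = Struct.new(:id, :priority, :created_at)

# Sort by priority ascending with nil priorities last, then by creation order.
def dequeue_order(jobs)
  jobs.sort_by do |job|
    [job.priority.nil? ? 1 : 0, job.priority || 0, job.created_at]
  end
end

jobs = [
  QueuedJob.new('a', nil, 1),
  QueuedJob.new('b', 5, 2),
  QueuedJob.new('c', 0, 3),
  QueuedJob.new('d', 0, 1),
]

puts dequeue_order(jobs).map(&:id).join(',') # prints "d,c,b,a"
```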
.effective_lock_strategy(strategy = GoodJob.configuration.lock_strategy) ⇒ Object
Returns the effective lock strategy, falling back to :advisory if the lock_type column hasn’t been migrated yet.
# File 'app/models/good_job/job.rb', line 318
def effective_lock_strategy(strategy = GoodJob.configuration.lock_strategy)
  return :advisory unless lock_type_column_exists?

  strategy
end
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false, lock_id: nil, lock_type: nil) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
# File 'app/models/good_job/job.rb', line 496
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false, lock_id: nil, lock_type: nil)
  ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
    current_job = CurrentThread.job

    retried = current_job && current_job.active_job_id == active_job.job_id
    if retried
      job = current_job
      job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at))
      job.scheduled_at ||= Time.current
      # TODO: these values ideally shouldn't be persisted until the current_job is finished
      #   which will require handling `retry_job` being called from outside the job context.
      job.performed_at = nil
      job.finished_at = nil
    else
      job = build_for_enqueue(active_job, scheduled_at: scheduled_at)
    end

    if create_with_advisory_lock
      if job.persisted?
        job.advisory_lock
      else
        job.create_with_advisory_lock = true
      end
    end

    job.assign_attributes(locked_by_id: lock_id, locked_at: Time.current, lock_type: lock_type) if lock_id

    instrument_payload[:job] = job
    begin
      job.save!
    rescue ActiveRecord::RecordNotUnique
      raise unless job.cron_key

      # Active Job doesn't have a clean way to cancel an enqueue for unexceptional reasons
      # This is a workaround to mark it as having been halted in before_enqueue
      active_job.send(:halted_callback_hook, "duplicate_cron_key", "good_job")
      return false
    end

    CurrentThread.retried_job = job if retried

    active_job.provider_job_id = job.id
    raise "These should be equal" if active_job.provider_job_id != active_job.job_id

    job
  end
end
.enqueue_after_commit?(good_job_or_active_job_classes) ⇒ Boolean
# File 'app/models/good_job/job.rb', line 559
def self.enqueue_after_commit?(good_job_or_active_job_classes)
  good_job_or_active_job_classes = Array(good_job_or_active_job_classes)

  feature_exists = ActiveRecord.respond_to?(:after_all_transactions_commit)
  feature_exists && good_job_or_active_job_classes.any? do |klass|
    active_job_class = case klass
                       when String
                         klass.constantize
                       when Job
                         klass.job_class.constantize
                       else
                         klass
                       end
    active_job_class.respond_to?(:enqueue_after_transaction_commit)
  end
end
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
# File 'app/models/good_job/job.rb', line 330
def self.enqueue_args(active_job, scheduled_at: nil)
  execution_args = {
    id: active_job.job_id,
    active_job_id: active_job.job_id,
    job_class: active_job.class.name,
    queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
    priority: active_job.priority || DEFAULT_PRIORITY,
    serialized_params: active_job.serialize,
    created_at: Time.current,
  }

  execution_args[:scheduled_at] = if scheduled_at
                                    scheduled_at
                                  elsif active_job.scheduled_at
                                    Time.zone.at(active_job.scheduled_at)
                                  else
                                    execution_args[:created_at]
                                  end

  execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

  if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any?
    labels = active_job.good_job_labels.dup
    labels.map! { |label| label.to_s.strip.presence }
    labels.tap(&:compact!).tap(&:uniq!)
    execution_args[:labels] = labels
  end

  reenqueued_current_job = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
  current_job = CurrentThread.job

  if reenqueued_current_job
    execution_args[:batch_id] = current_job.batch_id
    execution_args[:batch_callback_id] = current_job.batch_callback_id
    execution_args[:cron_key] = current_job.cron_key
  else
    execution_args[:batch_id] = GoodJob::Batch.current_batch_id
    execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id
    execution_args[:cron_key] = CurrentThread.cron_key
    execution_args[:cron_at] = CurrentThread.cron_at
  end

  execution_args
end
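Two rules in the method above are easy to miss: the precedence used to pick scheduled_at, and the clean-up applied to labels. The sketch below restates both in plain Ruby without ActiveSupport. `resolve_scheduled_at` and `normalize_labels` are hypothetical helper names, and `Time.at` stands in for `Time.zone.at`.

```ruby
# Precedence: explicit argument, then the job's own scheduled_at (an epoch
# value here), then the creation time.
def resolve_scheduled_at(explicit:, job_scheduled_at:, created_at:)
  if explicit
    explicit
  elsif job_scheduled_at
    Time.at(job_scheduled_at)
  else
    created_at
  end
end

# Stringify and strip each label, then drop blanks and duplicates.
def normalize_labels(labels)
  labels.map { |label| label.to_s.strip }
        .reject(&:empty?)
        .uniq
end

p normalize_labels([' billing ', :billing, '', nil, 'email'])
# prints ["billing", "email"]
```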
.exclude_paused ⇒ ActiveRecord::Relation
Exclude jobs that are paused via queue_name or job_class. Only applies when enable_pauses configuration is true.
# File 'app/models/good_job/job.rb', line 130
scope :exclude_paused, lambda {
  return all unless GoodJob.configuration.enable_pauses

  paused_query = GoodJob::Setting.where(key: GoodJob::Setting::PAUSES)
  paused_queues_query = paused_query.select("jsonb_array_elements_text(value->'queues')")
  paused_job_classes_query = paused_query.select("jsonb_array_elements_text(value->'job_classes')")
  paused_labels_query = paused_query.select("jsonb_array_elements_text(value->'labels')")

  where.not(queue_name: paused_queues_query)
    .where.not(job_class: paused_job_classes_query)
    .where(
      Arel::Nodes::Not.new(
        Arel::Nodes::NamedFunction.new(
          "COALESCE", [
            Arel::Nodes::InfixOperation.new('&&', arel_table['labels'], Arel::Nodes::NamedFunction.new('ARRAY', [paused_labels_query.arel])),
            Arel::Nodes::SqlLiteral.new('FALSE'),
          ]
        )
      )
    )
}
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
# File 'app/models/good_job/job.rb', line 80
scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) }
.format_error(error) ⇒ Object
# File 'app/models/good_job/job.rb', line 544
def self.format_error(error)
  raise ArgumentError unless error.is_a?(Exception)

  [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
end
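As a rough illustration of the stored error format, the sketch below formats an exception as "ClassName: message" and splits such a string back into its two parts. It is a stand-alone approximation, not GoodJob's code: `parse_error` is a hypothetical helper, and passing a limit of 2 to `split` is a choice made for this sketch so that a message containing ": " stays intact.

```ruby
ERROR_MESSAGE_SEPARATOR = ': '

# Join the exception class name and message with the separator.
def format_error(error)
  raise ArgumentError unless error.is_a?(Exception)

  [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
end

# Hypothetical inverse: returns [class_name, message].
def parse_error(string)
  string.split(ERROR_MESSAGE_SEPARATOR, 2)
end

stored = format_error(ArgumentError.new('bad input: nil'))
puts stored                  # prints "ArgumentError: bad input: nil"
p parse_error(stored)        # prints ["ArgumentError", "bad input: nil"]
```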
.historic_finished_at_index_migrated? ⇒ Boolean
# File 'app/models/good_job/job.rb', line 288
def historic_finished_at_index_migrated?
  return true if connection.index_name_exists?(:good_jobs, "index_good_jobs_on_queue_name_priority_scheduled_at_unfinished")

  migration_pending_warning!
  false
end
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name
# File 'app/models/good_job/job.rb', line 102
scope :job_class, ->(name) { where(params_job_class.eq(name)) }
.json_string(json, attr) ⇒ Object
# File 'app/models/good_job/job.rb', line 272
def json_string(json, attr)
  Arel::Nodes::Grouping.new(Arel::Nodes::InfixOperation.new('->>', json, Arel::Nodes.build_quoted(attr)))
end
.lock_type_column_exists? ⇒ Boolean
Returns true if the lock_type column exists on the good_jobs table. Used to guard lock_type writes for zero-downtime deployment safety: if the migration hasn’t run yet, all strategies fall back to :advisory.
# File 'app/models/good_job/job.rb', line 305
def lock_type_column_exists?
  return @_lock_type_column_exists if defined?(@_lock_type_column_exists)

  @_lock_type_column_exists = connection_pool.with_connection { |conn| conn.column_exists?(:good_jobs, :lock_type) }
end
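The `defined?` guard matters because the cached answer may be false, and a plain `||=` would re-run the lookup every time in that case. The sketch below shows the same memoization pattern in isolation, together with a reset in the spirit of reset_column_information. `ColumnCheck` and its probe block are hypothetical stand-ins for the database lookup.

```ruby
# Hypothetical wrapper around an expensive boolean lookup.
class ColumnCheck
  attr_reader :lookups

  def initialize(&probe)
    @probe = probe
    @lookups = 0
  end

  # Memoize with defined? so that a false result is cached as well.
  def exists?
    return @exists if defined?(@exists)

    @lookups += 1
    @exists = @probe.call
  end

  # Forget the cached answer so the next call probes again.
  def reset
    remove_instance_variable(:@exists) if instance_variable_defined?(:@exists)
  end
end

check = ColumnCheck.new { false }
check.exists?
check.exists?
puts check.lookups # prints 1
```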
.lock_type_migrated? ⇒ Boolean
# File 'app/models/good_job/job.rb', line 295
def lock_type_migrated?
  return true if connection.index_name_exists?(:good_jobs, :index_good_jobs_for_candidate_dequeue_unlocked)

  migration_pending_warning!
  false
end
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
# File 'app/models/good_job/job.rb', line 466
def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
  query = unfinished.where(locked_by_id: nil).schedule_ordered

  after ||= Time.current
  after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime)
  after_query = query.where(arel_table['scheduled_at'].gt(after_bind))
  after_at = after_query.limit(limit).pluck(:scheduled_at)

  if now_limit&.positive?
    now_bind = bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime)
    now_query = query.where(arel_table['scheduled_at'].lt(now_bind))
    now_at = now_query.limit(now_limit).pluck(:scheduled_at)
  end

  Array(now_at) + after_at
end
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
# File 'app/models/good_job/job.rb', line 123
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))) }
.params_execution_count ⇒ Object
# File 'app/models/good_job/job.rb', line 280
def params_execution_count
  Arel::Nodes::InfixOperation.new(
    '::',
    json_string(arel_table['serialized_params'], 'executions'),
    Arel.sql('integer')
  )
end
.params_job_class ⇒ Object
# File 'app/models/good_job/job.rb', line 276
def params_job_class
  arel_table[:job_class]
end
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquires an advisory lock related to it, and executes the job.
# File 'app/models/good_job/job.rb', line 383
def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil)
  unfinished.dequeueing_ordered(parsed_queues)
            .only_scheduled
            .exclude_paused
            .limit(1)
            .with_advisory_lock_claim(select_limit: queue_select_limit) do |job|
    if job&.executable?
      yield(job) if block_given?
      job.perform(lock_id: lock_id)
    else
      yield(nil) if block_given?
      nil
    end
  end
end
.perform_with_hybrid_lock(lock_id:, parsed_queues: nil) {|Job, nil| ... } ⇒ ExecutionResult?
Claims and performs a job using SELECT FOR UPDATE SKIP LOCKED combined with a session-level advisory lock for pg_locks visibility.
# File 'app/models/good_job/job.rb', line 444
def self.perform_with_hybrid_lock(lock_id:, parsed_queues: nil)
  unfinished.where(locked_by_id: nil)
            .dequeueing_ordered(parsed_queues)
            .only_scheduled
            .exclude_paused
            .limit(1)
            .with_hybrid_lock_claim(locked_by_id: lock_id, locked_at: Time.current, lock_type: :hybrid) do |job|
    if job
      yield(job) if block_given?
      job.perform(lock_id: lock_id, already_claimed: true)
    else
      yield(nil) if block_given?
      nil
    end
  end
end
.perform_with_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil, lock_strategy: :advisory) {|Job, nil| ... } ⇒ ExecutionResult?
Dispatches to the appropriate perform method based on lock_strategy.
# File 'app/models/good_job/job.rb', line 406
def self.perform_with_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil, lock_strategy: :advisory, &block)
  lock_strategy = effective_lock_strategy(lock_strategy)
  case lock_strategy
  when :advisory then perform_with_advisory_lock(lock_id: lock_id, parsed_queues: parsed_queues, queue_select_limit: queue_select_limit, &block)
  when :skiplocked then perform_with_skip_locked(lock_id: lock_id, parsed_queues: parsed_queues, &block)
  when :hybrid then perform_with_hybrid_lock(lock_id: lock_id, parsed_queues: parsed_queues, &block)
  end
end
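The dispatch above combines two decisions: the fallback to :advisory when the lock_type column is missing, and the choice of perform method. The following stand-alone sketch models just that decision and returns the name of the method that would be called. `dispatch_target` and the `column_exists:` keyword are hypothetical; the real methods consult the database and configuration.

```ruby
# Fall back to :advisory until the lock_type column is available.
def effective_lock_strategy(strategy, column_exists:)
  return :advisory unless column_exists

  strategy
end

# Return the name of the perform method the strategy maps to.
# As in the case statement above, an unrecognized strategy yields nil.
def dispatch_target(strategy, column_exists:)
  case effective_lock_strategy(strategy, column_exists: column_exists)
  when :advisory   then :perform_with_advisory_lock
  when :skiplocked then :perform_with_skip_locked
  when :hybrid     then :perform_with_hybrid_lock
  end
end

p dispatch_target(:hybrid, column_exists: false) # prints :perform_with_advisory_lock
p dispatch_target(:hybrid, column_exists: true)  # prints :perform_with_hybrid_lock
```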
.perform_with_skip_locked(lock_id:, parsed_queues: nil) {|Job, nil| ... } ⇒ ExecutionResult?
Claims and performs a job using SELECT FOR UPDATE SKIP LOCKED. No session-level advisory locks are acquired.
# File 'app/models/good_job/job.rb', line 421
def self.perform_with_skip_locked(lock_id:, parsed_queues: nil)
  unfinished.where(locked_by_id: nil)
            .dequeueing_ordered(parsed_queues)
            .only_scheduled
            .exclude_paused
            .limit(1)
            .with_skip_locked_claim(locked_by_id: lock_id, locked_at: Time.current, lock_type: :skiplocked) do |job|
    if job
      yield(job) if block_given?
      job.perform(lock_id: lock_id, already_claimed: true)
    else
      yield(nil) if block_given?
      nil
    end
  end
end
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
# File 'app/models/good_job/job.rb', line 156
scope :priority_ordered, -> { order('priority ASC NULLS LAST') }
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param
# File 'app/models/good_job/job.rb', line 189
scope :queue_ordered, (lambda do |queues|
  clauses = queues.map.with_index do |queue_name, index|
    sanitize_sql_array(["WHEN queue_name = ? THEN ?", queue_name, index])
  end
  order(Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.size} END)"))
end)
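To show what the generated ORDER BY expression looks like, the sketch below builds the same CASE string in plain Ruby. `quote_literal` is a naive, hypothetical quoting helper used only so the example runs without Rails; the real scope relies on sanitize_sql_array, and this sketch should not be used to build SQL from untrusted input.

```ruby
# Naive single-quote escaping, for illustration only.
def quote_literal(value)
  "'#{value.to_s.gsub("'", "''")}'"
end

# Map each queue name to its position; unknown queues sort after all listed ones.
def queue_order_sql(queues)
  clauses = queues.each_with_index.map do |queue_name, index|
    "WHEN queue_name = #{quote_literal(queue_name)} THEN #{index}"
  end
  "(CASE #{clauses.join(' ')} ELSE #{queues.size} END)"
end

puts queue_order_sql(%w[critical default])
# prints (CASE WHEN queue_name = 'critical' THEN 0 WHEN queue_name = 'default' THEN 1 ELSE 2 END)
```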
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
# File 'app/models/good_job/job.rb', line 244
def queue_parser(string)
  string = string.strip.presence || '*'

  case string.first
  when '-'
    exclude_queues = true
    string = string[1..]
  when '+'
    ordered_queues = true
    string = string[1..]
  end

  queues = string.split(',').map(&:strip)

  if queues.include?('*')
    { all: true }
  elsif exclude_queues
    { exclude: queues }
  elsif ordered_queues
    {
      include: queues,
      ordered_queues: true,
    }
  else
    { include: queues }
  end
end
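The parsing rules above can be tried out without Rails using the following sketch, which replaces the ActiveSupport helpers (`presence`, `String#first`) with plain Ruby. `parse_queue_string` is a hypothetical name chosen for this example and is not part of GoodJob.

```ruby
# Stand-alone approximation of the queue-string parsing rules.
def parse_queue_string(string)
  string = string.strip
  string = '*' if string.empty?

  exclude = ordered = false
  case string[0]
  when '-'
    exclude = true
    string = string[1..]
  when '+'
    ordered = true
    string = string[1..]
  end

  queues = string.split(',').map(&:strip)

  if queues.include?('*')
    { all: true }
  elsif exclude
    { exclude: queues }
  elsif ordered
    { include: queues, ordered_queues: true }
  else
    { include: queues }
  end
end

p parse_queue_string('mailers, default') == { include: %w[mailers default] }        # prints true
p parse_queue_string('-mailers') == { exclude: %w[mailers] }                        # prints true
p parse_queue_string('+critical,default')[:ordered_queues]                          # prints true
```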
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
# File 'app/models/good_job/job.rb', line 212
scope :queue_string, (lambda do |string|
  parsed = queue_parser(string)

  if parsed[:all]
    all
  elsif parsed[:exclude]
    where.not(queue_name: parsed[:exclude]).or where(queue_name: nil)
  elsif parsed[:include]
    where(queue_name: parsed[:include])
  end
end)
.reset_column_information ⇒ Object
# File 'app/models/good_job/job.rb', line 311
def reset_column_information
  remove_instance_variable(:@_lock_type_column_exists) if instance_variable_defined?(:@_lock_type_column_exists)
  super
end
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
# File 'app/models/good_job/job.rb', line 200
scope :schedule_ordered, -> { order(scheduled_at: :asc) }
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
# File 'app/models/good_job/job.rb', line 116
scope :unfinished, -> { where(finished_at: nil) }
Instance Method Details
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
# File 'app/models/good_job/job.rb', line 742
def active_job(ignore_deserialization_errors: false)
  ActiveJob::Base.deserialize(active_job_data).tap do |aj|
    aj.send(:deserialize_arguments_if_needed)
  rescue ActiveJob::DeserializationError
    raise unless ignore_deserialization_errors
  end
rescue NameError
  raise unless ignore_deserialization_errors
end
#destroy_job ⇒ void
This method returns an undefined value.
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
# File 'app/models/good_job/job.rb', line 729
def destroy_job
  with_advisory_lock do
    raise ActionForStateMismatchError if finished_at.blank?

    destroy
  end
end
#discard_job(message) ⇒ void
This method returns an undefined value.
Discard a job so that it will not be executed further. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
# File 'app/models/good_job/job.rb', line 702
def discard_job(message)
  with_advisory_lock do
    _discard_job(message)
  end
end
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
# File 'app/models/good_job/job.rb', line 642
def discarded?
  finished? && error.present?
end
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
# File 'app/models/good_job/job.rb', line 597
def display_error
  return error if error.present?

  serialized_params.fetch('exception_executions', {}).map do |exception, count|
    "#{exception}: #{count}"
  end.join(', ')
end
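When no error string is stored, the method above falls back to summarizing the per-exception retry counts. The sketch below reproduces that fallback as a plain function. The function signature and the sample keys in the hash are assumptions made for illustration, not data taken from GoodJob.

```ruby
# Prefer the stored error; otherwise summarize exception retry counts.
def display_error(error, serialized_params)
  return error unless error.nil? || error.empty?

  serialized_params.fetch('exception_executions', {}).map do |exception, count|
    "#{exception}: #{count}"
  end.join(', ')
end

params = { 'exception_executions' => { '[Timeout::Error]' => 2, '[IOError]' => 1 } }
puts display_error(nil, params)              # prints "[Timeout::Error]: 2, [IOError]: 1"
puts display_error('RuntimeError: boom', {}) # prints "RuntimeError: boom"
```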
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
# File 'app/models/good_job/job.rb', line 615
def display_name
  job_class
end
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard
# File 'app/models/good_job/job.rb', line 607
def display_serialized_params
  serialized_params.merge({
                            _good_job: attributes.except('serialized_params', 'locktype', 'owns_advisory_lock'),
                          })
end
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
# File 'app/models/good_job/job.rb', line 907
def executable?
  reload.finished_at.blank?
rescue ActiveRecord::RecordNotFound
  false
end
#executions_count ⇒ Object
# File 'app/models/good_job/job.rb', line 619
def executions_count
  super || 0
end
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
# File 'app/models/good_job/job.rb', line 636
def finished?
  finished_at.present?
end
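The state predicates are all derived from two attributes, finished_at and error. A minimal sketch of that relationship follows, using a plain Struct in place of the Active Record model. `JobState` is a hypothetical name, and the `succeeded?` body is an assumption based on its documented meaning ("finished without error"), since its source is not shown on this page.

```ruby
# Hypothetical value object carrying only the two relevant attributes.
JobState = Struct.new(:finished_at, :error) do
  def finished?
    !finished_at.nil?
  end

  def discarded?
    finished? && !error.to_s.empty?
  end

  # Assumption: finished and not discarded.
  def succeeded?
    finished? && !discarded?
  end
end

p JobState.new(Time.now, 'RuntimeError: boom').discarded? # prints true
p JobState.new(Time.now, nil).succeeded?                  # prints true
p JobState.new(nil, nil).finished?                        # prints false
```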
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further. Force discard allows discarding a running job. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
# File 'app/models/good_job/job.rb', line 711
def force_discard_job(message)
  _discard_job(message)
end
#job_state ⇒ Object
# File 'app/models/good_job/job.rb', line 931
def job_state
  state = { queue_name: queue_name }
  state[:scheduled_at] = scheduled_at if scheduled_at
  state
end
#number ⇒ Object
# File 'app/models/good_job/job.rb', line 913
def number
  serialized_params.fetch('executions', 0) + 1
end
#perform(lock_id:, already_claimed: false) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
# File 'app/models/good_job/job.rb', line 759
def perform(lock_id:, already_claimed: false)
  run_callbacks(:perform) do
    raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

    job_performed_at = Time.current
    monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

    execution = nil
    result = GoodJob::CurrentThread.within do |current_thread|
      current_thread.reset
      current_thread.job = self

      existing_performed_at = performed_at
      if existing_performed_at
        current_thread.execution_interrupted = existing_performed_at

        interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'"))
        self.error = interrupt_error_string
        self.error_event = ErrorEvents::INTERRUPTED
        monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds

        execution_attrs = {
          error: interrupt_error_string,
          finished_at: job_performed_at,
          error_event: error_event,
          duration: monotonic_duration,
        }
        executions.where(finished_at: nil).where.not(performed_at: nil).update_all(execution_attrs) # rubocop:disable Rails/SkipsModelValidations
      end

      transaction do
        execution_attrs = {
          job_class: job_class,
          queue_name: queue_name,
          serialized_params: serialized_params,
          scheduled_at: scheduled_at || created_at,
          created_at: job_performed_at,
          process_id: lock_id,
        }
        job_attrs = {
          performed_at: job_performed_at,
          executions_count: ((executions_count || 0) + 1),
        }
        unless already_claimed
          job_attrs[:locked_by_id] = lock_id
          job_attrs[:locked_at] = Time.current
          job_attrs[:lock_type] = :advisory if self.class.lock_type_column_exists?
        end

        execution = executions.create!(execution_attrs)
        update!(job_attrs)
      end

      ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
        value = ActiveJob::Base.execute(active_job_data)
        active_job = current_thread.active_job

        if value.is_a?(Exception)
          handled_error = value
          value = nil
        end
        handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard

        error_event = if !handled_error
                        nil
                      elsif handled_error == current_thread.error_on_discard
                        ErrorEvents::DISCARDED
                      elsif handled_error == current_thread.error_on_retry
                        ErrorEvents::RETRIED
                      elsif handled_error == current_thread.error_on_retry_stopped
                        ErrorEvents::RETRY_STOPPED
                      elsif handled_error
                        ErrorEvents::HANDLED
                      end

        instrument_payload.merge!(
          value: value,
          error: handled_error,
          handled_error: handled_error,
          retried: current_thread.retried_job.present?,
          error_event: error_event
        )
        ExecutionResult.new(value: value, active_job: active_job, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job)
      rescue *GoodJob.handled_exception_classes => e
        active_job ||= current_thread.active_job

        error_event = if e.is_a?(GoodJob::InterruptError)
                        ErrorEvents::INTERRUPTED
                      elsif e == current_thread.error_on_retry_stopped
                        ErrorEvents::RETRY_STOPPED
                      else
                        ErrorEvents::UNHANDLED
                      end

        instrument_payload.merge!(
          error: e,
          unhandled_error: e,
          error_event: error_event
        )
        ExecutionResult.new(value: nil, active_job: active_job, unhandled_error: e, error_event: error_event)
      end
    end

    job_attributes = { locked_by_id: nil, locked_at: nil }
    job_attributes[:lock_type] = nil if self.class.lock_type_column_exists?

    job_error = result.handled_error || result.unhandled_error
    if job_error
      error_string = self.class.format_error(job_error)

      job_attributes[:error] = error_string
      job_attributes[:error_event] = result.error_event

      execution.error = error_string
      execution.error_event = result.error_event
      execution.error_backtrace = job_error.backtrace
    else
      job_attributes[:error] = nil
      job_attributes[:error_event] = nil
    end

    job_finished_at = Time.current
    monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds
    job_attributes[:finished_at] = job_finished_at
    execution.finished_at = job_finished_at
    execution.duration = monotonic_duration

    retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error
    reenqueued = result.retried? || retry_unhandled_error
    if reenqueued
      job_attributes[:performed_at] = nil
      job_attributes[:finished_at] = nil
    end

    assign_attributes(job_attributes)

    if finished_at.blank? || cron_key.present? || preserve_job_record?(result)
      transaction do
        execution.save!
        save!
      end
    else
      destroy!
    end

    result
  end
end
#queue_latency ⇒ Object
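Time, in seconds, between when this job was expected to run (scheduled_at, falling back to created_at) and when it started running. Returns nil if the expected start time has not been reached yet.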
# File 'app/models/good_job/job.rb', line 918

def queue_latency
  now = Time.zone.now
  expected_start = scheduled_at || created_at
  actual_start = performed_at || finished_at || now

  actual_start - expected_start unless expected_start >= now
end
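To make the fallbacks concrete, here is a small sketch of the same arithmetic as a standalone function over plain `Time` values. The keyword-argument signature is an assumption made for illustration; the real method reads these values from the record's attributes and uses `Time.zone.now`.

```ruby
# Standalone re-statement of the queue latency arithmetic.
# The keyword arguments are illustrative; GoodJob reads them from the record.
def queue_latency(created_at:, now:, scheduled_at: nil, performed_at: nil, finished_at: nil)
  expected_start = scheduled_at || created_at
  actual_start = performed_at || finished_at || now

  # nil when the job is not yet due
  actual_start - expected_start unless expected_start >= now
end

now = Time.utc(2024, 1, 1, 12, 0, 0)

# Created 30 seconds ago, picked up 10 seconds ago: waited 20 seconds.
queue_latency(created_at: now - 30, performed_at: now - 10, now: now)

# Still waiting in the queue: latency grows with `now`.
queue_latency(created_at: now - 30, now: now)

# Scheduled for the future: no latency to report yet.
queue_latency(created_at: now - 30, scheduled_at: now + 60, now: now)
```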
#recent_error ⇒ String
The most recent error message. If the job has been retried, the error will be fetched from the previous Execution record. Deprecated: this method will be removed in the next major release; use #error instead.
# File 'app/models/good_job/job.rb', line 586

def recent_error
  GoodJob.deprecator.warn(<<~DEPRECATION)
    The `GoodJob::Job#recent_error` method is deprecated and will be removed in the next major release.

    Replace usage of GoodJob::Job#recent_error with `GoodJob::Job#error`.
  DEPRECATION
  error
end
#reschedule_job(scheduled_at = Time.current) ⇒ void
This method returns an undefined value.
Reschedule an unfinished job so that the next available execution thread runs it immediately (the default) or at the given later time. Raises ActionForStateMismatchError if the job has already finished.
# File 'app/models/good_job/job.rb', line 718

def reschedule_job(scheduled_at = Time.current)
  with_advisory_lock do
    reload
    raise ActionForStateMismatchError if finished_at.present?

    update(scheduled_at: scheduled_at)
  end
end
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded. This action will create a new Execution record for the job.
# File 'app/models/good_job/job.rb', line 655

def retry_job
  Rails.application.executor.wrap do
    with_advisory_lock do
      reload
      active_job = self.active_job(ignore_deserialization_errors: true)

      raise ActiveJobDeserializationError if active_job.nil?
      raise AdapterNotGoodJobError unless active_job.class.queue_adapter.is_a? GoodJob::Adapter
      raise ActionForStateMismatchError if finished_at.blank? || error.blank?

      # Update the executions count because the previous execution will not have been preserved
      # Do not update `exception_executions` because that comes from rescue_from's arguments
      active_job.executions = (active_job.executions || 0) + 1

      begin
        error_class, = error.split(ERROR_MESSAGE_SEPARATOR).map(&:strip)
        error = error_class.constantize.new()
      rescue StandardError
        error = StandardError.new(error)
      end

      new_active_job = nil
      transaction do
        Job.defer_after_commit_maybe(active_job.class) do
          GoodJob::CurrentThread.within do |current_thread|
            current_thread.job = self
            current_thread.retry_now = true

            # NOTE: I18n.with_locale necessary until fixed in rails https://github.com/rails/rails/pull/52121
            I18n.with_locale(active_job.locale) do
              new_active_job = active_job.retry_job(wait: 0, error: error)
            end
          end
        end

        self.error_event = :retried if error
        save!
      end

      new_active_job
    end
  end
end
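Before retrying, the method turns the stored error string (error class, then ERROR_MESSAGE_SEPARATOR, then the message) back into an exception object. A rough sketch of that step in plain Ruby follows. It assumes `Object.const_get` as a stand-in for ActiveSupport's `constantize`, and the `rebuild_error` helper name and its choice to keep the full stored string as the fallback message are illustrative, not taken from GoodJob.

```ruby
# Hypothetical helper: rebuild an exception from a stored "Class: message" string.
# Object.const_get stands in for ActiveSupport's String#constantize.
def rebuild_error(error_string, separator = ": ")
  # Only the first segment (the class name) is used; the message is dropped.
  error_class_name, = error_string.split(separator).map(&:strip)
  Object.const_get(error_class_name).new
rescue StandardError
  # Unknown or malformed class name: fall back to a generic error that
  # carries the stored string (a choice made for this sketch).
  StandardError.new(error_string)
end

rebuild_error("ArgumentError: wrong number of arguments") # an ArgumentError instance
rebuild_error("NoSuchErrorClass: something failed")       # a generic StandardError
```

Note that the rebuilt exception does not carry the original message or backtrace; only its class is recovered from the stored string.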
#running? ⇒ Boolean
Tests whether the job is being executed right now.
# File 'app/models/good_job/job.rb', line 625

def running?
  # Avoid N+1 Query: `.includes_advisory_locks`
  if has_attribute?(:locktype)
    self['locktype'].present? || locked_by_id.present?
  else
    advisory_locked? || locked_by_id.present?
  end
end
#runtime_latency ⇒ Object
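Time, in seconds, between when this job started and when it finished, or the current time if it has not finished yet. Returns nil if the job has not started.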
# File 'app/models/good_job/job.rb', line 927

def runtime_latency
  (finished_at || Time.zone.now) - performed_at if performed_at
end
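The same expression can be tried outside Rails with plain `Time` values. In this sketch the keyword arguments, including the injectable `now`, are assumptions added for illustration; the real method reads the record's attributes and calls `Time.zone.now`.

```ruby
# Standalone re-statement of the runtime calculation.
# `now` is injectable here only to keep the example deterministic.
def runtime_latency(performed_at:, finished_at: nil, now: Time.now)
  (finished_at || now) - performed_at if performed_at
end

started = Time.utc(2024, 1, 1, 12, 0, 0)

runtime_latency(performed_at: started, finished_at: started + 2.5) # finished job
runtime_latency(performed_at: started, now: started + 4)           # still running
runtime_latency(performed_at: nil)                                 # never started
```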
#succeeded? ⇒ Boolean
Tests whether the job has finished without error
# File 'app/models/good_job/job.rb', line 648

def succeeded?
  finished? && !discarded?
end