Class: GoodJob::Job
- Inherits:
- BaseRecord
- Object
- ActiveRecord::Base
- BaseRecord
- GoodJob::Job
- Includes:
- AdvisoryLockable, ErrorEvents, Filterable, Reportable
- Defined in:
- app/models/good_job/job.rb
Overview
Active Record model that represents an ActiveJob job.
Constant Summary
- PreviouslyPerformedError = Class.new(StandardError)
Raised if something attempts to execute a previously completed Execution again.
- ERROR_MESSAGE_SEPARATOR = ": "
String separating Error Class from Error Message.
- DEFAULT_QUEUE_NAME = 'default'
ActiveJob jobs without a queue_name attribute are placed on this queue.
- DEFAULT_PRIORITY = 0
ActiveJob jobs without a priority attribute are given this priority.
- ActionForStateMismatchError = Class.new(StandardError)
Raised when an inappropriate action is applied to a Job based on its state.
- AdapterNotGoodJobError = Class.new(StandardError)
Raised when GoodJob is not configured as the Active Job Queue Adapter.
- DiscardJobError = Class.new(StandardError)
Attached to a Job’s Execution when the Job is discarded.
- ActiveJobDeserializationError = Class.new(StandardError)
Raised when Active Job data cannot be deserialized.
Constants included from AdvisoryLockable
AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary
-
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID.
- .build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
- .coalesce_scheduled_at_created_at ⇒ Object
-
.creation_ordered ⇒ ActiveRecord::Relation
Order jobs by created_at, for first-in first-out.
-
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing.
-
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
-
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
-
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
- .format_error(error) ⇒ Object
-
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name.
- .json_string(json, attr) ⇒ Object
-
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
-
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
- .params_execution_count ⇒ Object
- .params_job_class ⇒ Object
-
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquires an advisory lock related to it, and executes the job.
-
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
-
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param.
-
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
-
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
-
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
-
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
Instance Method Summary
-
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
-
#destroy_job ⇒ void
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
-
#discard_job(message) ⇒ void
Discard a job so that it will not be executed further.
-
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
-
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
-
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
-
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard.
-
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
- #executions_count ⇒ Object
-
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
-
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further.
- #job_state ⇒ Object
- #number ⇒ Object
-
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
-
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running.
-
#recent_error ⇒ String
The most recent error message.
-
#reschedule_job(scheduled_at = Time.current) ⇒ void
Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.
-
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded.
-
#running? ⇒ Boolean
Tests whether the job is being executed right now.
-
#runtime_latency ⇒ Object
Time between when this job started and finished.
-
#succeeded? ⇒ Boolean
Tests whether the job has finished without error.
Methods included from Reportable
Methods included from AdvisoryLockable
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID.
# File 'app/models/good_job/job.rb', line 83
scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) }
.build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
# File 'app/models/good_job/job.rb', line 230
def self.build_for_enqueue(active_job, scheduled_at: nil)
  new(**enqueue_args(active_job, scheduled_at: scheduled_at))
end
.coalesce_scheduled_at_created_at ⇒ Object
# File 'app/models/good_job/job.rb', line 225
def coalesce_scheduled_at_created_at
  arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at'])
end
.creation_ordered ⇒ ActiveRecord::Relation
Order jobs by created_at, for first-in first-out.
# File 'app/models/good_job/job.rb', line 108
scope :creation_ordered, -> { order(created_at: :asc) }
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing.
# File 'app/models/good_job/job.rb', line 117
scope :dequeueing_ordered, (lambda do |parsed_queues|
  relation = self
  relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include]
  relation = relation.priority_ordered.creation_ordered

  relation
end)
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
# File 'app/models/good_job/job.rb', line 339
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
  ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
    current_job = CurrentThread.job

    retried = current_job && current_job.active_job_id == active_job.job_id
    if retried
      job = current_job
      job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at))
      job.scheduled_at ||= Time.current
      # TODO: these values ideally shouldn't be persisted until the current_job is finished
      #   which will require handling `retry_job` being called from outside the job context.
      job.performed_at = nil
      job.finished_at = nil
    else
      job = build_for_enqueue(active_job, scheduled_at: scheduled_at)
    end

    if create_with_advisory_lock
      if job.persisted?
        job.advisory_lock
      else
        job.create_with_advisory_lock = true
      end
    end

    instrument_payload[:job] = job
    job.save!

    CurrentThread.retried_job = job if retried

    active_job.provider_job_id = job.id
    raise "These should be equal" if active_job.provider_job_id != active_job.job_id

    job
  end
end
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
# File 'app/models/good_job/job.rb', line 235
def self.enqueue_args(active_job, scheduled_at: nil)
  execution_args = {
    id: active_job.job_id,
    active_job_id: active_job.job_id,
    job_class: active_job.class.name,
    queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
    priority: active_job.priority || DEFAULT_PRIORITY,
    serialized_params: active_job.serialize,
    created_at: Time.current,
  }

  execution_args[:scheduled_at] = if scheduled_at
                                    scheduled_at
                                  elsif active_job.scheduled_at
                                    Time.zone.at(active_job.scheduled_at)
                                  else
                                    execution_args[:created_at]
                                  end
  execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

  if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any?
    labels = active_job.good_job_labels.dup
    labels.map! { |label| label.to_s.strip.presence }
    labels.tap(&:compact!).tap(&:uniq!)
    execution_args[:labels] = labels
  end

  reenqueued_current_job = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
  current_job = CurrentThread.job

  if reenqueued_current_job
    execution_args[:batch_id] = current_job.batch_id
    execution_args[:batch_callback_id] = current_job.batch_callback_id
    execution_args[:cron_key] = current_job.cron_key
  else
    execution_args[:batch_id] = GoodJob::Batch.current_batch_id
    execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id
    execution_args[:cron_key] = CurrentThread.cron_key
    execution_args[:cron_at] = CurrentThread.cron_at
  end

  execution_args
end
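The label clean-up step inside .enqueue_args can be tried in isolation. The following is a plain-Ruby sketch, not the library's code: normalize_labels_sketch is a hypothetical helper, and because `presence` comes from ActiveSupport, an explicit empty-string check stands in for it here (an assumption that ignores non-ASCII whitespace).

```ruby
# Hypothetical helper mirroring the label handling in .enqueue_args:
# stringify, strip, drop blanks, then de-duplicate while keeping order.
def normalize_labels_sketch(raw_labels)
  labels = raw_labels.map do |label|
    stripped = label.to_s.strip
    stripped.empty? ? nil : stripped # stands in for ActiveSupport's `presence`
  end
  labels.compact.uniq
end

p normalize_labels_sketch([" billing ", :billing, "", nil, "urgent"])
```

Symbols and padded strings collapse to the same label, which is why the de-duplication runs after stripping.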
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
# File 'app/models/good_job/job.rb', line 53
scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) }
.format_error(error) ⇒ Object
# File 'app/models/good_job/job.rb', line 376
def self.format_error(error)
  raise ArgumentError unless error.is_a?(Exception)

  [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
end
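The stored error string is the class name, the separator, and the message. A minimal standalone sketch of that format follows; ERROR_SEPARATOR_SKETCH and format_error_sketch are hypothetical names that mirror ERROR_MESSAGE_SEPARATOR and .format_error without loading GoodJob.

```ruby
# Mirrors GoodJob::Job::ERROR_MESSAGE_SEPARATOR (": ") for illustration only.
ERROR_SEPARATOR_SKETCH = ": "

# Hypothetical stand-in for .format_error: rejects non-exceptions,
# then joins class name, separator, and message into one string.
def format_error_sketch(error)
  raise ArgumentError unless error.is_a?(Exception)

  [error.class.to_s, ERROR_SEPARATOR_SKETCH, error.message].join
end

puts format_error_sketch(RuntimeError.new("boom"))
```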
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name.
# File 'app/models/good_job/job.rb', line 75
scope :job_class, ->(name) { where(params_job_class.eq(name)) }
.json_string(json, attr) ⇒ Object
# File 'app/models/good_job/job.rb', line 209
def json_string(json, attr)
  Arel::Nodes::Grouping.new(Arel::Nodes::InfixOperation.new('->>', json, Arel::Nodes.build_quoted(attr)))
end
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
# File 'app/models/good_job/job.rb', line 314
def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
  query = advisory_unlocked.unfinished.schedule_ordered

  after ||= Time.current
  after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime)
  after_query = query.where(arel_table['scheduled_at'].gt(after_bind)).or query.where(scheduled_at: nil).where(arel_table['created_at'].gt(after_bind))
  after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }

  if now_limit&.positive?
    now_query = query.where(arel_table['scheduled_at'].lt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).or query.where(scheduled_at: nil)
    now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
  end

  Array(now_at) + after_at
end
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
# File 'app/models/good_job/job.rb', line 96
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) }
.params_execution_count ⇒ Object
# File 'app/models/good_job/job.rb', line 217
def params_execution_count
  Arel::Nodes::InfixOperation.new(
    '::',
    json_string(arel_table['serialized_params'], 'executions'),
    Arel.sql('integer')
  )
end
.params_job_class ⇒ Object
# File 'app/models/good_job/job.rb', line 213
def params_job_class
  json_string(arel_table['serialized_params'], 'job_class')
end
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquires an advisory lock related to it, and executes the job.
# File 'app/models/good_job/job.rb', line 288
def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil)
  job = nil
  result = nil

  unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs|
    job = jobs.first

    if job&.executable?
      yield(job) if block_given?

      result = job.perform(lock_id: lock_id)
    else
      job = nil
      yield(nil) if block_given?
    end
  end

  job&.run_callbacks(:perform_unlocked)
  result
end
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
# File 'app/models/good_job/job.rb', line 102
scope :priority_ordered, -> { order('priority ASC NULLS LAST') }
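Because the scope sorts with `priority ASC NULLS LAST`, "highest priority first" means the smallest number comes first and jobs without a priority come last. The sketch below imitates that ordering on in-memory rows; PrioritySketchRow and priority_ordered_sketch are hypothetical names, and the index tie-breaker is an added assumption to keep the sort stable.

```ruby
# Hypothetical in-memory row; the real scope orders database records.
PrioritySketchRow = Struct.new(:name, :priority)

# Imitates ORDER BY priority ASC NULLS LAST:
# nil priorities sort after every number, smaller numbers sort first.
def priority_ordered_sketch(rows)
  rows.sort_by.with_index do |row, index|
    [row.priority.nil? ? 1 : 0, row.priority || 0, index]
  end
end

rows = [
  PrioritySketchRow.new("a", 5),
  PrioritySketchRow.new("b", nil),
  PrioritySketchRow.new("c", -1),
  PrioritySketchRow.new("d", 0),
]
p priority_ordered_sketch(rows).map(&:name)
```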
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param.
# File 'app/models/good_job/job.rb', line 130
scope :queue_ordered, (lambda do |queues|
  clauses = queues.map.with_index do |queue_name, index|
    sanitize_sql_array(["WHEN queue_name = ? THEN ?", queue_name, index])
  end
  order(Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.size} END)"))
end)
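The scope turns the queue list into a SQL CASE expression whose value is each queue's position, with unlisted queues sorted last. The sketch below builds the same shape of expression as a string so the result can be inspected; queue_order_case_sketch is a hypothetical helper, and its hand-rolled quoting is an illustration only, not a substitute for sanitize_sql_array.

```ruby
# Hypothetical helper: builds the CASE expression text for a queue list.
# The quote-doubling below is for illustration; real code should rely on
# the database adapter's sanitizing, as the scope above does.
def queue_order_case_sketch(queues)
  clauses = queues.map.with_index do |queue_name, index|
    quoted = "'#{queue_name.gsub("'", "''")}'"
    "WHEN queue_name = #{quoted} THEN #{index}"
  end
  "(CASE #{clauses.join(' ')} ELSE #{queues.size} END)"
end

puts queue_order_case_sketch(%w[critical default])
```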
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
# File 'app/models/good_job/job.rb', line 181
def queue_parser(string)
  string = string.strip.presence || '*'

  case string.first
  when '-'
    exclude_queues = true
    string = string[1..]
  when '+'
    ordered_queues = true
    string = string[1..]
  end

  queues = string.split(',').map(&:strip)

  if queues.include?('*')
    { all: true }
  elsif exclude_queues
    { exclude: queues }
  elsif ordered_queues
    {
      include: queues,
      ordered_queues: true,
    }
  else
    { include: queues }
  end
end
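To see which hash each queue string produces, the parser can be ported to plain Ruby. This is a sketch under assumptions: queue_parser_sketch is a hypothetical name, and `String#empty?` and `string[0]` replace the ActiveSupport helpers `presence` and `String#first`.

```ruby
# Hypothetical plain-Ruby port of .queue_parser for experimentation.
def queue_parser_sketch(string)
  string = string.strip
  string = '*' if string.empty? # blank input means "all queues"

  case string[0]
  when '-' # leading minus: exclude the listed queues
    exclude_queues = true
    string = string[1..]
  when '+' # leading plus: include the queues in the listed order
    ordered_queues = true
    string = string[1..]
  end

  queues = string.split(',').map(&:strip)

  if queues.include?('*')
    { all: true }
  elsif exclude_queues
    { exclude: queues }
  elsif ordered_queues
    { include: queues, ordered_queues: true }
  else
    { include: queues }
  end
end

p queue_parser_sketch("-emails, reports")
p queue_parser_sketch("+critical,default")
```

A wildcard anywhere in the list wins over the prefix, so a string such as "-emails,*" still selects all queues.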
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
# File 'app/models/good_job/job.rb', line 153
scope :queue_string, (lambda do |string|
  parsed = queue_parser(string)

  if parsed[:all]
    all
  elsif parsed[:exclude]
    where.not(queue_name: parsed[:exclude]).or where(queue_name: nil)
  elsif parsed[:include]
    where(queue_name: parsed[:include])
  end
end)
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
# File 'app/models/good_job/job.rb', line 141
scope :schedule_ordered, -> { order(coalesce_scheduled_at_created_at.asc) }
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
# File 'app/models/good_job/job.rb', line 89
scope :unfinished, -> { where(finished_at: nil) }
Instance Method Details
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
# File 'app/models/good_job/job.rb', line 534
def active_job(ignore_deserialization_errors: false)
  ActiveJob::Base.deserialize(active_job_data).tap do |aj|
    aj.send(:deserialize_arguments_if_needed)
  rescue ActiveJob::DeserializationError
    raise unless ignore_deserialization_errors
  end
rescue NameError
  raise unless ignore_deserialization_errors
end
#destroy_job ⇒ void
This method returns an undefined value.
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
# File 'app/models/good_job/job.rb', line 521
def destroy_job
  with_advisory_lock do
    raise ActionForStateMismatchError if finished_at.blank?

    destroy
  end
end
#discard_job(message) ⇒ void
This method returns an undefined value.
Discard a job so that it will not be executed further. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
# File 'app/models/good_job/job.rb', line 494
def discard_job(message)
  with_advisory_lock do
    _discard_job(message)
  end
end
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
# File 'app/models/good_job/job.rb', line 442
def discarded?
  finished? && error.present?
end
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
# File 'app/models/good_job/job.rb', line 397
def display_error
  return error if error.present?

  serialized_params.fetch('exception_executions', {}).map do |exception, count|
    "#{exception}: #{count}"
  end.join(', ')
end
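When no error is stored on the job, the method falls back to summarising the `exception_executions` counters from the serialized params. A standalone sketch of that fallback follows; display_error_sketch is a hypothetical helper and the keys in the sample hash are made up for illustration.

```ruby
# Hypothetical stand-in for #display_error operating on plain values.
def display_error_sketch(error, serialized_params)
  return error unless error.nil? || error.empty?

  # Fall back to "exception: count" pairs from the retry counters.
  serialized_params.fetch('exception_executions', {}).map do |exception, count|
    "#{exception}: #{count}"
  end.join(', ')
end

# Sample data with invented keys.
params = { 'exception_executions' => { 'TimeoutError' => 2, 'IOError' => 1 } }
puts display_error_sketch(nil, params)
puts display_error_sketch("RuntimeError: boom", params)
```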
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
# File 'app/models/good_job/job.rb', line 415
def display_name
  job_class
end
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard.
# File 'app/models/good_job/job.rb', line 407
def display_serialized_params
  serialized_params.merge({
    _good_job: attributes.except('serialized_params', 'locktype', 'owns_advisory_lock'),
  })
end
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
# File 'app/models/good_job/job.rb', line 686
def executable?
  reload.finished_at.blank?
rescue ActiveRecord::RecordNotFound
  false
end
#executions_count ⇒ Object
# File 'app/models/good_job/job.rb', line 419
def executions_count
  super || 0
end
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
# File 'app/models/good_job/job.rb', line 436
def finished?
  finished_at.present?
end
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further. Force discard allows discarding a running job. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
# File 'app/models/good_job/job.rb', line 503
def force_discard_job(message)
  _discard_job(message)
end
#job_state ⇒ Object
# File 'app/models/good_job/job.rb', line 710
def job_state
  state = { queue_name: queue_name }
  state[:scheduled_at] = scheduled_at if scheduled_at
  state
end
#number ⇒ Object
# File 'app/models/good_job/job.rb', line 692
def number
  serialized_params.fetch('executions', 0) + 1
end
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
# File 'app/models/good_job/job.rb', line 549
def perform(lock_id:)
  run_callbacks(:perform) do
    raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

    job_performed_at = Time.current
    monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

    execution = nil
    result = GoodJob::CurrentThread.within do |current_thread|
      current_thread.reset
      current_thread.job = self

      existing_performed_at = performed_at
      if existing_performed_at
        current_thread.execution_interrupted = existing_performed_at

        interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'"))
        self.error = interrupt_error_string
        self.error_event = :interrupted
        monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds

        execution_attrs = {
          error: interrupt_error_string,
          finished_at: job_performed_at,
          error_event: :interrupted,
          duration: monotonic_duration,
        }
        executions.where(finished_at: nil).where.not(performed_at: nil).update_all(execution_attrs) # rubocop:disable Rails/SkipsModelValidations
      end

      transaction do
        execution_attrs = {
          job_class: job_class,
          queue_name: queue_name,
          serialized_params: serialized_params,
          scheduled_at: scheduled_at || created_at,
          created_at: job_performed_at,
          process_id: lock_id,
        }
        job_attrs = {
          performed_at: job_performed_at,
          executions_count: ((executions_count || 0) + 1),
          locked_by_id: lock_id,
          locked_at: Time.current,
        }

        execution = executions.create!(execution_attrs)
        update!(job_attrs)
      end

      ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
        value = ActiveJob::Base.execute(active_job_data)

        if value.is_a?(Exception)
          handled_error = value
          value = nil
        end
        handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard

        error_event = if handled_error == current_thread.error_on_discard
                        :discarded
                      elsif handled_error == current_thread.error_on_retry
                        :retried
                      elsif handled_error == current_thread.error_on_retry_stopped
                        :retry_stopped
                      elsif handled_error
                        :handled
                      end

        instrument_payload.merge!(
          value: value,
          handled_error: handled_error,
          retried: current_thread.retried_job.present?,
          error_event: error_event
        )
        ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job)
      rescue StandardError => e
        error_event = if e.is_a?(GoodJob::InterruptError)
                        :interrupted
                      elsif e == current_thread.error_on_retry_stopped
                        :retry_stopped
                      else
                        :unhandled
                      end

        instrument_payload[:unhandled_error] = e
        ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event)
      end
    end

    job_attributes = { locked_by_id: nil, locked_at: nil }

    job_error = result.handled_error || result.unhandled_error
    if job_error
      error_string = self.class.format_error(job_error)

      job_attributes[:error] = error_string
      job_attributes[:error_event] = result.error_event

      execution.error = error_string
      execution.error_event = result.error_event
      execution.error_backtrace = job_error.backtrace
    else
      job_attributes[:error] = nil
      job_attributes[:error_event] = nil
    end

    job_finished_at = Time.current
    monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds
    job_attributes[:finished_at] = job_finished_at

    execution.finished_at = job_finished_at
    execution.duration = monotonic_duration

    retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error
    reenqueued = result.retried? || retry_unhandled_error
    if reenqueued
      job_attributes[:performed_at] = nil
      job_attributes[:finished_at] = nil
    end

    assign_attributes(job_attributes)

    preserve_unhandled = result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error)
    if finished_at.blank? || GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present?
      transaction do
        execution.save!
        save!
      end
    else
      destroy!
    end

    result
  end
end
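The last step of #perform decides whether the job record is saved or destroyed. That condition can be restated as a pure function, as in the sketch below; preserve_record_sketch? and its keyword arguments are hypothetical stand-ins for the model attributes and GoodJob settings read by the real method.

```ruby
# Hypothetical restatement of the save-or-destroy condition in #perform.
# All inputs are plain values; the real method reads them from the record,
# the ExecutionResult, and GoodJob's configuration.
def preserve_record_sketch?(finished:, preserve_job_records:, retry_on_unhandled_error:,
                            unhandled_error: false, reenqueued: false, cron: false)
  preserve_unhandled = unhandled_error &&
                       (retry_on_unhandled_error || preserve_job_records == :on_unhandled_error)

  !finished || preserve_job_records == true || reenqueued || preserve_unhandled || cron
end

# A finished, successful job with preservation disabled is destroyed.
p preserve_record_sketch?(finished: true, preserve_job_records: false, retry_on_unhandled_error: false)
# With :on_unhandled_error, only jobs that raised an unhandled error are kept.
p preserve_record_sketch?(finished: true, preserve_job_records: :on_unhandled_error,
                          retry_on_unhandled_error: false, unhandled_error: true)
```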
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running.
# File 'app/models/good_job/job.rb', line 697
def queue_latency
  now = Time.zone.now
  expected_start = scheduled_at || created_at
  actual_start = performed_at || finished_at || now

  actual_start - expected_start unless expected_start >= now
end
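The latency arithmetic can be checked with fixed timestamps. The sketch below passes every time as an argument instead of reading model attributes; queue_latency_sketch is a hypothetical helper and plain `Time` replaces `Time.zone`.

```ruby
# Hypothetical stand-in for #queue_latency with explicit inputs.
# Returns seconds as a Float, or nil when the job is not yet due.
def queue_latency_sketch(now:, created_at:, scheduled_at: nil, performed_at: nil, finished_at: nil)
  expected_start = scheduled_at || created_at
  actual_start = performed_at || finished_at || now

  actual_start - expected_start unless expected_start >= now
end

now = Time.at(1_000)
# Started 30 seconds after it was created.
p queue_latency_sketch(now: now, created_at: Time.at(900), performed_at: Time.at(930))
# Scheduled in the future: no latency yet.
p queue_latency_sketch(now: now, created_at: Time.at(900), scheduled_at: Time.at(2_000))
```

A job that is due but has not started keeps accruing latency against the current time.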
#recent_error ⇒ String
The most recent error message. If the job has been retried, the error will be fetched from the previous Execution record.
# File 'app/models/good_job/job.rb', line 391
def recent_error
  error || executions[-2]&.error
end
#reschedule_job(scheduled_at = Time.current) ⇒ void
This method returns an undefined value.
Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.
# File 'app/models/good_job/job.rb', line 510
def reschedule_job(scheduled_at = Time.current)
  with_advisory_lock do
    reload
    raise ActionForStateMismatchError if finished_at.present?

    update(scheduled_at: scheduled_at)
  end
end
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded. This action will create a new Execution record for the job.
# File 'app/models/good_job/job.rb', line 455
def retry_job
  with_advisory_lock do
    reload
    active_job = self.active_job(ignore_deserialization_errors: true)

    raise ActiveJobDeserializationError if active_job.nil?
    raise AdapterNotGoodJobError unless active_job.class.queue_adapter.is_a? GoodJob::Adapter
    raise ActionForStateMismatchError if finished_at.blank? || error.blank?

    # Update the executions count because the previous execution will not have been preserved
    # Do not update `exception_executions` because that comes from rescue_from's arguments
    active_job.executions = (active_job.executions || 0) + 1

    begin
      error_class, error_message = error.split(ERROR_MESSAGE_SEPARATOR).map(&:strip)
      error = error_class.constantize.new(error_message)
    rescue StandardError
      error = StandardError.new(error)
    end

    new_active_job = nil
    GoodJob::CurrentThread.within do |current_thread|
      current_thread.job = self
      current_thread.retry_now = true

      self.class.transaction(joinable: false, requires_new: true) do
        new_active_job = active_job.retry_job(wait: 0, error: error)
        self.error_event = :retried if error
        save!
      end
    end

    new_active_job
  end
end
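Before retrying, the stored error string is turned back into an exception object, with a generic StandardError as the fallback when the class cannot be resolved. A plain-Ruby sketch of that step follows; rebuild_error_sketch is a hypothetical helper, and `Object.const_get` stands in for ActiveSupport's `constantize`.

```ruby
# Hypothetical stand-in for the error reconstruction inside #retry_job.
def rebuild_error_sketch(error_string)
  # Split "ClassName: message" on the same separator used when formatting.
  error_class, error_message = error_string.split(": ").map(&:strip)
  Object.const_get(error_class).new(error_message)
rescue StandardError
  # Unknown or malformed class name: keep the whole string as the message.
  StandardError.new(error_string)
end

p rebuild_error_sketch("ArgumentError: bad input")
p rebuild_error_sketch("NoSuchErrorClassSketch: oops")
```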
#running? ⇒ Boolean
Tests whether the job is being executed right now.
# File 'app/models/good_job/job.rb', line 425
def running?
  # Avoid N+1 Query: `.includes_advisory_locks`
  if has_attribute?(:locktype)
    self['locktype'].present?
  else
    advisory_locked?
  end
end
#runtime_latency ⇒ Object
Time between when this job started and finished.
# File 'app/models/good_job/job.rb', line 706
def runtime_latency
  (finished_at || Time.zone.now) - performed_at if performed_at
end
#succeeded? ⇒ Boolean
Tests whether the job has finished without error.
# File 'app/models/good_job/job.rb', line 448
def succeeded?
  finished? && !discarded?
end
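The three state predicates depend only on `finished_at` and `error`, so their relationship can be shown with a small struct. JobStateSketch is a hypothetical stand-in for the model, and nil or empty-string checks replace ActiveSupport's `present?`.

```ruby
# Hypothetical value object with the same predicate logic as the model.
JobStateSketch = Struct.new(:finished_at, :error) do
  def finished?
    !finished_at.nil?
  end

  # Finished, but with an error recorded.
  def discarded?
    finished? && !error.to_s.empty?
  end

  # Finished with no error recorded.
  def succeeded?
    finished? && !discarded?
  end
end

pending   = JobStateSketch.new(nil, nil)
succeeded = JobStateSketch.new(Time.at(0), nil)
discarded = JobStateSketch.new(Time.at(0), "RuntimeError: boom")
p [pending.finished?, succeeded.succeeded?, discarded.discarded?]
```

A job with an error but no `finished_at` is neither discarded nor succeeded; it is still eligible to run again.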