Class: GoodJob::BaseExecution
- Inherits:
-
BaseRecord
- Object
- ActiveRecord::Base
- BaseRecord
- GoodJob::BaseExecution
- Includes:
- AdvisoryLockable, ErrorEvents, Filterable, Reportable
- Defined in:
- app/models/good_job/base_execution.rb
Overview
Direct Known Subclasses
Constant Summary collapse
- PreviouslyPerformedError =
Raised if something attempts to execute a previously completed Execution again.
Class.new(StandardError)
- ERROR_MESSAGE_SEPARATOR =
String separating Error Class from Error Message
": "
- DEFAULT_QUEUE_NAME =
ActiveJob jobs without a
queue_name
attribute are placed on this queue. 'default'
- DEFAULT_PRIORITY =
ActiveJob jobs without a
priority
attribute are given this priority. 0
Constants included from ErrorEvents
ErrorEvents::ERROR_EVENTS, ErrorEvents::ERROR_EVENT_ENUMS
Constants included from AdvisoryLockable
AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary collapse
-
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID.
- .build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
- .coalesce_scheduled_at_created_at ⇒ Object
-
.creation_ordered ⇒ ActiveRecord:Relation
Order jobs by created_at, for first-in first-out.
-
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing.
-
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
-
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
-
.finished(timestamp = nil) ⇒ ActiveRecord::Relation
Get completed jobs before the given timestamp.
- .format_error(error) ⇒ Object
-
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name.
- .json_string(json, attr) ⇒ Object
-
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
-
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
- .params_execution_count ⇒ Object
- .params_job_class ⇒ Object
-
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquire an advisory lock related to it, and executes the job.
-
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
-
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param.
-
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
-
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
-
.running ⇒ ActiveRecord::Relation
Get Jobs that started but not finished yet.
-
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
-
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
Instance Method Summary collapse
-
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using ‘#active_job_data`.
-
#destroy_job ⇒ Object
Destroys this execution and all executions within the same job.
-
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard.
-
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
- #job_state ⇒ Object
- #number ⇒ Object
-
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
-
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running.
- #running? ⇒ Boolean
-
#runtime_latency ⇒ Object
Time between when this job started and finished.
Methods included from Reportable
Methods included from ErrorEvents
Methods included from AdvisoryLockable
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID
92 |
# File 'app/models/good_job/base_execution.rb', line 92 scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) } |
.build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
238 239 240 |
# File 'app/models/good_job/base_execution.rb', line 238 def self.build_for_enqueue(active_job, scheduled_at: nil) new(**enqueue_args(active_job, scheduled_at: scheduled_at)) end |
.coalesce_scheduled_at_created_at ⇒ Object
218 219 220 |
# File 'app/models/good_job/base_execution.rb', line 218 def coalesce_scheduled_at_created_at arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at']) end |
.creation_ordered ⇒ ActiveRecord:Relation
Order jobs by created_at, for first-in first-out
123 |
# File 'app/models/good_job/base_execution.rb', line 123 scope :creation_ordered, -> { order(created_at: :asc) } |
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing
132 133 134 135 136 137 138 |
# File 'app/models/good_job/base_execution.rb', line 132 scope :dequeueing_ordered, (lambda do |parsed_queues| relation = self relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include] relation = relation.priority_ordered.creation_ordered relation end) |
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 |
# File 'app/models/good_job/base_execution.rb', line 347 def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| current_job = CurrentThread.job retried = current_job && current_job.active_job_id == active_job.job_id if retried job = current_job job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at)) job.scheduled_at ||= Time.current # TODO: these values ideally shouldn't be persisted until the current_job is finished # which will require handling `retry_job` being called from outside the job context. job.performed_at = nil job.finished_at = nil else job = build_for_enqueue(active_job, scheduled_at: scheduled_at) end if create_with_advisory_lock if job.persisted? job.advisory_lock else job.create_with_advisory_lock = true end end instrument_payload[:job] = job job.save! CurrentThread.execution_retried = (job if retried) active_job.provider_job_id = job.id raise "These should be equal" if active_job.provider_job_id != active_job.job_id job end end |
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'app/models/good_job/base_execution.rb', line 243 def self.enqueue_args(active_job, scheduled_at: nil) execution_args = { id: active_job.job_id, active_job_id: active_job.job_id, job_class: active_job.class.name, queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, priority: active_job.priority || DEFAULT_PRIORITY, serialized_params: active_job.serialize, created_at: Time.current, } execution_args[:scheduled_at] = if scheduled_at scheduled_at elsif active_job.scheduled_at Time.zone.at(active_job.scheduled_at) else execution_args[:created_at] end execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any? labels = active_job.good_job_labels.dup labels.map! { |label| label.to_s.strip.presence } labels.tap(&:compact!).tap(&:uniq!) execution_args[:labels] = labels end reenqueued_current_job = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id current_job = CurrentThread.job if reenqueued_current_job execution_args[:batch_id] = current_job.batch_id execution_args[:batch_callback_id] = current_job.batch_callback_id execution_args[:cron_key] = current_job.cron_key else execution_args[:batch_id] = GoodJob::Batch.current_batch_id execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id execution_args[:cron_key] = CurrentThread.cron_key execution_args[:cron_at] = CurrentThread.cron_at end execution_args end |
.finished(timestamp = nil) ⇒ ActiveRecord::Relation
Get completed jobs before the given timestamp. If no timestamp is provided, get all completed jobs. By default, GoodJob destroys jobs after they’re completed, meaning this returns no jobs. However, if you have changed GoodJob.preserve_job_records, this may find completed Jobs.
171 |
# File 'app/models/good_job/base_execution.rb', line 171 scope :finished, ->( = nil) { ? where(arel_table['finished_at'].lteq(bind_value('finished_at', , ActiveRecord::Type::DateTime))) : where.not(finished_at: nil) } |
.format_error(error) ⇒ Object
384 385 386 387 388 |
# File 'app/models/good_job/base_execution.rb', line 384 def self.format_error(error) raise ArgumentError unless error.is_a?(Exception) [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.].join end |
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name
84 |
# File 'app/models/good_job/base_execution.rb', line 84 scope :job_class, ->(name) { where(params_job_class.eq(name)) } |
.json_string(json, attr) ⇒ Object
202 203 204 |
# File 'app/models/good_job/base_execution.rb', line 202 def json_string(json, attr) Arel::Nodes::Grouping.new(Arel::Nodes::InfixOperation.new('->>', json, Arel::Nodes.build_quoted(attr))) end |
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'app/models/good_job/base_execution.rb', line 322 def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) query = advisory_unlocked.unfinished.schedule_ordered after ||= Time.current after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime) after_query = query.where(arel_table['scheduled_at'].gt(after_bind)).or query.where(scheduled_at: nil).where(arel_table['created_at'].gt(after_bind)) after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { || .compact.first } if now_limit&.positive? now_query = query.where(arel_table['scheduled_at'].lt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).or query.where(scheduled_at: nil) now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { || .compact.first } end Array(now_at) + after_at end |
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
105 |
# File 'app/models/good_job/base_execution.rb', line 105 scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) } |
.params_execution_count ⇒ Object
210 211 212 213 214 215 216 |
# File 'app/models/good_job/base_execution.rb', line 210 def params_execution_count Arel::Nodes::InfixOperation.new( '::', json_string(arel_table['serialized_params'], 'executions'), Arel.sql('integer') ) end |
.params_job_class ⇒ Object
206 207 208 |
# File 'app/models/good_job/base_execution.rb', line 206 def params_job_class json_string(arel_table['serialized_params'], 'job_class') end |
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquire an advisory lock related to it, and executes the job.
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'app/models/good_job/base_execution.rb', line 296 def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) job = nil result = nil unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs| job = jobs.first if job&.executable? yield(job) if block_given? result = job.perform(lock_id: lock_id) else job = nil yield(nil) if block_given? end end job&.run_callbacks(:perform_unlocked) result end |
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
111 112 113 114 115 116 117 |
# File 'app/models/good_job/base_execution.rb', line 111 scope :priority_ordered, (lambda do if GoodJob.configuration.smaller_number_is_higher_priority order('priority ASC NULLS LAST') else order('priority DESC NULLS LAST') end end) |
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param
145 146 147 148 149 150 151 152 153 |
# File 'app/models/good_job/base_execution.rb', line 145 scope :queue_ordered, (lambda do |queues| clauses = queues.map.with_index do |queue_name, index| "WHEN queue_name = '#{queue_name}' THEN #{index}" end order( Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.length} END)") ) end) |
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'app/models/good_job/base_execution.rb', line 51 def self.queue_parser(string) string = string.strip.presence || '*' case string.first when '-' exclude_queues = true string = string[1..] when '+' ordered_queues = true string = string[1..] end queues = string.split(',').map(&:strip) if queues.include?('*') { all: true } elsif exclude_queues { exclude: queues } elsif ordered_queues { include: queues, ordered_queues: true, } else { include: queues } end end |
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
189 190 191 192 193 194 195 196 197 198 199 |
# File 'app/models/good_job/base_execution.rb', line 189 scope :queue_string, (lambda do |string| parsed = queue_parser(string) if parsed[:all] all elsif parsed[:exclude] where.not(queue_name: parsed[:exclude]).or where(queue_name: nil) elsif parsed[:include] where(queue_name: parsed[:include]) end end) |
.running ⇒ ActiveRecord::Relation
Get Jobs that started but not finished yet.
177 |
# File 'app/models/good_job/base_execution.rb', line 177 scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) } |
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
159 |
# File 'app/models/good_job/base_execution.rb', line 159 scope :schedule_ordered, -> { order(coalesce_scheduled_at_created_at.asc) } |
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
98 |
# File 'app/models/good_job/base_execution.rb', line 98 scope :unfinished, -> { where(finished_at: nil) } |
Instance Method Details
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using ‘#active_job_data`.
228 229 230 231 232 233 234 235 236 |
# File 'app/models/good_job/base_execution.rb', line 228 def active_job(ignore_deserialization_errors: false) ActiveJob::Base.deserialize(active_job_data).tap do |aj| aj.send(:deserialize_arguments_if_needed) rescue ActiveJob::DeserializationError raise unless ignore_deserialization_errors end rescue NameError raise unless ignore_deserialization_errors end |
#destroy_job ⇒ Object
Destroys this execution and all executions within the same job
577 578 579 580 581 582 |
# File 'app/models/good_job/base_execution.rb', line 577 def destroy_job @_destroy_job = true destroy! ensure @_destroy_job = false end |
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard
544 545 546 547 548 |
# File 'app/models/good_job/base_execution.rb', line 544 def display_serialized_params serialized_params.merge({ _good_job: attributes.except('serialized_params', 'locktype', 'owns_advisory_lock'), }) end |
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
536 537 538 539 540 |
# File 'app/models/good_job/base_execution.rb', line 536 def executable? reload.finished_at.blank? rescue ActiveRecord::RecordNotFound false end |
#job_state ⇒ Object
584 585 586 587 588 |
# File 'app/models/good_job/base_execution.rb', line 584 def job_state state = { queue_name: queue_name } state[:scheduled_at] = scheduled_at if scheduled_at state end |
#number ⇒ Object
558 559 560 |
# File 'app/models/good_job/base_execution.rb', line 558 def number serialized_params.fetch('executions', 0) + 1 end |
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 |
# File 'app/models/good_job/base_execution.rb', line 395 def perform(lock_id:) run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at job_performed_at = Time.current monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) discrete_execution = nil result = GoodJob::CurrentThread.within do |current_thread| current_thread.reset current_thread.job = self existing_performed_at = performed_at if existing_performed_at current_thread.execution_interrupted = existing_performed_at interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'")) self.error = interrupt_error_string self.error_event = ERROR_EVENT_INTERRUPTED monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds discrete_execution_attrs = { error: interrupt_error_string, finished_at: job_performed_at, } discrete_execution_attrs[:error_event] = GoodJob::ErrorEvents::ERROR_EVENT_ENUMS[GoodJob::ErrorEvents::ERROR_EVENT_INTERRUPTED] discrete_execution_attrs[:duration] = monotonic_duration discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all(discrete_execution_attrs) # rubocop:disable Rails/SkipsModelValidations end transaction do discrete_execution_attrs = { job_class: job_class, queue_name: queue_name, serialized_params: serialized_params, scheduled_at: (scheduled_at || created_at), created_at: job_performed_at, process_id: lock_id, } job_attrs = { performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1), locked_by_id: lock_id, locked_at: Time.current, } discrete_execution = discrete_executions.create!(discrete_execution_attrs) update!(job_attrs) end ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: discrete_execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| value = ActiveJob::Base.execute(active_job_data) if value.is_a?(Exception) handled_error = value value = nil end handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard error_event = if handled_error == current_thread.error_on_discard ERROR_EVENT_DISCARDED elsif handled_error == current_thread.error_on_retry ERROR_EVENT_RETRIED elsif handled_error == current_thread.error_on_retry_stopped ERROR_EVENT_RETRY_STOPPED elsif handled_error ERROR_EVENT_HANDLED end instrument_payload.merge!( value: value, handled_error: handled_error, retried: current_thread.execution_retried.present?, error_event: error_event ) ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried: current_thread.execution_retried) rescue StandardError => e error_event = if e.is_a?(GoodJob::InterruptError) ERROR_EVENT_INTERRUPTED elsif e == current_thread.error_on_retry_stopped ERROR_EVENT_RETRY_STOPPED else ERROR_EVENT_UNHANDLED end instrument_payload[:unhandled_error] = e ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event) end end job_attributes = if self.class.columns_hash.key?("locked_by_id") { locked_by_id: nil, locked_at: nil } else {} end job_error = result.handled_error || result.unhandled_error if job_error error_string = self.class.format_error(job_error) job_attributes[:error] = error_string job_attributes[:error_event] = result.error_event discrete_execution.error = error_string discrete_execution.error_event = result.error_event discrete_execution.error_backtrace = job_error.backtrace else job_attributes[:error] = nil job_attributes[:error_event] = nil end job_finished_at = Time.current monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds job_attributes[:finished_at] = job_finished_at discrete_execution.finished_at = job_finished_at discrete_execution.duration = monotonic_duration retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error reenqueued = result.retried? || retried_good_job_id.present? || retry_unhandled_error if reenqueued job_attributes[:performed_at] = nil job_attributes[:finished_at] = nil end assign_attributes(job_attributes) preserve_unhandled = (result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error)) if finished_at.blank? || GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present? transaction do discrete_execution.save! save! end else destroy! end result end end |
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running
563 564 565 566 567 568 569 |
# File 'app/models/good_job/base_execution.rb', line 563 def queue_latency now = Time.zone.now expected_start = scheduled_at || created_at actual_start = performed_at || finished_at || now actual_start - expected_start unless expected_start >= now end |
#running? ⇒ Boolean
550 551 552 553 554 555 556 |
# File 'app/models/good_job/base_execution.rb', line 550 def running? if has_attribute?(:locktype) self['locktype'].present? else advisory_locked? end end |
#runtime_latency ⇒ Object
Time between when this job started and finished
572 573 574 |
# File 'app/models/good_job/base_execution.rb', line 572 def runtime_latency (finished_at || Time.zone.now) - performed_at if performed_at end |