Class: GoodJob::Process

Inherits:
BaseRecord
  • Object
show all
Includes:
AdvisoryLockable, OverridableConnection
Defined in:
app/models/good_job/process.rb

Overview

Active Record model that represents a GoodJob capsule/process (either async or CLI).

Constant Summary collapse

STALE_INTERVAL =

Interval after which the process record is considered stale and is refreshed

30.seconds
EXPIRED_INTERVAL =

Interval until the process record is treated as expired

5.minutes
# Names of the lock types a process record can declare.
LOCK_TYPES =
[
  # The assignment inside the literal also defines LOCK_TYPE_ADVISORY itself.
  LOCK_TYPE_ADVISORY = 'advisory',
].freeze
# Maps each lock type name to the integer that #lock_type= hands to the
# underlying attribute writer and that #lock_type translates back to a name.
LOCK_TYPE_ENUMS =
{
  LOCK_TYPE_ADVISORY => 1,
}.freeze

Constants included from AdvisoryLockable

AdvisoryLockable::RecordAlreadyAdvisoryLockedError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from AdvisoryLockable

#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock

Methods inherited from BaseRecord

bind_value, migrated?, migration_pending_warning!, with_logger_silenced

Class Method Details

.active ⇒ ActiveRecord::Relation

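Processes that are active: either advisory-lock-type records that are currently advisory locked, or records without a lock type that were updated within EXPIRED_INTERVAL.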

Returns:

  • (ActiveRecord::Relation)


35
36
37
38
39
# File 'app/models/good_job/process.rb', line 35

# Processes that are active. A record qualifies when it either
# * has the advisory lock type and satisfies the +advisory_locked+ scope, or
# * has no lock type and its +updated_at+ is newer than EXPIRED_INTERVAL ago.
# @return [ActiveRecord::Relation]
scope :active, (lambda do
  base = joins_advisory_locks
  advisory_lock_holders = base.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_locked
  recently_updated = base.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago))
  advisory_lock_holders.or(recently_updated)
end)

.cleanup ⇒ Object

Deletes all inactive process records.



52
53
54
55
56
57
# File 'app/models/good_job/process.rb', line 52

# Deletes every record returned by the +inactive+ scope. Before a record is
# deleted, any GoodJob::Job rows whose +locked_by_id+ points at it are released
# (+locked_by_id+ and +locked_at+ set to nil), provided the jobs table has a
# +locked_by_id+ column.
def self.cleanup
  inactive.find_each do |inactive_process|
    if GoodJob::Job.columns_hash.key?("locked_by_id")
      jobs_held_by_process = GoodJob::Job.where(locked_by_id: inactive_process.id)
      jobs_held_by_process.update_all(locked_by_id: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
    end

    inactive_process.delete
  end
end

.create_record(id:, with_advisory_lock: false) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
# File 'app/models/good_job/process.rb', line 59

# Creates a process record populated with the current process state.
# @param id [Object] value for the record's +id+ attribute
# @param with_advisory_lock [Boolean] when truthy, additionally passes
#   +create_with_advisory_lock: true+ and the advisory lock type to +create!+
# @return [Object] whatever +create!+ returns
def self.create_record(id:, with_advisory_lock: false)
  lock_attributes =
    if with_advisory_lock
      { create_with_advisory_lock: true, lock_type: LOCK_TYPE_ADVISORY }
    else
      {}
    end

  # Base attributes come first so the key order matches id, state, lock keys.
  create!({ id: id, state: process_state }.merge(lock_attributes))
end

.process_state ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'app/models/good_job/process.rb', line 71

# Builds a snapshot describing the current operating-system process and its
# GoodJob schedulers.
#
# Scheduler stats are collected exactly once so that the per-scheduler entries
# and the two aggregated totals are derived from the same snapshot.
# @return [Hash] hostname, pid, proctitle, selected GoodJob settings,
#   per-scheduler stats, execution totals and connection pool usage
def self.process_state
  scheduler_stats = GoodJob::Scheduler.instances.map(&:stats)
  pool = connection_pool

  {
    hostname: Socket.gethostname,
    pid: ::Process.pid,
    proctitle: $PROGRAM_NAME,
    preserve_job_records: GoodJob.preserve_job_records,
    retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
    schedulers: scheduler_stats,
    cron_enabled: GoodJob.configuration.enable_cron?,
    total_succeeded_executions_count: scheduler_stats.sum { |stats| stats.fetch(:succeeded_executions_count) },
    total_errored_executions_count: scheduler_stats.sum { |stats| stats.fetch(:errored_executions_count) },
    database_connection_pool: {
      size: pool.size,
      active: pool.connections.count(&:in_use?),
    },
  }
end

Instance Method Details

#basename ⇒ Object



120
121
122
# File 'app/models/good_job/process.rb', line 120

# File basename of the "proctitle" entry stored in the process state.
# @return [String] the basename; empty when no "proctitle" is recorded
def basename
  proctitle = state.fetch("proctitle", "")
  File.basename(proctitle)
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


116
117
118
# File 'app/models/good_job/process.rb', line 116

# Whether the record was last updated more than EXPIRED_INTERVAL ago.
# @return [Boolean]
def expired?
  expiry_cutoff = EXPIRED_INTERVAL.ago
  updated_at < expiry_cutoff
end

#lock_type ⇒ Object



128
129
130
131
132
133
# File 'app/models/good_job/process.rb', line 128

# Reads the lock type as its name rather than its stored enum value.
# @return [String, nil] the LOCK_TYPE_ENUMS key for the stored value; nil when
#   the table has no +lock_type+ column, nothing is stored, or the stored
#   value is not present in LOCK_TYPE_ENUMS
def lock_type
  return unless self.class.columns_hash['lock_type']

  stored_value = super
  return unless stored_value

  LOCK_TYPE_ENUMS.key(stored_value)
end

#lock_type=(value) ⇒ Object

Raises:

  • (ArgumentError)


135
136
137
138
139
140
141
142
# File 'app/models/good_job/process.rb', line 135

# Assigns the lock type by name, storing the matching LOCK_TYPE_ENUMS value.
# Does nothing when the table has no +lock_type+ column.
# @param value [String, nil] a key of LOCK_TYPE_ENUMS, or nil to clear
# @raise [ArgumentError] if +value+ is truthy but not a key of LOCK_TYPE_ENUMS
def lock_type=(value)
  return unless self.class.columns_hash['lock_type']

  enum = LOCK_TYPE_ENUMS[value]
  # The message names the attribute actually being assigned (lock_type).
  raise(ArgumentError, "Invalid lock_type: #{value}") if value && !enum

  super(enum)
end

#refresh ⇒ Object



89
90
91
92
93
94
95
96
97
98
# File 'app/models/good_job/process.rb', line 89

# Persists the current process state and bumps +updated_at+.
#
# If the row no longer exists, the record is marked as new and saved again
# with the fresh state.
# @return [Object] the result of +update+, or of +save+ when the row had to be
#   re-created
def refresh
  # Keep the fresh state in a local: +reload+ replaces the in-memory
  # attributes with the database values, so reading +state+ afterwards would
  # hand the old persisted state back to +update+.
  current_state = self.class.process_state
  self.state = current_state # still needed by the re-create path below
  reload # verify the record still exists in the database
  update(state: current_state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
  @new_record = true
  self.created_at = self.updated_at = nil
  state_will_change!
  save
end

#refresh_if_stale(cleanup: false) ⇒ Object



100
101
102
103
104
105
106
# File 'app/models/good_job/process.rb', line 100

# Refreshes the record only when it is stale.
# @param cleanup [Boolean] when truthy, also runs the class-level +cleanup+
#   after the refresh
# @return [Object, nil] the result of #refresh, or nil when not stale
def refresh_if_stale(cleanup: false)
  return unless stale?

  # tap runs the optional cleanup after the refresh and still returns its result.
  refresh.tap { self.class.cleanup if cleanup }
end

#schedulers ⇒ Object



124
125
126
# File 'app/models/good_job/process.rb', line 124

# The "schedulers" entry recorded in the process state.
# @return [Object] the stored entry, or an empty Array when none is recorded
def schedulers
  state.fetch("schedulers") { [] }
end

#stale? ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
# File 'app/models/good_job/process.rb', line 112

# Whether the record was last updated more than STALE_INTERVAL ago.
# @return [Boolean]
def stale?
  staleness_cutoff = STALE_INTERVAL.ago
  updated_at < staleness_cutoff
end

#state ⇒ Object



108
109
110
# File 'app/models/good_job/process.rb', line 108

# The stored process state, never nil.
# @return [Object] the value from the underlying attribute reader, or an empty
#   Hash when that value is nil or false
def state
  stored_state = super
  return stored_state if stored_state

  {}
end