Class: GoodJob::Process

Inherits:
BaseRecord
  • Object
show all
Includes:
AdvisoryLockable, OverridableConnection
Defined in:
app/models/good_job/process.rb

Overview

Active Record model that represents a GoodJob capsule/process (either async or CLI).

Constant Summary collapse

STALE_INTERVAL =

Interval until the process record being updated

30.seconds
EXPIRED_INTERVAL =

Interval until the process record is treated as expired

5.minutes

Constants included from AdvisoryLockable

AdvisoryLockable::RecordAlreadyAdvisoryLockedError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from AdvisoryLockable

#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock

Methods inherited from BaseRecord

bind_value, migrated?, migration_pending_warning!, with_logger_silenced

Class Method Details

.activeActiveRecord::Relation

Processes that are inactive and unlocked (e.g. SIGKILLed)

Returns:

  • (ActiveRecord::Relation)


35
36
37
38
39
# File 'app/models/good_job/process.rb', line 35

scope :active, (lambda do
  query = joins_advisory_locks
  query.where(lock_type: :advisory).advisory_locked
    .or(query.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago)))
end)

.cleanupObject

Deletes all inactive process records.



52
53
54
55
56
57
# File 'app/models/good_job/process.rb', line 52

def self.cleanup
  inactive.find_each do |process|
    GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
    process.delete
  end
end

.find_or_create_record(id:, with_advisory_lock: false) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'app/models/good_job/process.rb', line 59

def self.find_or_create_record(id:, with_advisory_lock: false)
  attributes = {
    id: id,
    state: process_state,
  }
  if with_advisory_lock
    attributes[:create_with_advisory_lock] = true
    attributes[:lock_type] = :advisory
  end
  create!(attributes)
rescue ActiveRecord::RecordNotUnique
  find_by(id: id).tap do |existing_record|
    next unless existing_record

    if with_advisory_lock
      existing_record.advisory_lock!
      existing_record.update(lock_type: :advisory, state: process_state, updated_at: Time.current)
    else
      existing_record.update(lock_type: nil, state: process_state, updated_at: Time.current)
    end
  end
end

.process_stateObject



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'app/models/good_job/process.rb', line 82

def self.process_state
  {
    hostname: Socket.gethostname,
    pid: ::Process.pid,
    proctitle: $PROGRAM_NAME,
    preserve_job_records: GoodJob.preserve_job_records,
    retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
    schedulers: GoodJob::Scheduler.instances.map(&:stats),
    cron_enabled: GoodJob.configuration.enable_cron?,
    total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) },
    total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) },
    database_connection_pool: {
      size: connection_pool.size,
      active: connection_pool.connections.count(&:in_use?),
    },
  }
end

Instance Method Details

#basenameObject



131
132
133
# File 'app/models/good_job/process.rb', line 131

def basename
  File.basename(state.fetch("proctitle", ""))
end

#expired?Boolean

Returns:

  • (Boolean)


127
128
129
# File 'app/models/good_job/process.rb', line 127

def expired?
  updated_at < EXPIRED_INTERVAL.ago
end

#refreshObject



100
101
102
103
104
105
106
107
108
109
# File 'app/models/good_job/process.rb', line 100

def refresh
  self.state = self.class.process_state
  reload # verify the record still exists in the database
  update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
  @new_record = true
  self.created_at = self.updated_at = nil
  state_will_change!
  save
end

#refresh_if_stale(cleanup: false) ⇒ Object



111
112
113
114
115
116
117
# File 'app/models/good_job/process.rb', line 111

def refresh_if_stale(cleanup: false)
  return unless stale?

  result = refresh
  self.class.cleanup if cleanup
  result
end

#schedulersObject



135
136
137
# File 'app/models/good_job/process.rb', line 135

def schedulers
  state.fetch("schedulers", [])
end

#stale?Boolean

Returns:

  • (Boolean)


123
124
125
# File 'app/models/good_job/process.rb', line 123

def stale?
  updated_at < STALE_INTERVAL.ago
end

#stateObject



119
120
121
# File 'app/models/good_job/process.rb', line 119

def state
  super || {}
end