Class: GoodJob::Process
Overview
Active Record model that represents a GoodJob capsule/process (either async or CLI).
Constant Summary
collapse
- STALE_INTERVAL =
Interval after which the process record is considered stale and is refreshed
30.seconds
- EXPIRED_INTERVAL =
Interval until the process record is treated as expired
5.minutes
AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary
collapse
Instance Method Summary
collapse
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active ⇒ ActiveRecord::Relation
Processes that are active: advisory-locked records whose lock_type is advisory, or records without a lock_type that were updated within EXPIRED_INTERVAL
35
36
37
38
39
|
# File 'app/models/good_job/process.rb', line 35
# Processes considered active. A record qualifies when either:
# * its lock_type is :advisory and it passes the `advisory_locked` scope
#   (presumably: it currently holds an advisory lock -- defined elsewhere), or
# * its lock_type is nil and its updated_at is newer than EXPIRED_INTERVAL ago.
# @return [ActiveRecord::Relation]
scope :active, -> {
  base = joins_advisory_locks
  advisory_held = base.where(lock_type: :advisory).advisory_locked
  recently_updated = base.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago))
  advisory_held.or(recently_updated)
}
|
.cleanup ⇒ Object
Deletes all inactive process records.
52
53
54
55
56
57
|
# File 'app/models/good_job/process.rb', line 52
# Deletes all inactive process records.
#
# For each record yielded by the `inactive` scope, first clears the
# locked_by_id / locked_at columns on every GoodJob::Job row that points at
# that process, then deletes the process record itself.
# @return [void]
def self.cleanup
  inactive.find_each do |process|
    # Release jobs still marked as locked by this process before removing it.
    GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil)
    process.delete
  end
end
|
.find_or_create_record(id:, with_advisory_lock: false) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'app/models/good_job/process.rb', line 59
# Creates the process record for +id+, or updates the existing one when a
# record with that id is already present.
#
# @param id [Object] primary key of the process record
# @param with_advisory_lock [Boolean] when truthy, the record is created with
#   (or the existing record acquires) an advisory lock and lock_type is set
#   to :advisory; otherwise an existing record's lock_type is set to nil
# @return [Object, nil] the created or existing record; nil when creation hit
#   a uniqueness conflict but no record could then be found
def self.find_or_create_record(id:, with_advisory_lock: false)
  attributes = { id: id, state: process_state }
  attributes.merge!(create_with_advisory_lock: true, lock_type: :advisory) if with_advisory_lock
  create!(attributes)
rescue ActiveRecord::RecordNotUnique
  # A record with this id already exists: reuse it instead of creating one.
  existing_record = find_by(id: id)
  return nil unless existing_record

  existing_record.advisory_lock! if with_advisory_lock
  existing_record.update(
    lock_type: (with_advisory_lock ? :advisory : nil),
    state: process_state,
    updated_at: Time.current
  )
  existing_record
end
|
.process_state ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'app/models/good_job/process.rb', line 82
# Builds a snapshot Hash describing the current OS process and its GoodJob
# runtime: host and pid, program name, selected GoodJob settings, per-scheduler
# stats, execution totals summed over all schedulers, and connection pool usage.
# @return [Hash{Symbol => Object}]
def self.process_state
  # Sums a single stats counter across every scheduler instance.
  count_total = lambda do |stat_key|
    GoodJob::Scheduler.instances.sum { |instance| instance.stats.fetch(stat_key) }
  end

  snapshot = {
    hostname: Socket.gethostname,
    pid: ::Process.pid,
    proctitle: $PROGRAM_NAME,
    preserve_job_records: GoodJob.preserve_job_records,
    retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
    schedulers: GoodJob::Scheduler.instances.map(&:stats),
    cron_enabled: GoodJob.configuration.enable_cron?,
  }
  snapshot[:total_succeeded_executions_count] = count_total.call(:succeeded_executions_count)
  snapshot[:total_errored_executions_count] = count_total.call(:errored_executions_count)
  snapshot[:database_connection_pool] = {
    size: connection_pool.size,
    active: connection_pool.connections.count(&:in_use?),
  }
  snapshot
end
|
Instance Method Details
#basename ⇒ Object
131
132
133
|
# File 'app/models/good_job/process.rb', line 131
# Base filename of the "proctitle" entry stored in the process state.
# @return [String] an empty string when the state has no "proctitle" key
def basename
  proctitle = state.fetch("proctitle", "")
  File.basename(proctitle)
end
|
#expired? ⇒ Boolean
127
128
129
|
# File 'app/models/good_job/process.rb', line 127
# Whether the record was last updated longer ago than EXPIRED_INTERVAL.
# @return [Boolean]
def expired?
  last_update = updated_at
  expiry_cutoff = EXPIRED_INTERVAL.ago
  last_update < expiry_cutoff
end
|
#refresh ⇒ Object
100
101
102
103
104
105
106
107
108
109
|
# File 'app/models/good_job/process.rb', line 100
# Refreshes this record with the current process state and a new updated_at.
#
# When the record can no longer be loaded (ActiveRecord::RecordNotFound is
# raised), the instance is reset so that it is saved again as a new record.
# @return [Object] the result of `update`, or of `save` on the fallback path
def refresh
  self.state = self.class.process_state
  # `reload` runs on its own, before `update`, so that a missing row is
  # detected (via RecordNotFound) before any write is attempted.
  # NOTE(review): reload presumably replaces in-memory attributes with the
  # persisted ones, which would discard the state assigned above -- confirm.
  reload
  update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
  # Presumably the row was deleted (e.g. by another process' cleanup): treat
  # this instance as unsaved, clear its timestamps, and flag state as changed
  # so that `save` writes it out again.
  @new_record = true
  self.created_at = self.updated_at = nil
  state_will_change!
  save
end
|
#refresh_if_stale(cleanup: false) ⇒ Object
111
112
113
114
115
116
117
|
# File 'app/models/good_job/process.rb', line 111
# Refreshes the record only when it is stale.
#
# @param cleanup [Boolean] when truthy, also runs the class-level cleanup
#   after the refresh
# @return [Object, nil] the result of #refresh, or nil when not stale
def refresh_if_stale(cleanup: false)
  if stale?
    # Cleanup runs after the refresh; the refresh result is what is returned.
    refresh.tap { self.class.cleanup if cleanup }
  end
end
|
#schedulers ⇒ Object
135
136
137
|
# File 'app/models/good_job/process.rb', line 135
# Scheduler entries stored under the "schedulers" key of the process state.
# @return [Object] the stored value, or an empty Array when the key is absent
def schedulers
  state.fetch("schedulers") { [] }
end
|
#stale? ⇒ Boolean
123
124
125
|
# File 'app/models/good_job/process.rb', line 123
# Whether the record was last updated longer ago than STALE_INTERVAL.
# @return [Boolean]
def stale?
  last_update = updated_at
  stale_cutoff = STALE_INTERVAL.ago
  last_update < stale_cutoff
end
|
#state ⇒ Object
119
120
121
|
# File 'app/models/good_job/process.rb', line 119
# The stored state, falling back to an empty Hash when the inherited reader
# returns nil or false.
# @return [Object]
def state
  stored = super
  stored || {}
end
|