Class: GoodJob::Process
Overview
Active Record model that represents a GoodJob capsule/process (either async or CLI).
Constant Summary
collapse
- STALE_INTERVAL =
Interval until the process record being updated
30.seconds
- EXPIRED_INTERVAL =
Interval until the process record is treated as expired
5.minutes
- LOCK_TYPES =
[
LOCK_TYPE_ADVISORY = 'advisory',
].freeze
- LOCK_TYPE_ENUMS =
{
LOCK_TYPE_ADVISORY => 1,
}.freeze
AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary
collapse
Instance Method Summary
collapse
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active ⇒ ActiveRecord::Relation
Processes that are inactive and unlocked (e.g. SIGKILLed)
35
36
37
38
39
|
# File 'app/models/good_job/process.rb', line 35
scope :active, (lambda do
query = joins_advisory_locks
query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_locked
.or(query.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago)))
end)
|
.cleanup ⇒ Object
Deletes all inactive process records.
52
53
54
55
56
57
|
# File 'app/models/good_job/process.rb', line 52
def self.cleanup
inactive.find_each do |process|
GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil) if GoodJob::Job.columns_hash.key?("locked_by_id") process.delete
end
end
|
.create_record(id:, with_advisory_lock: false) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
|
# File 'app/models/good_job/process.rb', line 59
def self.create_record(id:, with_advisory_lock: false)
attributes = {
id: id,
state: process_state,
}
if with_advisory_lock
attributes[:create_with_advisory_lock] = true
attributes[:lock_type] = LOCK_TYPE_ADVISORY
end
create!(attributes)
end
|
.process_state ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'app/models/good_job/process.rb', line 71
def self.process_state
{
hostname: Socket.gethostname,
pid: ::Process.pid,
proctitle: $PROGRAM_NAME,
preserve_job_records: GoodJob.preserve_job_records,
retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
schedulers: GoodJob::Scheduler.instances.map(&:stats),
cron_enabled: GoodJob.configuration.enable_cron?,
total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) },
total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) },
database_connection_pool: {
size: connection_pool.size,
active: connection_pool.connections.count(&:in_use?),
},
}
end
|
Instance Method Details
#basename ⇒ Object
120
121
122
|
# File 'app/models/good_job/process.rb', line 120
def basename
File.basename(state.fetch("proctitle", ""))
end
|
#expired? ⇒ Boolean
116
117
118
|
# File 'app/models/good_job/process.rb', line 116
def expired?
updated_at < EXPIRED_INTERVAL.ago
end
|
#lock_type ⇒ Object
128
129
130
131
132
133
|
# File 'app/models/good_job/process.rb', line 128
def lock_type
return unless self.class.columns_hash['lock_type']
enum = super
LOCK_TYPE_ENUMS.key(enum) if enum
end
|
#lock_type=(value) ⇒ Object
135
136
137
138
139
140
141
142
|
# File 'app/models/good_job/process.rb', line 135
def lock_type=(value)
return unless self.class.columns_hash['lock_type']
enum = LOCK_TYPE_ENUMS[value]
raise(ArgumentError, "Invalid error_event: #{value}") if value && !enum
super(enum)
end
|
#refresh ⇒ Object
89
90
91
92
93
94
95
96
97
98
|
# File 'app/models/good_job/process.rb', line 89
def refresh
self.state = self.class.process_state
reload update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
@new_record = true
self.created_at = self.updated_at = nil
state_will_change!
save
end
|
#refresh_if_stale(cleanup: false) ⇒ Object
100
101
102
103
104
105
106
|
# File 'app/models/good_job/process.rb', line 100
def refresh_if_stale(cleanup: false)
return unless stale?
result = refresh
self.class.cleanup if cleanup
result
end
|
#schedulers ⇒ Object
124
125
126
|
# File 'app/models/good_job/process.rb', line 124
def schedulers
state.fetch("schedulers", [])
end
|
#stale? ⇒ Boolean
112
113
114
|
# File 'app/models/good_job/process.rb', line 112
def stale?
updated_at < STALE_INTERVAL.ago
end
|
#state ⇒ Object
108
109
110
|
# File 'app/models/good_job/process.rb', line 108
def state
super || {}
end
|