Class: GoodJob::CapsuleTracker
- Inherits:
-
Object
- Object
- GoodJob::CapsuleTracker
- Defined in:
- lib/good_job/capsule_tracker.rb
Overview
CapsuleTracker save a record in the database and periodically refreshes it. The intention is to create a heartbeat that can be used to determine whether a capsule/process is still active and use that to lock (or unlock) jobs.
Class Attribute Summary collapse
-
.instances ⇒ Array<GoodJob::CapsuleTracker>?
readonly
List of all instantiated CapsuleTrackers in the current process.
Instance Attribute Summary collapse
-
#advisory_locks ⇒ Integer
readonly
Number of tracked job executions with advisory locks.
-
#locks ⇒ Object
readonly
Number of tracked job executions.
-
#record ⇒ GoodJob::Process?
readonly
The database record used for tracking.
Instance Method Summary collapse
-
#advisory_locked? ⇒ Boolean
Tests whether an active advisory lock has been taken on the record.
-
#id_for_lock ⇒ String?
The UUID to use for locking.
-
#initialize(executor: Concurrent.global_io_executor) ⇒ CapsuleTracker
constructor
A new instance of CapsuleTracker.
-
#process_id ⇒ String
The expected UUID of the process for use in inspection.
-
#register(with_advisory_lock: false) {|void| ... } ⇒ void
Registers the current process around a job execution site.
-
#renew(silent: false) ⇒ void
Refreshes the process record in the database.
-
#unregister(with_advisory_lock: false) ⇒ void
Unregisters the current process from the database.
Constructor Details
#initialize(executor: Concurrent.global_io_executor) ⇒ CapsuleTracker
Returns a new instance of CapsuleTracker.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/good_job/capsule_tracker.rb', line 26 def initialize(executor: Concurrent.global_io_executor) @executor = executor @mutex = Mutex.new @locks = 0 @advisory_locked_connection = nil @record_id = SecureRandom.uuid @record = nil @refresh_task = nil # AS::ForkTracker is only present on Rails v6.1+. # Fall back to PID checking if ForkTracker is not available if defined?(ActiveSupport::ForkTracker) ActiveSupport::ForkTracker.after_fork { reset } @forktracker = true else @ruby_pid = ::Process.pid @forktracker = false end self.class.instances << self end |
Class Attribute Details
.instances ⇒ Array<GoodJob::CapsuleTracker>? (readonly)
List of all instantiated CapsuleTrackers in the current process.
23 |
# File 'lib/good_job/capsule_tracker.rb', line 23 cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false |
Instance Attribute Details
#advisory_locks ⇒ Integer (readonly)
Number of tracked job executions with advisory locks.
17 18 19 |
# File 'lib/good_job/capsule_tracker.rb', line 17 def advisory_locks @advisory_locks end |
#locks ⇒ Object (readonly)
Number of tracked job executions.
13 14 15 |
# File 'lib/good_job/capsule_tracker.rb', line 13 def locks @locks end |
#record ⇒ GoodJob::Process? (readonly)
The database record used for tracking.
10 11 12 |
# File 'lib/good_job/capsule_tracker.rb', line 10 def record @record end |
Instance Method Details
#advisory_locked? ⇒ Boolean
Tests whether an active advisory lock has been taken on the record.
155 156 157 |
# File 'lib/good_job/capsule_tracker.rb', line 155 def advisory_locked? @advisory_locked_connection&.weakref_alive? && @advisory_locked_connection&.active? end |
#id_for_lock ⇒ String?
The UUID to use for locking. May be nil if the process is not registered or is unusable/expired. If UUID has not yet been persisted to the database, this method will make a query to insert or update it.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/good_job/capsule_tracker.rb', line 51 def id_for_lock value = nil synchronize do next if @locks.zero? reset_on_fork if @record @record.refresh_if_stale else @record = GoodJob::Process.find_or_create_record(id: @record_id) create_refresh_task end value = @record&.id end value end |
#process_id ⇒ String
The expected UUID of the process for use in inspection. Use #id_for_lock if using this as a lock key.
71 72 73 |
# File 'lib/good_job/capsule_tracker.rb', line 71 def process_id @record_id end |
#register(with_advisory_lock: false) {|void| ... } ⇒ void
This method returns an undefined value.
Registers the current process around a job execution site. register
is expected to be called multiple times in a process, but should be advisory locked only once (in a single thread).
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/good_job/capsule_tracker.rb', line 80 def register(with_advisory_lock: false) synchronize do if with_advisory_lock if @record if !advisory_locked? || !advisory_locked_connection? @record.class.transaction do @record.advisory_lock! @record.update(lock_type: :advisory) end @advisory_locked_connection = WeakRef.new(@record.class.connection) end else @record = GoodJob::Process.find_or_create_record(id: @record_id, with_advisory_lock: true) @advisory_locked_connection = WeakRef.new(@record.class.connection) create_refresh_task end end @locks += 1 end return unless block_given? begin yield ensure unregister(with_advisory_lock: with_advisory_lock) end end |
#renew(silent: false) ⇒ void
This method returns an undefined value.
Refreshes the process record in the database.
145 146 147 148 149 150 151 |
# File 'lib/good_job/capsule_tracker.rb', line 145 def renew(silent: false) synchronize do GoodJob::Process.with_logger_silenced(silent: silent) do @record&.refresh_if_stale(cleanup: true) end end end |
#unregister(with_advisory_lock: false) ⇒ void
This method returns an undefined value.
Unregisters the current process from the database.
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/good_job/capsule_tracker.rb', line 112 def unregister(with_advisory_lock: false) synchronize do if @locks.zero? return elsif @locks == 1 if @record if with_advisory_lock && advisory_locked? && advisory_locked_connection? @record.class.transaction do @record.advisory_unlock @record.destroy end @advisory_locked_connection = nil else @record.destroy end @record = nil end cancel_refresh_task elsif with_advisory_lock && advisory_locked? && advisory_locked_connection? @record.class.transaction do @record.advisory_unlock @record.update(lock_type: nil) end @advisory_locked_connection = nil end @locks -= 1 unless @locks.zero? end end |