Class: Cloudtasker::UniqueJob::Job
- Inherits: Object
- Defined in: lib/cloudtasker/unique_job/job.rb
Overview
Wrapper class for Cloudtasker::Worker that delegates to lock and conflict strategies.
Constant Summary
- DEFAULT_LOCK =
  The default lock strategy to use. Defaults to “no lock”.
  UniqueJob::Lock::NoOp
- LOCK_FINALIZATION_WARNING =
  Warning message logged when the final lock cannot be acquired after scheduling.
  'A provisional lock was acquired before enqueuing the job but the ' \
  'lock could not be finalized after enqueuing the job. This means that ' \
  'it took longer than lock_provisional_ttl to enqueue the job. See ' \
  'Worker#lock_provisional_ttl option.'
Instance Attribute Summary
- #call_opts ⇒ Object (readonly)
  Returns the value of attribute call_opts.
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Instance Method Summary
- #base_unique_scope ⇒ Hash
  The base unique scope generated from lock options.
- #digest_hash ⇒ Hash
  Return a unique description of the job in hash format.
- #id ⇒ String
  Return the worker job ID.
- #initialize(worker, opts = {}) ⇒ Job (constructor)
  Build a new instance of the class.
- #lock! ⇒ Object
  Acquire a new unique job lock or check that the lock is currently allocated to this job.
- #lock_for_scheduling! ⇒ Any
  Acquire a provisional lock, yield, then set a final lock.
- #lock_instance ⇒ Any
  Return the instantiated lock.
- #lock_provisional_ttl ⇒ Integer
  A provisional lock uses a very short duration and aims at covering the time it takes for the job to be enqueued through the client middleware chain.
- #lock_ttl ⇒ Integer
  Return the Time To Live (TTL) that should be set in Redis for the lock key.
- #options ⇒ Hash
  Return the worker configuration options.
- #redis ⇒ Cloudtasker::RedisClient
  Return the Cloudtasker redis client.
- #unique_args ⇒ Array<any>
  Return the list of arguments used for job uniqueness.
- #unique_gid ⇒ String
  Return the Global ID of the unique job.
- #unique_id ⇒ String
  Return the ID of the unique job.
- #unique_scope ⇒ Hash
  Return a scope to be included in the digest hash.
- #unlock! ⇒ Object
  Delete the job lock.
Constructor Details
#initialize(worker, opts = {}) ⇒ Job
Build a new instance of the class.
    # File 'lib/cloudtasker/unique_job/job.rb', line 25
    def initialize(worker, opts = {})
      @worker = worker
      @call_opts = opts.to_h
    end
Instance Attribute Details
#call_opts ⇒ Object (readonly)
Returns the value of attribute call_opts.
    # File 'lib/cloudtasker/unique_job/job.rb', line 8
    def call_opts
      @call_opts
    end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
    # File 'lib/cloudtasker/unique_job/job.rb', line 8
    def worker
      @worker
    end
Instance Method Details
#base_unique_scope ⇒ Hash
The base unique scope generated from lock options.
    # File 'lib/cloudtasker/unique_job/job.rb', line 125
    def base_unique_scope
      if options[:lock_per_batch] && defined?(Cloudtasker::Batch::Job)
        key = Cloudtasker::Batch::Job.key(:parent_id).to_sym
        worker..to_h.slice(key)
      else
        {}
      end
    end
#digest_hash ⇒ Hash
Return a unique description of the job in hash format.
    # File 'lib/cloudtasker/unique_job/job.rb', line 148
    def digest_hash
      @digest_hash ||= {
        class: worker.class.to_s,
        unique_args: unique_args,
        unique_scope: unique_scope.presence
      }.compact
    end
#id ⇒ String
Return the worker job ID.
    # File 'lib/cloudtasker/unique_job/job.rb', line 161
    def id
      worker.job_id
    end
#lock! ⇒ Object
Acquire a new unique job lock or check that the lock is currently allocated to this job.
Raises a `Cloudtasker::UniqueJob::LockError` if the lock is taken by another job.
    # File 'lib/cloudtasker/unique_job/job.rb', line 200
    def lock!
      lock_acquired = redis.set(unique_gid, id, nx: true, ex: lock_ttl)
      lock_already_acquired = !lock_acquired && redis.get(unique_gid) == id

      raise(LockError) unless lock_acquired || lock_already_acquired
    end
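The acquire-or-confirm behaviour of `lock!` can be tried without a Redis server. The following is a minimal sketch, not the library's implementation: `FakeRedis`, the top-level `LockError` and `acquire_lock!` are hypothetical names introduced for illustration, and the fake only mimics `SET` with the `NX` flag and `GET` (the expiry is accepted but ignored).

```ruby
# Hypothetical in-memory stand-in for the Redis client (illustration only).
# It mimics SET with the NX flag and GET; the expiry (ex:) is accepted but ignored.
class FakeRedis
  def initialize
    @store = {}
  end

  def set(key, value, nx: false, ex: nil)
    return false if nx && @store.key?(key)

    @store[key] = value
    true
  end

  def get(key)
    @store[key]
  end
end

# Stand-in for Cloudtasker::UniqueJob::LockError.
class LockError < StandardError; end

# Hypothetical helper mirroring the body of lock!
def acquire_lock!(redis, gid, job_id, ttl)
  lock_acquired = redis.set(gid, job_id, nx: true, ex: ttl)
  lock_already_acquired = !lock_acquired && redis.get(gid) == job_id
  raise LockError unless lock_acquired || lock_already_acquired

  true
end

redis = FakeRedis.new
acquire_lock!(redis, 'unique/abc', 'job-1', 600) # first caller takes the lock
acquire_lock!(redis, 'unique/abc', 'job-1', 600) # same job again: accepted

# A different job ID hitting the same key is rejected.
conflict = begin
  acquire_lock!(redis, 'unique/abc', 'job-2', 600)
  false
rescue LockError
  true
end
puts "second job rejected: #{conflict}"
```

The second call succeeds because the stored value equals the caller's job ID, which is what makes a retry of the same job safe.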
#lock_for_scheduling! ⇒ Any
Acquire a provisional lock, yield, then set a final lock.
This method is designed for scheduling operations where you need to:
- Acquire a provisional lock to prevent concurrent scheduling
- Perform the scheduling operation (yield)
- Set a final lock with proper TTL after scheduling succeeds
Raises a `Cloudtasker::UniqueJob::LockError` if the provisional lock cannot be acquired.
    # File 'lib/cloudtasker/unique_job/job.rb', line 220
    def lock_for_scheduling!
      # Step 1: Acquire provisional lock
      # Check if the lock is already acquired from a previous run
      acquired = redis.get(unique_gid) == id

      # Set the lock exclusively, if not acquired already.
      # Refresh the duration otherwise.
      lock_acquired = redis.set(unique_gid, id, nx: !acquired, ex: lock_provisional_ttl)
      raise(LockError) unless lock_acquired

      # Step 2: Yield to perform scheduling operation
      result = yield

      # Step 3: Set final lock
      # Check if the lock is still held by this job
      acquired = redis.get(unique_gid) == id

      # Set the lock with final duration
      # If already acquired, refresh with final TTL
      # If not acquired (expired or taken), try to acquire exclusively
      final_lock_acquired = redis.set(unique_gid, id, nx: !acquired, ex: lock_ttl)

      # Log a warning if final lock could not be acquired
      # The job has already been enqueued at this point, so raising an error is useless
      worker.logger.warn(LOCK_FINALIZATION_WARNING) unless final_lock_acquired

      # Return the result of the block
      result
    end
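To see when the finalization step fails, the three steps can be replayed against an in-memory store with a manually advanced clock. This is only a sketch under stated assumptions: `ExpiringStore` and `schedule_with_lock` are hypothetical names, the store mimics just `GET` and `SET` with `NX`/`EX`, and the helper returns a flag where the real method logs a warning.

```ruby
# Hypothetical in-memory store with expiry and a manually advanced clock
# (illustration only, not the Cloudtasker Redis client).
class ExpiringStore
  attr_accessor :now

  def initialize
    @now = 0
    @data = {} # key => [value, expires_at]
  end

  def get(key)
    value, expires_at = @data[key]
    return nil if value.nil? || expires_at <= @now

    value
  end

  def set(key, value, ex:, nx: false)
    return false if nx && get(key)

    @data[key] = [value, @now + ex]
    true
  end
end

# Hypothetical helper following the three steps of lock_for_scheduling!.
# Returns [block_result, finalized] instead of logging a warning.
def schedule_with_lock(store, gid, job_id, provisional_ttl:, final_ttl:)
  acquired = store.get(gid) == job_id
  raise 'lock taken' unless store.set(gid, job_id, nx: !acquired, ex: provisional_ttl)

  result = yield

  acquired = store.get(gid) == job_id
  finalized = store.set(gid, job_id, nx: !acquired, ex: final_ttl)
  [result, finalized]
end

store = ExpiringStore.new

# Enqueue finishes within the provisional TTL: the lock is refreshed with the final TTL.
_, fast = schedule_with_lock(store, 'gid-a', 'job-1', provisional_ttl: 3, final_ttl: 600) do
  store.now += 1
  :enqueued
end

# Enqueue outlasts the provisional TTL and another job takes the lock meanwhile.
result, slow = schedule_with_lock(store, 'gid-b', 'job-2', provisional_ttl: 3, final_ttl: 600) do
  store.now += 5
  store.set('gid-b', 'job-3', nx: true, ex: 600)
  :enqueued
end

puts "fast finalized: #{fast}, slow finalized: #{slow}, block result: #{result}"
```

In this sketch the final lock fails only when the provisional lock has expired and another job has claimed the key in the meantime. If the key has merely expired, the exclusive `SET` in step 3 succeeds again.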
#lock_instance ⇒ Any
Return the instantiated lock.
    # File 'lib/cloudtasker/unique_job/job.rb', line 99
    def lock_instance
      @lock_instance ||=
        begin
          # Infer lock class and get instance
          lock_name = options[:lock]
          lock_klass = Lock.const_get(lock_name.to_s.split('_').collect(&:capitalize).join)
          lock_klass.new(self)
        rescue NameError
          DEFAULT_LOCK.new(self)
        end
    end
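The lock class is inferred by converting the snake_case lock option into a CamelCase constant name. A minimal sketch of that lookup, using a hypothetical top-level `Lock` module with two placeholder classes and a hypothetical `resolve_lock_class` helper:

```ruby
# Hypothetical stand-ins for the lock strategy namespace (illustration only).
module Lock
  class NoOp; end
  class UntilExecuted; end
end

DEFAULT_LOCK = Lock::NoOp

# Hypothetical helper: resolve a lock option to a class, falling back on the default.
def resolve_lock_class(lock_name)
  # :until_executed -> "UntilExecuted" -> Lock::UntilExecuted
  Lock.const_get(lock_name.to_s.split('_').collect(&:capitalize).join)
rescue NameError
  # Raised for unknown names and for the empty string produced by a nil option.
  DEFAULT_LOCK
end

puts resolve_lock_class(:until_executed)
puts resolve_lock_class(nil)
puts resolve_lock_class(:does_not_exist)
```

A missing or misspelled lock option therefore falls back silently to the default strategy rather than raising.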
#lock_provisional_ttl ⇒ Integer
A provisional lock uses a very short duration and aims at covering the time it takes for the job to be enqueued through the client middleware chain.
If the application crashes during this time (e.g. OOM), at least the job won’t be locked for an extended period of time (which may span across a parent job retry, for instance).
This TTL can be configured via the `lock_provisional_ttl` option on the job itself.
    # File 'lib/cloudtasker/unique_job/job.rb', line 90
    def lock_provisional_ttl
      (options[:lock_provisional_ttl] || Cloudtasker::UniqueJob.lock_provisional_ttl).to_i
    end
#lock_ttl ⇒ Integer
Return the Time To Live (TTL) that should be set in Redis for the lock key. Having a TTL on lock keys ensures that jobs do not end up stuck due to a deadlock.
The TTL is calculated using schedule time + expected max job duration.
The expected max job duration is set to 10 minutes by default. This value was chosen because it’s twice the default request timeout value in Cloud Run. This leaves enough room for queue lag (5 minutes) + job processing (5 minutes).
Queue lag is certainly the most unpredictable factor here. Job processing time is less of a factor. Jobs running for more than 5 minutes should be split into sub-jobs to limit invocation time over HTTP. Cloudtasker batch jobs can help achieve that if you need to make one big job split into sub-jobs “atomic”.
The default lock key expiration of “time_at + 10 minutes” may look aggressive but it is still a better choice than potentially having real-time jobs stuck for X hours.
The expected max job duration can be configured via the `lock_ttl` option on the job itself.
    # File 'lib/cloudtasker/unique_job/job.rb', line 65
    def lock_ttl
      now = Time.now.to_i

      # Get scheduled at and lock duration
      scheduled_at = [call_opts[:time_at].to_i, now].compact.max
      lock_duration = (options[:lock_ttl] || Cloudtasker::UniqueJob.lock_ttl).to_i

      # Return the TTL, which is the configured lock_duration at minima
      [lock_duration, scheduled_at + lock_duration - now].max
    end
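The TTL arithmetic is easier to follow with fixed numbers. The sketch below reproduces the calculation in a hypothetical `compute_lock_ttl` helper with an injectable clock, and assumes the 10-minute (600 second) duration described above.

```ruby
# Hypothetical helper reproducing the TTL arithmetic with an injectable clock.
def compute_lock_ttl(now:, time_at:, lock_duration:)
  # A missing or past schedule time is treated as "now".
  scheduled_at = [time_at.to_i, now].max

  # The TTL is never shorter than the configured lock duration.
  [lock_duration, scheduled_at + lock_duration - now].max
end

now = 1_700_000_000 # arbitrary fixed timestamp

immediate = compute_lock_ttl(now: now, time_at: nil, lock_duration: 600)
delayed = compute_lock_ttl(now: now, time_at: now + 3600, lock_duration: 600)
overdue = compute_lock_ttl(now: now, time_at: now - 120, lock_duration: 600)

puts "immediate: #{immediate}s, delayed by 1h: #{delayed}s, overdue: #{overdue}s"
```

A job scheduled one hour ahead keeps its lock for the delay plus the lock duration (3600 + 600 seconds), while immediate and overdue jobs get the lock duration alone.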
#options ⇒ Hash
Return the worker configuration options.
    # File 'lib/cloudtasker/unique_job/job.rb', line 35
    def options
      worker.class.
    end
#redis ⇒ Cloudtasker::RedisClient
Return the Cloudtasker redis client.
    # File 'lib/cloudtasker/unique_job/job.rb', line 189
    def redis
      @redis ||= Cloudtasker::RedisClient.new
    end
#unique_args ⇒ Array<any>
Return the list of arguments used for job uniqueness.
    # File 'lib/cloudtasker/unique_job/job.rb', line 116
    def unique_args
      worker.try(:unique_args, worker.job_args) || worker.job_args
    end
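A worker can narrow what counts toward uniqueness by defining its own `unique_args` method, which receives the job arguments. A minimal sketch with plain Ruby objects: `PlainWorker`, `ScopedWorker` and `unique_args_for` are hypothetical, and `respond_to?` stands in for ActiveSupport's `try`.

```ruby
# Hypothetical workers (plain Ruby structs, not real Cloudtasker workers).
PlainWorker = Struct.new(:job_args)

ScopedWorker = Struct.new(:job_args) do
  # Only the first argument (e.g. a record ID) counts toward uniqueness.
  def unique_args(args)
    [args.first]
  end
end

# Hypothetical helper mirroring unique_args without ActiveSupport's #try.
def unique_args_for(worker)
  custom = worker.unique_args(worker.job_args) if worker.respond_to?(:unique_args)
  custom || worker.job_args
end

p unique_args_for(PlainWorker.new([42, 'welcome_email']))
p unique_args_for(ScopedWorker.new([42, 'welcome_email']))
```

With the override in place, two jobs for the same record are treated as duplicates even when their remaining arguments differ.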
#unique_gid ⇒ String
Return the Global ID of the unique job. The gid includes the UniqueJob namespace.
    # File 'lib/cloudtasker/unique_job/job.rb', line 180
    def unique_gid
      [self.class.to_s.underscore, unique_id].join('/')
    end
#unique_id ⇒ String
Return the ID of the unique job.
    # File 'lib/cloudtasker/unique_job/job.rb', line 170
    def unique_id
      Digest::SHA256.hexdigest(digest_hash.to_json)
    end
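The unique ID is a SHA-256 digest of the JSON form of the digest hash, so two jobs collide exactly when their class, unique arguments and unique scope match. The sketch below rebuilds that computation in a hypothetical `unique_id_for` helper, with a plain `empty?` check standing in for ActiveSupport's `presence`.

```ruby
require 'digest'
require 'json'

# Hypothetical helper: build a digest hash the way digest_hash does,
# then hash its JSON representation.
def unique_id_for(worker_class, unique_args, unique_scope = {})
  digest_hash = {
    class: worker_class,
    unique_args: unique_args,
    # An empty scope is dropped so that it does not affect the digest.
    unique_scope: (unique_scope.empty? ? nil : unique_scope)
  }.compact

  Digest::SHA256.hexdigest(digest_hash.to_json)
end

first = unique_id_for('MyWorker', [1, 'x'])
again = unique_id_for('MyWorker', [1, 'x'])
other = unique_id_for('MyWorker', [2, 'x'])
scoped = unique_id_for('MyWorker', [1, 'x'], { parent_id: 'batch-1' })

puts "same inputs give same ID: #{first == again}"
puts "different args give same ID: #{first == other}"
puts "adding a scope gives same ID: #{first == scoped}"
```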
#unique_scope ⇒ Hash
Return a scope to be included in the digest hash.
    # File 'lib/cloudtasker/unique_job/job.rb', line 139
    def unique_scope
      base_unique_scope.to_h.merge(worker.try(:unique_scope).to_h)
    end
#unlock! ⇒ Object
Delete the job lock.
    # File 'lib/cloudtasker/unique_job/job.rb', line 253
    def unlock!
      locked_id = redis.get(unique_gid)
      redis.del(unique_gid) if locked_id == id
    end
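The owner check means a job can only release a lock it holds itself. A minimal sketch of that rule, using a plain Hash in place of Redis and a hypothetical `release_lock` helper that reports whether the key is free afterwards:

```ruby
# Hypothetical helper mirroring unlock! against a plain Hash standing in for Redis.
def release_lock(store, gid, job_id)
  locked_id = store[gid]
  store.delete(gid) if locked_id == job_id

  # Report whether the key is free after the attempt.
  !store.key?(gid)
end

store = { 'unique/abc' => 'job-1' }

released_by_other = release_lock(store, 'unique/abc', 'job-2') # not the owner: lock kept
released_by_owner = release_lock(store, 'unique/abc', 'job-1') # owner: lock deleted

puts "released by other job: #{released_by_other}, released by owner: #{released_by_owner}"
```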