Class: Cloudtasker::UniqueJob::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/unique_job/job.rb

Overview

Wrapper class for Cloudtasker::Worker delegating to lock and conflict strategies

Constant Summary collapse

DEFAULT_LOCK =

The default lock strategy to use. Defaults to “no lock”.

UniqueJob::Lock::NoOp
LOCK_FINALIZATION_WARNING =

Warning message when final lock cannot be acquired after scheduling

'A provisional lock was acquired before enqueuing the job but the ' \
'lock could not be finalized after enqueuing the job. This means that ' \
'it took longer than lock_provisional_ttl to enqueue the job. See ' \
'Worker#lock_provisional_ttl option.'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, opts = {}) ⇒ Job

Build a new instance of the class.

Parameters:



25
26
27
28
# File 'lib/cloudtasker/unique_job/job.rb', line 25

def initialize(worker, opts = {})
  @worker = worker
  @call_opts = opts.to_h
end

Instance Attribute Details

#call_optsObject (readonly)

Returns the value of attribute call_opts.



8
9
10
# File 'lib/cloudtasker/unique_job/job.rb', line 8

def call_opts
  @call_opts
end

#workerObject (readonly)

Returns the value of attribute worker.



8
9
10
# File 'lib/cloudtasker/unique_job/job.rb', line 8

def worker
  @worker
end

Instance Method Details

#base_unique_scopeHash

The base unique scope generated from lock options

Returns:

  • (Hash)

    A scope hash



125
126
127
128
129
130
131
132
# File 'lib/cloudtasker/unique_job/job.rb', line 125

def base_unique_scope
  if options[:lock_per_batch] && defined?(Cloudtasker::Batch::Job)
    key = Cloudtasker::Batch::Job.key(:parent_id).to_sym
    worker.job_meta.to_h.slice(key)
  else
    {}
  end
end

#digest_hashHash

Return a unique description of the job in hash format.

Returns:

  • (Hash)

    Representation of the unique job in hash format.



148
149
150
151
152
153
154
# File 'lib/cloudtasker/unique_job/job.rb', line 148

def digest_hash
  @digest_hash ||= {
    class: worker.class.to_s,
    unique_args: unique_args,
    unique_scope: unique_scope.presence
  }.compact
end

#idString

Return the worker job ID.

Returns:

  • (String)

    The worker job ID.



161
162
163
# File 'lib/cloudtasker/unique_job/job.rb', line 161

def id
  worker.job_id
end

#lock!Object

Acquire a new unique job lock or check that the lock is currently allocated to this job.

Raise a ‘Cloudtasker::UniqueJob::LockError` if the lock if taken by another job.

Raises:



200
201
202
203
204
205
# File 'lib/cloudtasker/unique_job/job.rb', line 200

def lock!
  lock_acquired = redis.set(unique_gid, id, nx: true, ex: lock_ttl)
  lock_already_acquired = !lock_acquired && redis.get(unique_gid) == id

  raise(LockError) unless lock_acquired || lock_already_acquired
end

#lock_for_scheduling!Any

Acquire a provisional lock, yield, then set a final lock.

This method is designed for scheduling operations where you need to:

  1. Acquire a provisional lock to prevent concurrent scheduling

  2. Perform the scheduling operation (yield)

  3. Set a final lock with proper TTL after scheduling succeeds

Raises a ‘Cloudtasker::UniqueJob::LockError` if the provisional lock cannot be acquired.

Returns:

  • (Any)

    The return value of the block

Raises:



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/cloudtasker/unique_job/job.rb', line 220

def lock_for_scheduling!
  # Step 1: Acquire provisional lock
  # Check if the lock is already acquired from a previous run
  acquired = redis.get(unique_gid) == id

  # Set the lock exclusively, if not acquired already.
  # Refresh the duration otherwise.
  lock_acquired = redis.set(unique_gid, id, nx: !acquired, ex: lock_provisional_ttl)
  raise(LockError) unless lock_acquired

  # Step 2: Yield to perform scheduling operation
  result = yield

  # Step 3: Set final lock
  # Check if the lock is still held by this job
  acquired = redis.get(unique_gid) == id

  # Set the lock with final duration
  # If already acquired, refresh with final TTL
  # If not acquired (expired or taken), try to acquire exclusively
  final_lock_acquired = redis.set(unique_gid, id, nx: !acquired, ex: lock_ttl)

  # Log a warning if final lock could not be acquired
  # The job has already been enqueued at this point, so raising an error is useless
  worker.logger.warn(LOCK_FINALIZATION_WARNING) unless final_lock_acquired

  # Return the result of the block
  result
end

#lock_instanceAny

Return the instantiated lock.

Returns:

  • (Any)

    The instantiated lock



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/cloudtasker/unique_job/job.rb', line 99

def lock_instance
  @lock_instance ||=
    begin
      # Infer lock class and get instance
      lock_name = options[:lock]
      lock_klass = Lock.const_get(lock_name.to_s.split('_').collect(&:capitalize).join)
      lock_klass.new(self)
    rescue NameError
      DEFAULT_LOCK.new(self)
    end
end

#lock_provisional_ttlInteger

A provisional lock uses a very short duration and aims at covering the time it takes for the job to be enqueued through the client middleware chain.

If the application crashes during this time (e.g. OOM), at least the job won’t be locked for an extended period of time (which may span across a parent job retry, for instance)

This TTL can be configured via the ‘lock_provisional_ttl` option on the job itself.

Returns:

  • (Integer)

    The TTL in seconds



90
91
92
# File 'lib/cloudtasker/unique_job/job.rb', line 90

def lock_provisional_ttl
  (options[:lock_provisional_ttl] || Cloudtasker::UniqueJob.lock_provisional_ttl).to_i
end

#lock_ttlInteger

Return the Time To Live (TTL) that should be set in Redis for the lock key. Having a TTL on lock keys ensures that jobs do not end up stuck due to a dead lock situation.

The TTL is calculated using schedule time + expected max job duration.

The expected max job duration is set to 10 minutes by default. This value was chosen because it’s twice the default request timeout value in Cloud Run. This leaves enough room for queue lag (5 minutes) + job processing (5 minutes).

Queue lag is certainly the most unpredictable factor here. Job processing time is less of a factor. Jobs running for more than 5 minutes should be split into sub-jobs to limit invocation time over HTTP. Cloudtasker batch jobs can help achieve that if you need to make one big job split into sub-jobs “atomic”.

The default lock key expiration of “time_at + 10 minutes” may look aggressive but it is still a better choice than potentially having real-time jobs stuck for X hours.

The expected max job duration can be configured via the ‘lock_ttl` option on the job itself.

Returns:

  • (Integer)

    The TTL in seconds



65
66
67
68
69
70
71
72
73
74
# File 'lib/cloudtasker/unique_job/job.rb', line 65

def lock_ttl
  now = Time.now.to_i

  # Get scheduled at and lock duration
  scheduled_at = [call_opts[:time_at].to_i, now].compact.max
  lock_duration = (options[:lock_ttl] || Cloudtasker::UniqueJob.lock_ttl).to_i

  # Return the TTL, which is the configured lock_duration at minima
  [lock_duration, scheduled_at + lock_duration - now].max
end

#optionsHash

Return the worker configuration options.

Returns:

  • (Hash)

    The worker configuration options.



35
36
37
# File 'lib/cloudtasker/unique_job/job.rb', line 35

def options
  worker.class.cloudtasker_options_hash
end

#redisCloudtasker::RedisClient

Return the Cloudtasker redis client.

Returns:



189
190
191
# File 'lib/cloudtasker/unique_job/job.rb', line 189

def redis
  @redis ||= Cloudtasker::RedisClient.new
end

#unique_argsArray<any>

Return the list of arguments used for job uniqueness.

Returns:

  • (Array<any>)

    The list of unique arguments



116
117
118
# File 'lib/cloudtasker/unique_job/job.rb', line 116

def unique_args
  worker.try(:unique_args, worker.job_args) || worker.job_args
end

#unique_gidString

Return the Global ID of the unique job. The gid includes the UniqueJob namespace.

Returns:

  • (String)

    The global ID of the job



180
181
182
# File 'lib/cloudtasker/unique_job/job.rb', line 180

def unique_gid
  [self.class.to_s.underscore, unique_id].join('/')
end

#unique_idString

Return the ID of the unique job.

Returns:

  • (String)

    The ID of the job.



170
171
172
# File 'lib/cloudtasker/unique_job/job.rb', line 170

def unique_id
  Digest::SHA256.hexdigest(digest_hash.to_json)
end

#unique_scopeHash

Return a scope to be included in the digest hash

Returns:

  • (Hash)

    A scope hash



139
140
141
# File 'lib/cloudtasker/unique_job/job.rb', line 139

def unique_scope
  base_unique_scope.to_h.merge(worker.try(:unique_scope).to_h)
end

#unlock!Object

Delete the job lock.



253
254
255
256
# File 'lib/cloudtasker/unique_job/job.rb', line 253

def unlock!
  locked_id = redis.get(unique_gid)
  redis.del(unique_gid) if locked_id == id
end