Class: Cloudtasker::Batch::Job

Inherits:
Object
Defined in:
lib/cloudtasker/batch/job.rb

Overview

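Handles batch management. A batch job wraps a worker, tracks the status of the child jobs it enqueues in Redis, and propagates completion events and callbacks up the batch tree.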

Constant Summary

JOBS_NAMESPACE =

Key namespace used for objects saved under this class.

'jobs'
STATES_NAMESPACE =
'states'
BATCH_STATUSES =

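List of statuses tracked for child jobs, plus the 'all' pseudo-status whose counter holds the total number of child jobs. A batch is complete once every child job is in one of the COMPLETION_STATUSES.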

Batch jobs go through the following states:

  • scheduled: the parent batch has enqueued a worker for the child job

  • processing: the child job is running

  • completed: the child job has completed successfully

  • errored: the child job has encountered an error and must retry

  • dead: the child job has exceeded its max number of retries

The ‘dead’ status is considered to be a completion status as it means that the job will never succeed. There is no point in blocking the batch forever so we proceed forward eventually.

%w[scheduled processing completed errored dead all].freeze
COMPLETION_STATUSES =
%w[completed dead].freeze
IGNORED_ERRORED_CALLBACKS =

These callbacks do not need to raise errors on their own because the jobs will be either retried or dropped.

%i[on_child_error on_child_dead].freeze
BATCH_MAX_LOCK_WAIT =

The maximum number of seconds to wait for a batch state lock to be acquired.

60

Constructor Details

#initialize(worker) ⇒ Job

Build a new instance of the class.

Parameters:

  • worker (Cloudtasker::Worker)

    The worker the batch is attached to.

# File 'lib/cloudtasker/batch/job.rb', line 103

def initialize(worker)
  @worker = worker
end

Instance Attribute Details

#worker ⇒ Object (readonly)

Returns the value of attribute worker.

# File 'lib/cloudtasker/batch/job.rb', line 7

def worker
  @worker
end

Class Method Details

.find(worker_id) ⇒ Cloudtasker::Batch::Job?

Find a batch by id.

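Parameters:

  • worker_id (String)

    The batch id, i.e. the job id of the worker that owns the batch.

Returns:

  • (Cloudtasker::Batch::Job, nil)

    The batch, or nil if no worker is stored under that id.
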
# File 'lib/cloudtasker/batch/job.rb', line 54

def self.find(worker_id)
  return nil unless worker_id

  # Retrieve related worker
  payload = redis.fetch(key("#{JOBS_NAMESPACE}/#{worker_id}"))
  worker = Cloudtasker::Worker.from_hash(payload)
  return nil unless worker

  # Build batch job
  self.for(worker)
end

.for(worker) ⇒ Cloudtasker::Batch::Job

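Attach a batch to a worker, loading the batch extension on the worker class if needed.

Parameters:

  • worker (Cloudtasker::Worker)

    The worker to attach the batch to.

Returns:

  • (Cloudtasker::Batch::Job)

    The batch now attached to the worker.
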
# File 'lib/cloudtasker/batch/job.rb', line 86

def self.for(worker)
  # Load extension if not loaded already on the worker class
  worker.class.include(Extension::Worker) unless worker.class <= Extension::Worker

  # Add batch and parent batch to worker
  worker.batch = new(worker)
  worker.parent_batch = worker.batch.parent_batch

  # Return the batch
  worker.batch
end

.key(val) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String, nil)

    The namespaced key, or nil if val is nil.

# File 'lib/cloudtasker/batch/job.rb', line 73

def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end

.redis ⇒ Cloudtasker::RedisClient

Return the Cloudtasker Redis client, memoized at class level.

Returns:

  • (Cloudtasker::RedisClient)

    The Redis client.

# File 'lib/cloudtasker/batch/job.rb', line 43

def self.redis
  @redis ||= RedisClient.new
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if other is a Cloudtasker::Batch::Job with the same batch id.

# File 'lib/cloudtasker/batch/job.rb', line 133

def ==(other)
  other.is_a?(self.class) && other.batch_id == batch_id
end

#add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>

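Add a worker to the batch, using the current worker's queue. The child job is not enqueued immediately: it is appended to pending_jobs and scheduled later, after the job (or the batch callback that added it) has run.

Parameters:

  • worker_klass (Class)

    The worker class.

  • *args (Array<any>)

    The worker arguments.

Returns:

  • (Array<Cloudtasker::Worker>)

    The list of pending jobs, including the one just added.
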
# File 'lib/cloudtasker/batch/job.rb', line 242

def add(worker_klass, *args)
  add_to_queue(worker.job_queue, worker_klass, *args)
end

#add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>

Add a worker to the batch using a specific queue.

Parameters:

  • queue (String, Symbol)

    The name of the queue

  • worker_klass (Class)

    The worker class.

  • *args (Array<any>)

    The worker arguments.

Returns:

  • (Array<Cloudtasker::Worker>)

    The list of pending jobs, including the one just added.

# File 'lib/cloudtasker/batch/job.rb', line 255

def add_to_queue(queue, worker_klass, *args)
  pending_jobs << worker_klass.new(
    job_args: args,
    job_meta: { key(:parent_id) => batch_id },
    job_queue: queue
  )
end

#batch_gid ⇒ String

Return the key under which the batch worker is saved.

Returns:

  • (String)

    The namespaced batch id (the batch id is the worker's job id).

# File 'lib/cloudtasker/batch/job.rb', line 173

def batch_gid
  key("#{JOBS_NAMESPACE}/#{batch_id}")
end

#batch_id ⇒ String

Return the batch id, which is the job id of the attached worker.

Returns:

  • (String, nil)

    The batch id, or nil if no worker is attached.

# File 'lib/cloudtasker/batch/job.rb', line 164

def batch_id
  worker&.job_id
end

#batch_state ⇒ Hash

Return the batch state, migrating it to a Redis hash first if it still uses the legacy string format.

Returns:

  • (Hash)

    The status of each child job, keyed by child job id.

# File 'lib/cloudtasker/batch/job.rb', line 228

def batch_state
  migrate_batch_state_to_redis_hash

  redis.hgetall(batch_state_gid)
end

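#batch_state_count(state) ⇒ Integer

Return the number of child jobs in a given state.

Parameters:

  • state (String, Symbol)

    The job status (e.g. 'scheduled', 'completed' or 'all').

Returns:

  • (Integer)

    The number of child jobs in that state (0 if the counter is not set).
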
# File 'lib/cloudtasker/batch/job.rb', line 201

def batch_state_count(state)
  redis.get(batch_state_count_gid(state)).to_i
end

#batch_state_count_gid(state) ⇒ String

Return the key under which the batch progress is stored for a specific state.

Returns:

  • (String)

    The batch progress state namespaced id.

# File 'lib/cloudtasker/batch/job.rb', line 192

def batch_state_count_gid(state)
  "#{batch_state_gid}/state_count/#{state}"
end

#batch_state_gid ⇒ String

Return the key under which the batch state is stored.

Returns:

  • (String)

    The batch state namespaced id.

# File 'lib/cloudtasker/batch/job.rb', line 182

def batch_state_gid
  key("#{STATES_NAMESPACE}/#{batch_id}")
end
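The key helpers above combine into a predictable key layout. The following minimal sketch reproduces the key strings handed to the Redis client in plain Ruby, so it runs without Redis or ActiveSupport. It assumes ActiveSupport's `underscore` turns `Cloudtasker::Batch::Job` into `cloudtasker/batch/job`; the `underscore` and `namespaced_key` helpers and the batch id are hypothetical.

```ruby
# Hypothetical, simplified stand-in for ActiveSupport's String#underscore:
# "::" becomes "/" and CamelCase becomes snake_case.
def underscore(str)
  str.gsub('::', '/')
     .gsub(/([a-z\d])([A-Z])/, '\1_\2')
     .downcase
end

# Mirrors Batch::Job.key: prefix a value with the underscored class name.
def namespaced_key(val, class_name = 'Cloudtasker::Batch::Job')
  return nil if val.nil?

  [underscore(class_name), val.to_s].join('/')
end

batch_id = 'abc123' # hypothetical batch (job) id

batch_gid = namespaced_key("jobs/#{batch_id}")
batch_state_gid = namespaced_key("states/#{batch_id}")
scheduled_count_gid = "#{batch_state_gid}/state_count/scheduled"

puts batch_gid           # cloudtasker/batch/job/jobs/abc123
puts batch_state_gid     # cloudtasker/batch/job/states/abc123
puts scheduled_count_gid # cloudtasker/batch/job/states/abc123/state_count/scheduled
```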

#cleanup ⇒ Object

Remove all batch and sub-batch keys from Redis.

# File 'lib/cloudtasker/batch/job.rb', line 432

def cleanup
  migrate_batch_state_to_redis_hash

  # Delete child batches recursively
  redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup }

  # Delete batch redis entries
  redis.multi do |m|
    m.del(batch_gid)
    m.del(batch_state_gid)
    BATCH_STATUSES.each { |e| m.del(batch_state_count_gid(e)) }
  end
end

#complete(status = :completed) ⇒ Object

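Post-perform logic, skipped for re-enqueued jobs. If all child jobs have completed, the batch completion callback is run and the parent batch is notified that this child is complete. The parent batch, if any, is then notified that a node of its tree has completed.
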
# File 'lib/cloudtasker/batch/job.rb', line 520

def complete(status = :completed)
  return true if reenqueued?

  # Notify the parent batch that a child is complete
  on_complete(status) if complete?

  # Notify the parent that a batch node has completed
  parent_batch&.on_batch_node_complete(self, status)
end

#complete? ⇒ Boolean

Return true if all child jobs have reached a completion status (completed or dead). A batch with no child jobs is considered complete.

Returns:

  • (Boolean)

    True if the batch is complete.

# File 'lib/cloudtasker/batch/job.rb', line 341

def complete?
  migrate_batch_state_to_redis_hash

  # Check that all child jobs have completed
  redis.hvals(batch_state_gid).all? { |e| COMPLETION_STATUSES.include?(e) }
end
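The completion check reduces to a predicate over the values of the batch state hash. A minimal sketch, with a plain Ruby Hash (child job id to status) standing in for the Redis hash read by `hvals`; `batch_complete?` and the child ids are hypothetical.

```ruby
COMPLETION_STATUSES = %w[completed dead].freeze

# Mirrors Batch::Job#complete?: every child job must be in a completion status.
# `state` is a plain Hash standing in for the Redis batch state hash.
def batch_complete?(state)
  state.values.all? { |status| COMPLETION_STATUSES.include?(status) }
end

state = { 'child-1' => 'completed', 'child-2' => 'dead' }
puts batch_complete?(state) # true

state['child-3'] = 'errored'
puts batch_complete?(state) # false: an errored child will be retried
```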

#enqueued_jobs ⇒ Array<Cloudtasker::Worker>

The list of jobs that have been enqueued as part of the batch by this instance.

Returns:

  • (Array<Cloudtasker::Worker>)

    The enqueued jobs.

# File 'lib/cloudtasker/batch/job.rb', line 219

def enqueued_jobs
  @enqueued_jobs ||= []
end

#execute ⇒ Object

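Execute the batch: mark the job as processing in the parent batch, perform the job (yield), schedule any child jobs added during the run, then complete the batch. If the job raises, the batch is completed with the dead status (for a DeadWorkerError) or the errored status, and the error is re-raised.
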
# File 'lib/cloudtasker/batch/job.rb', line 533

def execute
  # Update parent batch state
  parent_batch&.update_state(batch_id, :processing)

  # Perform job
  yield

  # Setup batch
  # Only applicable if the batch has pending_jobs
  setup

  # Schedule pending jobs on the parent batch if it was expanded
  parent_batch&.schedule_pending_jobs

  # Complete batch
  complete(:completed)
rescue DeadWorkerError => e
  complete(:dead)
  raise(e)
rescue StandardError => e
  complete(:errored)
  raise(e)
end
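The control flow of `execute` can be exercised without Cloudtasker by recording each step instead of performing it. In this minimal sketch, `execute_sketch`, the `events` log and the top-level `DeadWorkerError` are hypothetical stand-ins for the real method and for Cloudtasker's dead-worker error.

```ruby
# Hypothetical stand-in for Cloudtasker::DeadWorkerError.
class DeadWorkerError < StandardError; end

# Sketch of the Batch::Job#execute control flow: each step is recorded
# in `events` instead of touching Redis.
def execute_sketch(events)
  events << [:parent_state, :processing]  # parent_batch&.update_state(batch_id, :processing)
  yield                                   # perform the job
  events << :setup                        # save batch + schedule child jobs (if any)
  events << :schedule_parent_pending_jobs # parent_batch&.schedule_pending_jobs
  events << [:complete, :completed]       # complete(:completed)
rescue DeadWorkerError => e
  events << [:complete, :dead]
  raise(e)
rescue StandardError => e
  events << [:complete, :errored]
  raise(e)
end

ok = []
execute_sketch(ok) { :performed }

failed = []
begin
  execute_sketch(failed) { raise ArgumentError, 'boom' }
rescue ArgumentError
  # The error still reaches the caller, after the errored completion
end
```

Because `DeadWorkerError` subclasses `StandardError` in this sketch, the rescue order matters: it must be rescued first to be reported as dead rather than errored.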

#key(val) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.

# File 'lib/cloudtasker/batch/job.rb', line 144

def key(val)
  self.class.key(val)
end

#migrate_batch_state_to_redis_hash ⇒ Object

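Migrate the batch state from a legacy string key (holding a serialized hash) to a native Redis hash. The key type is checked again after acquiring the lock (waiting up to BATCH_MAX_LOCK_WAIT seconds), so concurrent callers perform the migration only once.
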
# File 'lib/cloudtasker/batch/job.rb', line 267

def migrate_batch_state_to_redis_hash
  return unless redis.type(batch_state_gid) == 'string'

  # Migrate batch state to Redis hash if it is still using a legacy string key
  # We acquire a lock then check again
  redis.with_lock(batch_state_gid, max_wait: BATCH_MAX_LOCK_WAIT) do
    if redis.type(batch_state_gid) == 'string'
      state = redis.fetch(batch_state_gid)
      redis.del(batch_state_gid)
      redis.hset(batch_state_gid, state) if state.any?
    end
  end
end
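The check-lock-check pattern used by the migration can be sketched with in-memory stand-ins. This assumes the legacy string value holds JSON and uses a `Mutex` in place of the Redis lock (a `Mutex` only protects a single process); `migrate_state!` and `MIGRATION_LOCK` are hypothetical names.

```ruby
require 'json'

# In-process stand-in for the distributed Redis lock (with_lock).
MIGRATION_LOCK = Mutex.new

# Sketch of the legacy-state migration. `store` is a plain Hash standing in
# for Redis: a String value plays the legacy string key (assumed to hold
# JSON), a Hash value plays the native Redis hash.
def migrate_state!(store, key)
  # Cheap check without the lock: nothing to do for non-legacy keys
  return unless store[key].is_a?(String)

  MIGRATION_LOCK.synchronize do
    # Check again: another caller may have migrated the key meanwhile
    if store[key].is_a?(String)
      state = JSON.parse(store.delete(key))
      store[key] = state if state.any? # an empty state leaves no key behind
    end
  end
end

store = { 'states/abc123' => '{"child-1":"completed","child-2":"scheduled"}' }
migrate_state!(store, 'states/abc123')
migrate_state!(store, 'states/abc123') # second call is a no-op
```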

#migrate_progress_stats_to_redis_counters ⇒ Object

Initialize the per-status job counters from the batch state, unless they have already been set. This covers running batches that started before counter-based progress tracking was introduced.

# File 'lib/cloudtasker/batch/job.rb', line 284

def migrate_progress_stats_to_redis_counters
  # Abort if counters have already been set. The 'all' counter acts as a feature flag.
  return if redis.exists?(batch_state_count_gid('all'))

  # Get all job states
  values = batch_state.values

  # Count by value
  redis.multi do |m|
    # Per status
    values.tally.each do |k, v|
      m.set(batch_state_count_gid(k), v)
    end

    # All counter
    m.set(batch_state_count_gid('all'), values.size)
  end
end
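The counter initialization boils down to `Array#tally` (Ruby 2.7+) over the state values. A minimal sketch with plain Hashes standing in for the Redis state hash and counter keys; `initialize_counters` is a hypothetical name.

```ruby
# Sketch of the counter initialization: derive per-status counters plus the
# 'all' total from the batch state. `counters` is a plain Hash standing in
# for the Redis counter keys (the real code writes them inside MULTI).
def initialize_counters(state, counters)
  # The 'all' counter doubles as a "counters already initialized" flag
  return counters if counters.key?('all')

  state.values.tally.each { |status, count| counters[status] = count }
  counters['all'] = state.size
  counters
end

state = {
  'child-1' => 'completed',
  'child-2' => 'completed',
  'child-3' => 'processing'
}
counters = initialize_counters(state, {})
```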

#on_batch_node_complete(child_batch, status = :completed) ⇒ Object

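Callback invoked when any batch in the tree gets completed. The worker's on_batch_node_complete callback only runs for the completed status, and the event is propagated up to the root batch.

Parameters:

  • child_batch (Cloudtasker::Batch::Job)

    The batch that has completed.

  • status (Symbol) (defaults to: :completed)

    The completion status of that batch.
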
# File 'lib/cloudtasker/batch/job.rb', line 419

def on_batch_node_complete(child_batch, status = :completed)
  return false unless status == :completed

  # Notify the worker that a batch node worker has completed
  run_worker_callback(:on_batch_node_complete, child_batch.worker)

  # Notify the parent batch that a node is complete
  parent_batch&.on_batch_node_complete(child_batch)
end

#on_child_complete(child_batch, status = :completed) ⇒ Object

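Callback invoked when a direct child batch is complete. The child's status is recorded in the batch state, the matching worker callback is run (on_child_complete, on_child_error or on_child_dead), and the batch completes itself if the status is not errored and all child jobs are done.

Parameters:

  • child_batch (Cloudtasker::Batch::Job)

    The child batch.

  • status (Symbol) (defaults to: :completed)

    The child status: :completed, :errored or :dead.
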
# File 'lib/cloudtasker/batch/job.rb', line 396

def on_child_complete(child_batch, status = :completed)
  # Update batch state
  update_state(child_batch.batch_id, status)

  # Notify the worker that a direct batch child worker has completed
  case status
  when :completed
    run_worker_callback(:on_child_complete, child_batch.worker)
  when :errored
    run_worker_callback(:on_child_error, child_batch.worker)
  when :dead
    run_worker_callback(:on_child_dead, child_batch.worker)
  end

  # Notify the parent batch that we are done with this batch
  on_complete if status != :errored && complete?
end
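The status handling in `on_child_complete` amounts to a small lookup plus a completion guard. A minimal sketch under that reading; `CHILD_CALLBACKS` and `child_event` are hypothetical names, and `batch_complete` stands in for the result of `complete?`.

```ruby
# Hypothetical restatement of the `case status` dispatch in
# Batch::Job#on_child_complete as a lookup table.
CHILD_CALLBACKS = {
  completed: :on_child_complete,
  errored: :on_child_error,
  dead: :on_child_dead
}.freeze

# Returns the worker callback to run for a child status, and whether the
# batch should then complete itself. An errored child never triggers
# completion, since it will be retried.
def child_event(status, batch_complete)
  [CHILD_CALLBACKS[status], status != :errored && batch_complete]
end

p child_event(:completed, true) # [:on_child_complete, true]
p child_event(:errored, true)   # [:on_child_error, false]
p child_event(:dead, true)      # [:on_child_dead, true]
```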

#on_complete(status = :completed) ⇒ Object

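Callback invoked when the batch is complete. The worker's on_batch_complete callback runs only for the completed status; the event is then propagated to the parent batch and the batch tree is removed from Redis.
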
# File 'lib/cloudtasker/batch/job.rb', line 380

def on_complete(status = :completed)
  # Invoke worker callback
  run_worker_callback(:on_batch_complete) if status == :completed

  # Propagate event
  parent_batch&.on_child_complete(self, status)

  # The batch tree is complete. Cleanup the downstream tree.
  cleanup
end

#parent_batch ⇒ Cloudtasker::Batch::Job?

Return the parent batch, if any.

Returns:

  • (Cloudtasker::Batch::Job, nil)

    The parent batch, or nil if the worker's job metadata holds no parent batch id.

# File 'lib/cloudtasker/batch/job.rb', line 153

def parent_batch
  return nil unless (parent_id = worker.job_meta.get(key(:parent_id)))

  @parent_batch ||= self.class.find(parent_id)
end

#pending_jobs ⇒ Array<Cloudtasker::Worker>

The list of jobs to be enqueued in the batch

Returns:

  • (Array<Cloudtasker::Worker>)

    The jobs waiting to be scheduled.

# File 'lib/cloudtasker/batch/job.rb', line 210

def pending_jobs
  @pending_jobs ||= []
end

#progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress

Calculate the progress of the batch.

Parameters:

  • depth (Integer) (defaults to: 0)

    The depth of calculation. Zero (default) means only immediate children will be taken into account.

Returns:

  • (Cloudtasker::Batch::BatchProgress)

    The aggregated progress of the batch.

# File 'lib/cloudtasker/batch/job.rb', line 454

def progress(depth: 0)
  depth = depth.to_i

  # Initialize counters from batch state. This is only applicable to running batches
  # that started before the counter-based progress was implemented/released.
  migrate_progress_stats_to_redis_counters

  # Return immediately if we do not need to go down the tree
  return BatchProgress.new([self]) if depth <= 0

  # Sum batch progress of current batch and sub-batches up to the specified
  # depth
  batch_state.to_h.reduce(BatchProgress.new([self])) do |memo, (child_id, _)|
    memo + (self.class.find(child_id)&.progress(depth: depth - 1) || BatchProgress.new)
  end
end
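The depth-limited aggregation in `progress` can be sketched over an in-memory tree. This assumes each node carries a Hash of per-status counts for its direct children (standing in for the Redis counters behind `BatchProgress`); `Node` and `progress` are hypothetical, and the real method returns a `BatchProgress` object rather than a Hash.

```ruby
# Hypothetical tree node: per-status counts of its direct children, plus the
# child batches that have children of their own.
Node = Struct.new(:counts, :children)

# Sum this node's counts, then recurse into child batches while depth > 0.
# depth 0 only looks at the node's own counters (its direct children).
def progress(node, depth: 0)
  total = node.counts.dup
  return total if depth <= 0

  node.children.each do |child|
    progress(child, depth: depth - 1).each do |status, count|
      total[status] = total.fetch(status, 0) + count
    end
  end

  total
end

leaf = Node.new({ 'completed' => 2 }, [])
root = Node.new({ 'completed' => 1, 'processing' => 1 }, [leaf])

progress(root)           # only root's direct children
progress(root, depth: 1) # root's children plus the leaf batch's children
```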

#redis ⇒ Cloudtasker::RedisClient

Return the Cloudtasker Redis client (shared with the class).

Returns:

  • (Cloudtasker::RedisClient)

    The Redis client.

# File 'lib/cloudtasker/batch/job.rb', line 122

def redis
  self.class.redis
end

#reenqueued? ⇒ Boolean

Return true if the worker has been re-enqueued. Post-process logic should be skipped for re-enqueued jobs.

Returns:

  • (Boolean)

    True if the job was re-enqueued.

# File 'lib/cloudtasker/batch/job.rb', line 113

def reenqueued?
  worker.job_reenqueued
end

#run_worker_callback(callback, *args) ⇒ any

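Run a worker callback, then schedule any jobs that the callback added to this batch (except after on_batch_complete, since a completed batch cannot receive additional jobs) or to the parent batch. Errors raised by the on_child_error and on_child_dead callbacks are logged and silenced; errors raised by other callbacks are re-raised so that the job is retried.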

Parameters:

  • callback (String, Symbol)

    The callback to run.

  • *args (Array<any>)

    The callback arguments.

Returns:

  • (any)

    The callback return value

# File 'lib/cloudtasker/batch/job.rb', line 357

def run_worker_callback(callback, *args)
  worker.try(callback, *args).tap do
    # Enqueue pending jobs if batch was expanded in callback
    # A completed batch cannot receive additional jobs
    schedule_pending_jobs if callback.to_sym != :on_batch_complete

    # Schedule pending jobs on parent if batch was expanded
    parent_batch&.schedule_pending_jobs
  end
rescue StandardError => e
  # There is no point in retrying jobs due to failure callbacks failing
  # Only completion callbacks will trigger a re-run of the job because
  # these do matter for batch completion
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  # Log error instead
  worker.logger.error(e)
  worker.logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
end
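The error policy of `run_worker_callback` can be isolated from Redis and Cloudtasker. In this minimal sketch, `run_callback`, `ArrayLogger` and `FlakyWorker` are hypothetical; the real method also schedules pending jobs after the callback and logs through `worker.logger`.

```ruby
IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze

# Hypothetical minimal logger that collects error messages.
class ArrayLogger
  attr_reader :messages

  def initialize
    @messages = []
  end

  def error(msg)
    @messages << msg.to_s
  end
end

# Sketch of the error policy in Batch::Job#run_worker_callback (pending job
# scheduling is left out). A callback the worker does not define returns
# nil, like ActiveSupport's Object#try. Failures in the error/dead
# callbacks are logged and swallowed; any other failure is re-raised.
def run_callback(worker, callback, *args, logger:)
  return nil unless worker.respond_to?(callback)

  worker.public_send(callback, *args)
rescue StandardError => e
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  logger.error(e)
  logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
  nil
end

# Hypothetical worker with buggy callbacks.
class FlakyWorker
  def on_child_error(_child)
    raise 'callback bug'
  end

  def on_child_complete(_child)
    raise 'callback bug'
  end
end

logger = ArrayLogger.new
worker = FlakyWorker.new

run_callback(worker, :on_child_error, :child, logger: logger) # logged, returns nil
```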

#save ⇒ Object

Save a serialized version of the worker.

This is required to invoke callback methods on an instantiated worker when child workers complete (successfully or not).

# File 'lib/cloudtasker/batch/job.rb', line 310

def save
  redis.write(batch_gid, worker.to_h)
end

#schedule_pending_jobs ⇒ Object

Schedule the child workers that were added to the batch, and return the list of jobs actually scheduled. Jobs that are not scheduled (e.g. evicted due to uniqueness requirements) are not registered in the batch state.

# File 'lib/cloudtasker/batch/job.rb', line 474

def schedule_pending_jobs
  ret_list = []

  while (j = pending_jobs.shift)
    # Schedule the job
    # Skip batch registration if the job was not actually scheduled
    # E.g. the job was evicted due to uniqueness requirements
    next unless j.schedule

    # Initialize the batch state unless the job has already started (and taken
    # hold of its own status)
    # The batch state is initialized only after the job is scheduled to avoid
    # never-ending batches, which could occur if a batch crashed while
    # enqueuing children (e.g. due to an OOM error), since 'scheduled' is a
    # blocking status.
    redis.multi do |m|
      m.hsetnx(batch_state_gid, j.job_id, 'scheduled')
      m.incr(batch_state_count_gid('scheduled'))
      m.incr(batch_state_count_gid('all'))
    end

    # Flag job as enqueued
    ret_list << j
    enqueued_jobs << j
  end

  # Return the list of jobs just enqueued
  ret_list
end
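The scheduling loop can be sketched with in-memory stand-ins. `PendingJob` is hypothetical: its `schedule` returns false to mimic a job that was not enqueued (for instance, evicted by a uniqueness constraint). Plain Hashes replace the Redis state hash and counters, and `||=` plays the role of HSETNX, which only writes a field that does not exist yet.

```ruby
# Hypothetical pending job: `schedule` returns false when the job was not
# actually enqueued (e.g. evicted by a uniqueness constraint).
PendingJob = Struct.new(:job_id, :schedulable) do
  def schedule
    schedulable
  end
end

# Sketch of Batch::Job#schedule_pending_jobs with plain Hashes in place of
# the Redis state hash and counters.
def schedule_pending(pending, state, counters)
  scheduled = []

  while (job = pending.shift)
    # Skip batch registration if the job was not scheduled
    next unless job.schedule

    # Like HSETNX: keep the status if the child job already reported one
    state[job.job_id] ||= 'scheduled'
    counters['scheduled'] += 1
    counters['all'] += 1

    scheduled << job
  end

  scheduled
end

pending = [PendingJob.new('a', true), PendingJob.new('b', false)]
state = {}
counters = Hash.new(0)
done = schedule_pending(pending, state, counters)
```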

#setup ⇒ Object

Save the batch and enqueue all child workers attached to it. Does nothing if no child workers were added.

# File 'lib/cloudtasker/batch/job.rb', line 507

def setup
  return true if pending_jobs.empty?

  # Save batch
  save

  # Schedule all child workers
  schedule_pending_jobs
end

#update_state(batch_id, status) ⇒ Object

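Update the status of a child job in the batch state and adjust the per-status counters. Nothing is written if the status is unchanged.

Parameters:

  • batch_id (String)

    The id of the child batch (i.e. the child job id).

  • status (String, Symbol)

    The new status of the child batch.
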
# File 'lib/cloudtasker/batch/job.rb', line 320

def update_state(batch_id, status)
  migrate_batch_state_to_redis_hash

  # Get current status
  current_status = redis.hget(batch_state_gid, batch_id)
  return if current_status == status.to_s

  # Update the batch state batch_id entry with the new status
  # and update counters
  redis.multi do |m|
    m.hset(batch_state_gid, batch_id, status)
    m.decr(batch_state_count_gid(current_status))
    m.incr(batch_state_count_gid(status))
  end
end
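The state transition performed by `update_state` can be sketched with plain Hashes in place of the Redis state hash and counter keys. A minimal sketch; `update_child_state` is a hypothetical name, and the real code applies the three writes inside a MULTI transaction.

```ruby
# Sketch of Batch::Job#update_state: move a child job from its current
# status to a new one and keep the per-status counters in sync.
def update_child_state(state, counters, child_id, status)
  current = state[child_id]
  return nil if current == status.to_s # unchanged: nothing to write

  state[child_id] = status.to_s
  counters[current] -= 1
  counters[status.to_s] += 1
end

state = { 'child-1' => 'scheduled' }
counters = Hash.new(0)
counters['scheduled'] = 1
counters['all'] = 1

update_child_state(state, counters, 'child-1', :processing)
update_child_state(state, counters, 'child-1', :completed)
```

The 'all' counter is untouched by transitions: it only grows when jobs are scheduled.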