Class: Cloudtasker::Batch::Job

Inherits: Object
Defined in: lib/cloudtasker/batch/job.rb

Overview

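Handle batch management: attach a batch to a worker, track the status of its child jobs in Redis, and run the batch callbacks as the children complete.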

Constant Summary

JOBS_NAMESPACE = 'jobs'

Key namespace used for objects saved under this class.
STATES_NAMESPACE = 'states'

Key namespace used for batch states.
BATCH_STATUSES = %w[scheduled processing completed errored dead all].freeze

Statuses for which a counter of child jobs is stored. The 'all' entry is not a job status: it counts every scheduled child job.

Batch jobs go through the following states:

  • scheduled: the parent batch has enqueued a worker for the child job

  • processing: the child job is running

  • completed: the child job has completed successfully

  • errored: the child job has encountered an error and must retry

  • dead: the child job has exceeded its max number of retries

COMPLETION_STATUSES = %w[completed dead].freeze

Sub-job statuses that count as complete when evaluating whether the batch is complete. The 'dead' status is a completion status because a dead job will never succeed. There is no point in blocking the batch forever, so the batch moves forward.
IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze

Callbacks whose errors are logged instead of raised. Raising would serve no purpose because the child job will be either retried or dropped anyway.
BATCH_MAX_LOCK_WAIT = 60

The maximum number of seconds to wait for a batch state lock to be acquired.
BATCH_COMPLETION_TTL = 100

TTL, in seconds, of the completion flag that prevents concurrent children from triggering multiple on_complete calls.


Constructor Details

#initialize(worker) ⇒ Job

Build a new instance of the class.

Parameters:

  • worker (Cloudtasker::Worker)

    The worker the batch is attached to.



# File 'lib/cloudtasker/batch/job.rb', line 107

def initialize(worker)
  @worker = worker
end

Instance Attribute Details

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



# File 'lib/cloudtasker/batch/job.rb', line 7

def worker
  @worker
end

Class Method Details

.find(worker_id) ⇒ Cloudtasker::Batch::Job?

Find a batch by id.

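Parameters:

  • worker_id (String)

    The id of the worker that owns the batch. The batch id is the job id of that worker.

Returns:

  • (Cloudtasker::Batch::Job, nil)

    The batch, or nil if worker_id is nil or no worker is stored under this id.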



# File 'lib/cloudtasker/batch/job.rb', line 58

def self.find(worker_id)
  return nil unless worker_id

  # Retrieve related worker
  payload = redis.fetch(key("#{JOBS_NAMESPACE}/#{worker_id}"))
  worker = Cloudtasker::Worker.from_hash(payload)
  return nil unless worker

  # Build batch job
  self.for(worker)
end

.for(worker) ⇒ Cloudtasker::Batch::Job

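Attach a batch to a worker.

Parameters:

  • worker (Cloudtasker::Worker)

    The worker to attach the batch to.

Returns:

  • (Cloudtasker::Batch::Job)

    The batch attached to the worker.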



# File 'lib/cloudtasker/batch/job.rb', line 90

def self.for(worker)
  # Load extension if not loaded already on the worker class
  worker.class.include(Extension::Worker) unless worker.class <= Extension::Worker

  # Add batch and parent batch to worker
  worker.batch = new(worker)
  worker.parent_batch = worker.batch.parent_batch

  # Return the batch
  worker.batch
end

.key(val) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



# File 'lib/cloudtasker/batch/job.rb', line 77

def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end

.redis ⇒ Cloudtasker::RedisClient

Return the Cloudtasker Redis client.

Returns:

  • (Cloudtasker::RedisClient)

    The Redis client.



# File 'lib/cloudtasker/batch/job.rb', line 47

def self.redis
  @redis ||= RedisClient.new
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



# File 'lib/cloudtasker/batch/job.rb', line 137

def ==(other)
  other.is_a?(self.class) && other.batch_id == batch_id
end

#add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>

Add a worker to the batch, using the queue of the current worker.

Parameters:

  • worker_klass (Class)

    The worker class.

  • *args (Array<any>)

    The worker arguments.

Returns:

  • (Array<Cloudtasker::Worker>)

    The list of pending jobs, including the worker just added.



# File 'lib/cloudtasker/batch/job.rb', line 255

def add(worker_klass, *args)
  add_to_queue(worker.job_queue, worker_klass, *args)
end

#add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>

Add a worker to the batch using a specific queue.

Parameters:

  • queue (String, Symbol)

    The name of the queue

  • worker_klass (Class)

    The worker class.

  • *args (Array<any>)

    The worker arguments.

Returns:

  • (Array<Cloudtasker::Worker>)

    The list of pending jobs, including the worker just added.



# File 'lib/cloudtasker/batch/job.rb', line 268

def add_to_queue(queue, worker_klass, *args)
  pending_jobs << worker_klass.new(
    job_args: args,
    job_meta: { key(:parent_id) => batch_id },
    job_queue: queue
  )
end
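The link between a child and its batch is a single metadata entry. The sketch below mirrors `#add_to_queue` in plain Ruby, assuming a hypothetical `ChildWorker` struct in place of a real Cloudtasker worker and a plain hash in place of the worker's metadata store; `PARENT_ID_KEY` is the value `key(:parent_id)` produces for this class.

```ruby
# Hypothetical minimal worker exposing only the attributes add_to_queue sets.
ChildWorker = Struct.new(:job_args, :job_meta, :job_queue, keyword_init: true)

# Value of key(:parent_id) for this class.
PARENT_ID_KEY = 'cloudtasker/batch/job/parent_id'

# Mirrors #add_to_queue: build the child with the parent batch id in its
# metadata and append it to the pending list. Nothing is enqueued yet.
def add_to_queue(pending_jobs, batch_id, queue, worker_klass, *args)
  pending_jobs << worker_klass.new(
    job_args: args,
    job_meta: { PARENT_ID_KEY => batch_id },
    job_queue: queue
  )
end

pending_jobs = []
add_to_queue(pending_jobs, 'parent-123', 'default', ChildWorker, 42, 'foo')

child = pending_jobs.first
child.job_args  # => [42, "foo"]
child.job_queue # => "default"
# child.job_meta holds 'parent-123' under the namespaced parent_id key
```

Because the child only stores the parent id, `#parent_batch` can later rebuild the parent with `.find` from any process.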

#batch_completion_gid ⇒ String

Return the key used to track batch completion.

Returns:

  • (String)

    The batch completion key.



# File 'lib/cloudtasker/batch/job.rb', line 205

def batch_completion_gid
  "#{batch_state_gid}/completed"
end

#batch_gid ⇒ String

Return the namespaced key under which the serialized batch worker is stored.

Returns:

  • (String)

    The namespaced worker key.



# File 'lib/cloudtasker/batch/job.rb', line 177

def batch_gid
  key("#{JOBS_NAMESPACE}/#{batch_id}")
end

#batch_id ⇒ String

Return the batch id, which is the job id of the worker.

Returns:

  • (String)

    The batch id.



# File 'lib/cloudtasker/batch/job.rb', line 168

def batch_id
  worker&.job_id
end

#batch_state ⇒ Hash

Return the batch state, mapping each child job id to its status.

Returns:

  • (Hash)

    The state of each child worker.



# File 'lib/cloudtasker/batch/job.rb', line 241

def batch_state
  migrate_batch_state_to_redis_hash

  redis.hgetall(batch_state_gid)
end

#batch_state_count(state) ⇒ Integer

Return the number of child jobs in a given state.

Parameters:

  • state (String, Symbol)

    The status to count.

Returns:

  • (Integer)

    The number of child jobs in the given state.



# File 'lib/cloudtasker/batch/job.rb', line 214

def batch_state_count(state)
  redis.get(batch_state_count_gid(state)).to_i
end

#batch_state_count_gid(state) ⇒ String

Return the key under which the batch progress is stored for a specific state.

Returns:

  • (String)

    The batch progress state namespaced id.



# File 'lib/cloudtasker/batch/job.rb', line 196

def batch_state_count_gid(state)
  "#{batch_state_gid}/state_count/#{state}"
end

#batch_state_gid ⇒ String

Return the key under which the batch state is stored.

Returns:

  • (String)

    The batch state namespaced id.



# File 'lib/cloudtasker/batch/job.rb', line 186

def batch_state_gid
  key("#{STATES_NAMESPACE}/#{batch_id}")
end
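The key helpers above compose into a small key tree per batch. The sketch below rebuilds that layout in plain Ruby so it can be inspected without Redis. `underscore` is a hypothetical stand-in for ActiveSupport's `String#underscore` (which `.key` relies on through `to_s.underscore`) that only covers the `Module::Class` case, and `abc123` is an arbitrary example batch id.

```ruby
# Hypothetical stand-in for ActiveSupport's String#underscore, limited to
# turning "Module::ClassName" into "module/class_name".
def underscore(class_name)
  class_name.gsub('::', '/').gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

NAMESPACE = underscore('Cloudtasker::Batch::Job') # "cloudtasker/batch/job"

# Mirrors .key: prefix the value with the underscored class name.
def key(val)
  return nil if val.nil?

  [NAMESPACE, val.to_s].join('/')
end

# Mirrors batch_gid, batch_state_gid, batch_completion_gid and
# batch_state_count_gid('scheduled') for a given batch id.
def batch_keys(batch_id)
  state_gid = key("states/#{batch_id}")
  {
    batch_gid: key("jobs/#{batch_id}"),
    batch_state_gid: state_gid,
    batch_completion_gid: "#{state_gid}/completed",
    scheduled_count_gid: "#{state_gid}/state_count/scheduled"
  }
end

keys = batch_keys('abc123')
keys.each { |name, value| puts "#{name}: #{value}" }
# batch_gid: cloudtasker/batch/job/jobs/abc123
# batch_state_gid: cloudtasker/batch/job/states/abc123
# batch_completion_gid: cloudtasker/batch/job/states/abc123/completed
# scheduled_count_gid: cloudtasker/batch/job/states/abc123/state_count/scheduled
```

Keeping the completion flag and the counters under the state key means they all share the batch id as a common prefix.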

#cleanup ⇒ Object

Remove all batch and sub-batch keys from Redis.



# File 'lib/cloudtasker/batch/job.rb', line 472

def cleanup
  migrate_batch_state_to_redis_hash

  # Delete child batches recursively
  redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup }

  # Delete batch redis entries
  redis.multi do |m|
    m.del(batch_gid)
    m.del(batch_state_gid)
    m.del(batch_completion_gid)
    BATCH_STATUSES.each { |e| m.del(batch_state_count_gid(e)) }
  end
end

#complete(status = :completed) ⇒ Object

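Post-perform logic, skipped for re-enqueued jobs. If all child jobs have completed, on_complete is invoked, which notifies the parent batch. The parent batch is then notified that a batch node has completed.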



# File 'lib/cloudtasker/batch/job.rb', line 561

def complete(status = :completed)
  return true if reenqueued?

  # Notify the parent batch that a child is complete
  on_complete(status) if complete?

  # Notify the parent that a batch node has completed
  parent_batch&.on_batch_node_complete(self, status)
end

#complete? ⇒ Boolean

Return true if all the child workers have completed.

Returns:

  • (Boolean)

    True if the batch is complete.



# File 'lib/cloudtasker/batch/job.rb', line 367

def complete?
  migrate_batch_state_to_redis_hash

  # Check that all child jobs have completed
  redis.hvals(batch_state_gid).all? { |e| COMPLETION_STATUSES.include?(e) }
end
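The completion check reduces to one `all?` over the child statuses. This sketch uses a plain hash in place of the Redis hash returned by `HGETALL`. It also shows why a job without children counts as complete: `all?` is true for an empty state, so `#complete` calls `on_complete` for a leaf job, which is how that job reports back to its parent.

```ruby
COMPLETION_STATUSES = %w[completed dead].freeze

# Mirrors #complete?: `batch_state` maps child job ids to status strings.
def complete?(batch_state)
  batch_state.values.all? { |status| COMPLETION_STATUSES.include?(status) }
end

complete?({ 'j1' => 'completed', 'j2' => 'dead' })    # => true
complete?({ 'j1' => 'completed', 'j2' => 'errored' }) # => false
complete?({})                                         # => true (no children)
```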

#enqueued_jobs ⇒ Array<Cloudtasker::Worker>

The list of jobs that have been enqueued as part of the batch.

Returns:

  • (Array<Cloudtasker::Worker>)

    The enqueued jobs.



# File 'lib/cloudtasker/batch/job.rb', line 232

def enqueued_jobs
  @enqueued_jobs ||= []
end

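#execute ⇒ Object

Execute the batch: flag the job as processing in the parent batch, run the job, schedule any child jobs added during the run, then complete the batch. If the job raises, the batch is completed as errored (or dead) and the error is re-raised.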



# File 'lib/cloudtasker/batch/job.rb', line 574

def execute
  # Update parent batch state
  parent_batch&.update_state(batch_id, :processing)

  # Perform job
  yield

  # Setup batch
  # Only applicable if the batch has pending_jobs
  setup

  # Save parent batch if batch was expanded
  parent_batch&.schedule_pending_jobs

  # Complete batch
  complete(:completed)
rescue DeadWorkerError => e
  complete(:dead)
  raise(e)
rescue StandardError => e
  complete(:errored)
  raise(e)
end
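The control flow of `#execute` can be traced with a sketch in which every batch operation is replaced by an entry in an `events` log. `DeadWorkerError` below is a locally defined stand-in, not the library class. The point is the ordering: child jobs are only set up and scheduled when the job itself succeeds, and a failure still completes the batch node as `:errored` or `:dead` before the error is re-raised.

```ruby
# Local stand-in for the DeadWorkerError rescued by #execute.
class DeadWorkerError < StandardError; end

# Mirrors the control flow of #execute; each batch operation is recorded in
# `events` instead of being performed.
def execute(events)
  events << %i[parent_update_state processing]
  yield
  events << %i[setup]
  events << %i[parent_schedule_pending_jobs]
  events << %i[complete completed]
rescue DeadWorkerError => e
  events << %i[complete dead]
  raise e
rescue StandardError => e
  events << %i[complete errored]
  raise e
end

success = []
execute(success) { :job_result }
success.last # => [:complete, :completed]

failure = []
begin
  execute(failure) { raise ArgumentError, 'boom' }
rescue ArgumentError
  # The error is re-raised after the batch node has been completed as errored.
end
failure.size # => 2 (setup was never reached)
```

The rescue order matters: `DeadWorkerError` is a `StandardError` here, so it must be rescued first to be reported as `:dead`.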

#key(val) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



# File 'lib/cloudtasker/batch/job.rb', line 148

def key(val)
  self.class.key(val)
end

#migrate_batch_state_to_redis_hash ⇒ Object

Migrate the batch state from the legacy format (a hash serialized under a string key) to a native Redis hash.



# File 'lib/cloudtasker/batch/job.rb', line 280

def migrate_batch_state_to_redis_hash
  return unless redis.type(batch_state_gid) == 'string'

  # Migrate batch state to Redis hash if it is still using a legacy string key
  # We acquire a lock then check again
  redis.with_lock(batch_state_gid, max_wait: BATCH_MAX_LOCK_WAIT) do
    if redis.type(batch_state_gid) == 'string'
      state = redis.fetch(batch_state_gid)
      redis.del(batch_state_gid)
      redis.hset(batch_state_gid, state) if state.any?
    end
  end
end
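The migration reads the legacy value, deletes the string key, and writes the same entries back as a Redis hash. The sketch below replays those steps against `FakeRedis`, a hypothetical in-memory stand-in that tracks the type of each key. It assumes the legacy value is a JSON-serialized hash, and it leaves out the lock and the second type check that the method performs around the rewrite.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the Redis commands the migration uses.
# Each key holds a [type, value] pair.
class FakeRedis
  def initialize
    @data = {}
  end

  def type(key)
    @data.key?(key) ? @data[key].first : 'none'
  end

  def set(key, value)
    @data[key] = ['string', value]
  end

  def get(key)
    entry = @data[key]
    entry && entry.first == 'string' ? entry.last : nil
  end

  def del(key)
    @data.delete(key)
  end

  def hset(key, fields)
    (@data[key] ||= ['hash', {}]).last.merge!(fields)
  end

  def hgetall(key)
    type(key) == 'hash' ? @data[key].last.dup : {}
  end
end

# Mirrors #migrate_batch_state_to_redis_hash, minus the lock and re-check.
def migrate_state(redis, state_key)
  return unless redis.type(state_key) == 'string'

  state = JSON.parse(redis.get(state_key))
  redis.del(state_key)
  redis.hset(state_key, state) if state.any?
end

redis = FakeRedis.new
redis.set('states/b1', { 'j1' => 'completed', 'j2' => 'scheduled' }.to_json)

migrate_state(redis, 'states/b1')
redis.type('states/b1')          # => "hash"
redis.hgetall('states/b1')['j2'] # => "scheduled"
migrate_state(redis, 'states/b1') # no-op: the key is already a hash
```

Because the first line returns early for anything but a string key, the migration is cheap to call at the start of every state read, as `#batch_state`, `#complete?` and `#update_state` do.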

#migrate_progress_stats_to_redis_counters ⇒ Object

Initialize the per-status batch job counters from the batch state, unless they have already been set.



# File 'lib/cloudtasker/batch/job.rb', line 297

def migrate_progress_stats_to_redis_counters
  # Abort if counters have already been set. The 'all' counter acts as a feature flag.
  return if redis.exists?(batch_state_count_gid('all'))

  # Get all job states
  values = batch_state.values

  # Count by value
  redis.multi do |m|
    # Per status
    values.tally.each do |k, v|
      m.set(batch_state_count_gid(k), v)
    end

    # All counter
    m.set(batch_state_count_gid('all'), values.size)
  end
end
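The counter migration boils down to `Array#tally` plus a total. In this sketch, `migrate_counters` is a hypothetical name and a plain hash replaces the Redis counter keys. It shows the 'all' guard: once the counters exist, later calls return early and never recount, so from then on the counters are maintained incrementally by `#schedule_pending_jobs` and `#update_state`.

```ruby
# Mirrors #migrate_progress_stats_to_redis_counters on plain Ruby data.
# `counters` stands in for the per-status counter keys; `batch_state` maps
# child job ids to statuses.
def migrate_counters(counters, batch_state)
  # The 'all' counter doubles as a "counters already initialized" flag.
  return counters if counters.key?('all')

  values = batch_state.values
  values.tally.each { |status, count| counters[status] = count }
  counters['all'] = values.size
  counters
end

state = { 'j1' => 'completed', 'j2' => 'completed', 'j3' => 'errored', 'j4' => 'scheduled' }
counters = migrate_counters({}, state)
counters['completed'] # => 2
counters['all']       # => 4

# A second run is a no-op, even if the state has changed since.
state['j4'] = 'completed'
migrate_counters(counters, state)['completed'] # => 2
```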

#on_batch_node_complete(child_batch, status = :completed) ⇒ Object

Callback invoked when any batch in the tree gets completed.

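Parameters:

  • child_batch (Cloudtasker::Batch::Job)

    The batch that completed.

  • status (Symbol) (defaults to: :completed)

    The completion status. Only :completed is propagated.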



# File 'lib/cloudtasker/batch/job.rb', line 459

def on_batch_node_complete(child_batch, status = :completed)
  return false unless status == :completed

  # Notify the worker that a batch node worker has completed
  run_worker_callback(:on_batch_node_complete, child_batch.worker)

  # Notify the parent batch that a node is complete
  parent_batch&.on_batch_node_complete(child_batch)
end

#on_child_complete(child_batch, status = :completed) ⇒ Object

Callback invoked when a direct child batch is complete.

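Parameters:

  • child_batch (Cloudtasker::Batch::Job)

    The direct child batch that completed.

  • status (Symbol) (defaults to: :completed)

    The status of the child: :completed, :errored or :dead.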



# File 'lib/cloudtasker/batch/job.rb', line 422

def on_child_complete(child_batch, status = :completed)
  # Update batch state
  update_state(child_batch.batch_id, status)

  # Notify the worker that a direct batch child worker has completed
  case status
  when :completed
    run_worker_callback(:on_child_complete, child_batch.worker)
  when :errored
    run_worker_callback(:on_child_error, child_batch.worker)
  when :dead
    run_worker_callback(:on_child_dead, child_batch.worker)
  end

  return if status == :errored
  return unless complete?

  # Notify the parent batch that we are done with this batch. Use SETNX to ensure
  # only the first concurrent child to complete triggers on_complete.
  return unless redis.set(batch_completion_gid, true, nx: true, ex: BATCH_COMPLETION_TTL)

  begin
    on_complete
  rescue StandardError
    # Clear key on error so completion can be reattempted
    redis.del(batch_completion_gid)

    # Re-raise
    raise
  end
end
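The SETNX guard at the end of `#on_child_complete` can be exercised without Redis. In this sketch, `CompletionFlags` is a hypothetical mutex-backed stand-in for `SET key value NX` (the `EX` expiry set from `BATCH_COMPLETION_TTL` is not modelled), and `complete_once` is a hypothetical helper mirroring the guarded block. Several threads race to complete the same batch, exactly one runs `on_complete`, and a failing `on_complete` releases the flag so completion can be reattempted.

```ruby
# Hypothetical in-memory stand-in for Redis `SET key value NX`.
class CompletionFlags
  def initialize
    @flags = {}
    @mutex = Mutex.new
  end

  # Returns true if this call created the key, false if it already existed.
  def set_nx(key)
    @mutex.synchronize do
      next false if @flags.key?(key)

      @flags[key] = true
    end
  end

  def del(key)
    @mutex.synchronize { @flags.delete(key) }
  end
end

# Mirrors the guarded section of #on_child_complete: run the block only if
# this caller won the flag, and release the flag if the block fails.
def complete_once(flags, key)
  return :skipped unless flags.set_nx(key)

  begin
    yield
    :completed
  rescue StandardError
    flags.del(key) # let a retried child attempt completion again
    raise
  end
end

flags = CompletionFlags.new
calls = Queue.new

# Five children finish at the same time and all observe complete? == true.
threads = Array.new(5) do
  Thread.new { complete_once(flags, 'states/b1/completed') { calls << :on_complete } }
end
threads.each(&:join)

calls.size # => 1
```

In Redis the flag also expires after the TTL, so a batch is not blocked forever if a process dies between setting the flag and finishing `on_complete`.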

#on_complete(status = :completed) ⇒ Object

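Callback invoked when the batch is complete. It runs the on_batch_complete worker callback when the status is :completed, notifies the parent batch, then cleans up the batch tree in Redis.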



# File 'lib/cloudtasker/batch/job.rb', line 406

def on_complete(status = :completed)
  # Invoke worker callback
  run_worker_callback(:on_batch_complete) if status == :completed

  # Propagate event
  parent_batch&.on_child_complete(self, status)

  # The batch tree is complete. Cleanup the downstream tree.
  cleanup
end

#parent_batch ⇒ Cloudtasker::Batch::Job?

Return the parent batch, if any.

Returns:

  • (Cloudtasker::Batch::Job, nil)

    The parent batch, or nil if the worker has no parent.



# File 'lib/cloudtasker/batch/job.rb', line 157

def parent_batch
  return nil unless (parent_id = worker.job_meta.get(key(:parent_id)))

  @parent_batch ||= self.class.find(parent_id)
end

#pending_jobs ⇒ Array<Cloudtasker::Worker>

The list of jobs waiting to be enqueued in the batch.

Returns:

  • (Array<Cloudtasker::Worker>)

    The pending jobs.



# File 'lib/cloudtasker/batch/job.rb', line 223

def pending_jobs
  @pending_jobs ||= []
end

#progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress

Calculate the progress of the batch.

Parameters:

  • depth (Integer) (defaults to: 0)

    The depth of calculation. Zero (default) means only immediate children will be taken into account.

Returns:

  • (Cloudtasker::Batch::BatchProgress)

    The progress of the batch.



# File 'lib/cloudtasker/batch/job.rb', line 495

def progress(depth: 0)
  depth = depth.to_i

  # Initialize counters from batch state. This is only applicable to running batches
  # that started before the counter-based progress was implemented/released.
  migrate_progress_stats_to_redis_counters

  # Return immediately if we do not need to go down the tree
  return BatchProgress.new([self]) if depth <= 0

  # Sum batch progress of current batch and sub-batches up to the specified
  # depth
  batch_state.to_h.reduce(BatchProgress.new([self])) do |memo, (child_id, _)|
    memo + (self.class.find(child_id)&.progress(depth: depth - 1) || BatchProgress.new)
  end
end
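The depth recursion in `#progress` can be illustrated with an in-memory tree. `Progress` is a hypothetical, simplified stand-in for `BatchProgress` that only keeps per-status counts which add up, and `BATCHES` is made-up data. Ids without an entry play the role of children that `.find` cannot load, which contribute an empty progress.

```ruby
# Hypothetical, simplified stand-in for BatchProgress.
Progress = Struct.new(:counts) do
  def +(other)
    Progress.new(counts.merge(other.counts) { |_status, a, b| a + b })
  end
end

# Hypothetical batch tree: `counts` are a batch's own counters (statuses of
# its direct children), `children` are the ids in its batch state.
BATCHES = {
  'root' => { counts: { 'completed' => 1, 'processing' => 1 }, children: %w[a b] },
  'a' => { counts: { 'completed' => 2 }, children: %w[a1 a2] },
  'b' => { counts: { 'scheduled' => 1 }, children: %w[c] },
  'c' => { counts: { 'completed' => 3 }, children: %w[c1 c2 c3] }
}.freeze

# Mirrors #progress(depth:): the batch's own counters plus the progress of
# each direct child computed with depth - 1.
def progress(id, depth: 0)
  batch = BATCHES[id]
  return Progress.new({}) unless batch # like `find(child_id)&.progress(...) || BatchProgress.new`

  own = Progress.new(batch[:counts])
  return own if depth <= 0

  batch[:children].reduce(own) { |memo, child_id| memo + progress(child_id, depth: depth - 1) }
end

progress('root').counts['completed']           # => 1
progress('root', depth: 1).counts['completed'] # => 3
progress('root', depth: 2).counts['completed'] # => 6
```

Each extra level of depth costs one lookup per child, so a large depth on a wide tree can be expensive; the default of zero only reads the batch's own counters.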

#redis ⇒ Cloudtasker::RedisClient

Return the Cloudtasker Redis client.

Returns:

  • (Cloudtasker::RedisClient)

    The Redis client.



# File 'lib/cloudtasker/batch/job.rb', line 126

def redis
  self.class.redis
end

#reenqueued? ⇒ Boolean

Return true if the worker has been re-enqueued. Post-processing logic is skipped for re-enqueued jobs.

Returns:

  • (Boolean)

    True if the job was re-enqueued.



# File 'lib/cloudtasker/batch/job.rb', line 117

def reenqueued?
  worker.job_reenqueued
end

#run_worker_callback(callback, *args) ⇒ any

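Run a worker callback, then schedule any jobs the callback added to the batch or to the parent batch. Errors raised by the on_child_error and on_child_dead callbacks are logged and silenced; errors raised by other callbacks are re-raised.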

Parameters:

  • callback (String, Symbol)

    The callback to run.

  • *args (Array<any>)

    The callback arguments.

Returns:

  • (any)

    The callback return value



# File 'lib/cloudtasker/batch/job.rb', line 383

def run_worker_callback(callback, *args)
  worker.try(callback, *args).tap do
    # Enqueue pending jobs if batch was expanded in callback
    # A completed batch cannot receive additional jobs
    schedule_pending_jobs if callback.to_sym != :on_batch_complete

    # Schedule pending jobs on parent if batch was expanded
    parent_batch&.schedule_pending_jobs
  end
rescue StandardError => e
  # There is no point in retrying jobs due to failure callbacks failing
  # Only completion callbacks will trigger a re-run of the job because
  # these do matter for batch completion
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  # Log error instead
  worker.logger.error(e)
  worker.logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
end
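The error policy of `#run_worker_callback` can be isolated in a few lines. In this sketch, `DemoWorker`, `LOGGER` and `run_callback` are hypothetical; `respond_to?` plus `public_send` stands in, roughly, for ActiveSupport's `Object#try`; and the scheduling of pending jobs after the callback is left out. Failure callbacks have their errors logged and swallowed, while any other callback error propagates so the job is retried.

```ruby
require 'logger'

IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze

# Hypothetical worker with a failing error callback and a failing completion callback.
class DemoWorker
  def on_child_error(_child)
    raise 'notification service down'
  end

  def on_child_complete(_child)
    raise 'could not record result'
  end
end

LOGGER = Logger.new($stdout)

# Mirrors the error handling of #run_worker_callback.
def run_callback(worker, callback, *args)
  # Call the callback only if the worker defines it, like Object#try.
  worker.respond_to?(callback) ? worker.public_send(callback, *args) : nil
rescue StandardError => e
  raise e unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  LOGGER.error("Callback #{callback} failed to run: #{e.message}")
  nil
end

worker = DemoWorker.new
run_callback(worker, :on_child_error, :child) # logs the error, returns nil
run_callback(worker, :on_batch_complete)      # not defined: returns nil

begin
  run_callback(worker, :on_child_complete, :child)
rescue RuntimeError => e
  puts "re-raised: #{e.message}"
end
```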

#save ⇒ Object

Save a serialized version of the worker.

This is required to invoke callback methods in the context of the worker (i.e. an instantiated worker) when child workers complete, whether they succeed or fail.



# File 'lib/cloudtasker/batch/job.rb', line 323

def save
  redis.write(batch_gid, worker.to_h)
end

#schedule_pending_jobs ⇒ Object

Schedule the child workers that were added to the batch.



# File 'lib/cloudtasker/batch/job.rb', line 515

def schedule_pending_jobs
  ret_list = []

  while (j = pending_jobs.shift)
    # Schedule the job
    # Skip batch registration if the job was not actually scheduled
    # E.g. the job was evicted due to uniqueness requirements
    next unless j.schedule

    # Initialize the batch state unless the job has already started (and taken
    # hold of its own status)
    # The batch state is initialized only after the job is scheduled to avoid
    # having never-ending batches - which could occur if a batch was crashing
    # while enqueuing children due to an OOM error and since 'scheduled' is a
    # blocking status.
    redis.multi do |m|
      m.hsetnx(batch_state_gid, j.job_id, 'scheduled')
      m.incr(batch_state_count_gid('scheduled'))
      m.incr(batch_state_count_gid('all'))
    end

    # Flag job as enqueued
    ret_list << j
    enqueued_jobs << j
  end

  # Return the list of jobs just enqueued
  ret_list
end
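The scheduling loop drains the pending list and registers only the jobs that were actually enqueued. The sketch below replays it on in-memory data; `FakeJob` and `schedule_pending` are hypothetical, `||=` stands in for `HSETNX`, and the counters are incremented unconditionally, as in the method above.

```ruby
# Hypothetical child job: `schedule` returns false when the job is not
# actually enqueued (e.g. evicted by a uniqueness constraint).
FakeJob = Struct.new(:job_id, :schedulable) do
  def schedule
    schedulable
  end
end

# Mirrors #schedule_pending_jobs: drain the pending list, skip jobs that were
# not scheduled, and register the others as 'scheduled' without overwriting
# an existing status, while bumping the counters.
def schedule_pending(pending, state, counters)
  scheduled = []
  while (job = pending.shift)
    next unless job.schedule

    state[job.job_id] ||= 'scheduled' # like HSETNX
    counters['scheduled'] += 1
    counters['all'] += 1
    scheduled << job
  end
  scheduled
end

pending = [FakeJob.new('j1', true), FakeJob.new('j2', false), FakeJob.new('j3', true)]
state = {}
counters = Hash.new(0)
result = schedule_pending(pending, state, counters)

result.map(&:job_id)  # => ["j1", "j3"]
counters['scheduled'] # => 2
pending.empty?        # => true
```

Registering a job only after `schedule` succeeds is what keeps a crash mid-loop from leaving a 'scheduled' entry that would block the batch forever.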

#setup ⇒ Object

Save the batch and enqueue all child workers attached to it.



# File 'lib/cloudtasker/batch/job.rb', line 548

def setup
  return true if pending_jobs.empty?

  # Save batch
  save

  # Schedule all child workers
  schedule_pending_jobs
end

#update_state(batch_id, status, force: false) ⇒ Object

Update the batch state.

Parameters:

  • batch_id (String)

    The batch id of the child job whose status is updated.

  • status (String, Symbol)

    The status of the sub-batch.

  • force (Boolean) (defaults to: false)

    Force update the status even if the registered status is a completion status.



# File 'lib/cloudtasker/batch/job.rb', line 334

def update_state(batch_id, status, force: false)
  migrate_batch_state_to_redis_hash

  # Get stored status and abort if no changes
  stored_status = redis.hget(batch_state_gid, batch_id)
  return if stored_status == status.to_s

  # Abort if the job has already been flagged as completed
  #
  # A job may be duplicated and run concurrently if Cloud Task times out
  # before the job completes.
  #
  # In this case, the original job keeps running in the background while Cloud Task triggers a retry.
  # This retry runs in parallel with the hung job. The retry may complete before the hung job.
  # The hung job may eventually raise an error (e.g. timeout error) after the retried job has completed,
  # which would leave the job status as "errored" instead of "completed" in the batch state without
  # the failsafe below.
  return if COMPLETION_STATUSES.include?(stored_status) && !force

  # Update the batch state batch_id entry with the new status
  # and update counters
  redis.multi do |m|
    m.hset(batch_state_gid, batch_id, status)
    m.decr(batch_state_count_gid(stored_status))
    m.incr(batch_state_count_gid(status))
  end
end
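The sticky behaviour of completion statuses is easiest to see on a single child. This sketch mirrors `#update_state` with a plain hash for the batch state and a `Hash.new(0)` for the counters; the names are hypothetical, and unlike the method above the sketch only decrements a counter when a previous status was stored.

```ruby
COMPLETION_STATUSES = %w[completed dead].freeze

# Mirrors #update_state on in-memory data: `state` maps child job ids to
# status strings and `counters` holds one counter per status.
def update_state(state, counters, child_id, status, force: false)
  stored = state[child_id]
  return if stored == status.to_s

  # Completion statuses are sticky: a late error from a duplicated, hung run
  # must not overwrite 'completed' unless force is set.
  return if COMPLETION_STATUSES.include?(stored) && !force

  state[child_id] = status.to_s
  counters[stored] -= 1 if stored
  counters[status.to_s] += 1
end

state = { 'j1' => 'scheduled' }
counters = Hash.new(0)
counters['scheduled'] = 1

update_state(state, counters, 'j1', :processing)
update_state(state, counters, 'j1', :completed)
update_state(state, counters, 'j1', :errored) # ignored: 'completed' is sticky
state['j1']                                   # => "completed"

update_state(state, counters, 'j1', :errored, force: true)
state['j1']                                   # => "errored"
```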