Class: Cloudtasker::Batch::Job
- Inherits: Object
- Defined in: lib/cloudtasker/batch/job.rb
Overview
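Handle batch management: track the child jobs of a worker in Redis and run the worker's batch callbacks as those jobs complete.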
Constant Summary
- JOBS_NAMESPACE = 'jobs'
  Key namespace used for objects saved under this class.
- STATES_NAMESPACE = 'states'
- BATCH_STATUSES = %w[scheduled processing completed errored dead all].freeze
  List of sub-job statuses taken into account when evaluating if the batch is complete.
  Batch jobs go through the following states:
  - scheduled: the parent batch has enqueued a worker for the child job
  - processing: the child job is running
  - completed: the child job has completed successfully
  - errored: the child job has encountered an error and will be retried
  - dead: the child job has exceeded its maximum number of retries
  The 'dead' status is considered a completion status because it means that the job will never succeed. There is no point in blocking the batch forever, so the batch eventually proceeds.
  The 'all' entry is not a job state: it names the counter that tracks the total number of child jobs registered in the batch.
- COMPLETION_STATUSES = %w[completed dead].freeze
- IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze
  These callbacks do not need to raise errors on their own because the jobs will be either retried or dropped.
- BATCH_MAX_LOCK_WAIT = 60
  The maximum number of seconds to wait for a batch state lock to be acquired.
- BATCH_COMPLETION_TTL = 100
  TTL, in seconds, of the completion flag that prevents concurrently completing children from triggering multiple on_complete calls.
Instance Attribute Summary
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .find(worker_id) ⇒ Cloudtasker::Batch::Job?
  Find a batch by id.
- .for(worker) ⇒ Cloudtasker::Batch::Job
  Attach a batch to a worker.
- .key(val) ⇒ String
  Return a namespaced key.
- .redis ⇒ Cloudtasker::RedisClient
  Return the cloudtasker redis client.
Instance Method Summary
- #==(other) ⇒ Boolean
  Equality operator.
- #add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
  Add a worker to the batch.
- #add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
  Add a worker to the batch using a specific queue.
- #batch_completion_gid ⇒ String
  Return the key used to track batch completion.
- #batch_gid ⇒ String
  Return the namespaced worker id.
- #batch_id ⇒ String
  Return the worker id.
- #batch_state ⇒ Hash
  Return the batch state.
- #batch_state_count(state) ⇒ Integer
  Return the number of jobs in a given state.
- #batch_state_count_gid(state) ⇒ String
  Return the key under which the batch progress is stored for a specific state.
- #batch_state_gid ⇒ String
  Return the key under which the batch state is stored.
- #cleanup ⇒ Object
  Remove all batch and sub-batch keys from Redis.
- #complete(status = :completed) ⇒ Object
  Post-perform logic.
- #complete? ⇒ Boolean
  Return true if all the child workers have completed.
- #enqueued_jobs ⇒ Array<Cloudtasker::Worker>
  The list of jobs that have been enqueued as part of the batch.
- #execute ⇒ Object
  Execute the batch.
- #initialize(worker) ⇒ Job (constructor)
  Build a new instance of the class.
- #key(val) ⇒ String
  Return a namespaced key.
- #migrate_batch_state_to_redis_hash ⇒ Object
  Migrate the batch state to a Redis hash instead of a hash serialized in a string key.
- #migrate_progress_stats_to_redis_counters ⇒ Object
  Initialize the batch job counters if they are not set already.
- #on_batch_node_complete(child_batch, status = :completed) ⇒ Object
  Callback invoked when any batch in the tree gets completed.
- #on_child_complete(child_batch, status = :completed) ⇒ Object
  Callback invoked when a direct child batch is complete.
- #on_complete(status = :completed) ⇒ Object
  Callback invoked when the batch is complete.
- #parent_batch ⇒ Cloudtasker::Batch::Job?
  Return the parent batch, if any.
- #pending_jobs ⇒ Array<Cloudtasker::Worker>
  The list of jobs to be enqueued in the batch.
- #progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress
  Calculate the progress of the batch.
- #redis ⇒ Cloudtasker::RedisClient
  Return the cloudtasker redis client.
- #reenqueued? ⇒ Boolean
  Return true if the worker has been re-enqueued.
- #run_worker_callback(callback, *args) ⇒ any
  Run worker callback.
- #save ⇒ Object
  Save serialized version of the worker.
- #schedule_pending_jobs ⇒ Object
  Schedule the child workers that were added to the batch.
- #setup ⇒ Object
  Save the batch and enqueue all child workers attached to it.
- #update_state(batch_id, status, force: false) ⇒ Object
  Update the batch state.
Constructor Details
#initialize(worker) ⇒ Job
Build a new instance of the class.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 107
def initialize(worker)
  @worker = worker
end
```
Instance Attribute Details
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 7
def worker
  @worker
end
```
Class Method Details
.find(worker_id) ⇒ Cloudtasker::Batch::Job?
Find a batch by id.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 58
def self.find(worker_id)
  return nil unless worker_id

  # Retrieve related worker
  payload = redis.fetch(key("#{JOBS_NAMESPACE}/#{worker_id}"))
  worker = Cloudtasker::Worker.from_hash(payload)
  return nil unless worker

  # Build batch job
  self.for(worker)
end
```
.for(worker) ⇒ Cloudtasker::Batch::Job
Attach a batch to a worker.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 90
def self.for(worker)
  # Load extension if not loaded already on the worker class
  worker.class.include(Extension::Worker) unless worker.class <= Extension::Worker

  # Add batch and parent batch to worker
  worker.batch = new(worker)
  worker.parent_batch = worker.batch.parent_batch

  # Return the batch
  worker.batch
end
```
.key(val) ⇒ String
Return a namespaced key.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 77
def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end
```
.redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 47
def self.redis
  @redis ||= RedisClient.new
end
```
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 137
def ==(other)
  other.is_a?(self.class) && other.batch_id == batch_id
end
```
#add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 255
def add(worker_klass, *args)
  add_to_queue(worker.job_queue, worker_klass, *args)
end
```
#add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch using a specific queue.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 268
def add_to_queue(queue, worker_klass, *args)
  pending_jobs << worker_klass.new(
    job_args: args,
    job_meta: { key(:parent_id) => batch_id },
    job_queue: queue
  )
end
```
#batch_completion_gid ⇒ String
Return the key used to track batch completion.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 205
def batch_completion_gid
  "#{batch_state_gid}/completed"
end
```
#batch_gid ⇒ String
Return the namespaced worker id.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 177
def batch_gid
  key("#{JOBS_NAMESPACE}/#{batch_id}")
end
```
#batch_id ⇒ String
Return the worker id.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 168
def batch_id
  worker&.job_id
end
```
#batch_state ⇒ Hash
Return the batch state.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 241
def batch_state
  migrate_batch_state_to_redis_hash

  redis.hgetall(batch_state_gid)
end
```
#batch_state_count(state) ⇒ Integer
Return the number of jobs in a given state.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 214
def batch_state_count(state)
  redis.get(batch_state_count_gid(state)).to_i
end
```
#batch_state_count_gid(state) ⇒ String
Return the key under which the batch progress is stored for a specific state.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 196
def batch_state_count_gid(state)
  "#{batch_state_gid}/state_count/#{state}"
end
```
#batch_state_gid ⇒ String
Return the key under which the batch state is stored.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 186
def batch_state_gid
  key("#{STATES_NAMESPACE}/#{batch_id}")
end
```
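Every Redis key used by a batch is derived from `.key`. As a sketch of the resulting layout, the plain-Ruby module below rebuilds the keys for a hypothetical batch id `abc123`. `KeyLayout` and its `underscore` helper are illustrative stand-ins, not part of Cloudtasker; `underscore` is a simplified replacement for ActiveSupport's `String#underscore` that only handles `Module::ClassName`-style names.

```ruby
# Sketch of the Redis key layout built by Cloudtasker::Batch::Job helpers.
# KeyLayout is hypothetical; it only mirrors the string-building logic.
module KeyLayout
  JOBS_NAMESPACE = 'jobs'
  STATES_NAMESPACE = 'states'

  # Simplified stand-in for ActiveSupport's String#underscore (assumption).
  def self.underscore(name)
    name.gsub('::', '/').gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
  end

  # Mirrors Job.key: prefix the value with the underscored class name.
  def self.key(val, klass_name = 'Cloudtasker::Batch::Job')
    return nil if val.nil?

    [underscore(klass_name), val.to_s].join('/')
  end

  # Mirrors batch_gid, batch_state_gid, batch_completion_gid and
  # batch_state_count_gid('scheduled') for a given batch id.
  def self.gids(batch_id)
    state_gid = key("#{STATES_NAMESPACE}/#{batch_id}")
    {
      batch_gid: key("#{JOBS_NAMESPACE}/#{batch_id}"),
      batch_state_gid: state_gid,
      batch_completion_gid: "#{state_gid}/completed",
      scheduled_count_gid: "#{state_gid}/state_count/scheduled"
    }
  end
end

KeyLayout.gids('abc123').each { |name, gid| puts "#{name}: #{gid}" }
```

The serialized worker lives under the `jobs` namespace, while the state hash, the completion flag and the per-status counters all share the `states/<batch_id>` prefix.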
#cleanup ⇒ Object
Remove all batch and sub-batch keys from Redis.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 472
def cleanup
  migrate_batch_state_to_redis_hash

  # Delete child batches recursively
  redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup }

  # Delete batch redis entries
  redis.multi do |m|
    m.del(batch_gid)
    m.del(batch_state_gid)
    m.del(batch_completion_gid)
    BATCH_STATUSES.each { |e| m.del(batch_state_count_gid(e)) }
  end
end
```
#complete(status = :completed) ⇒ Object
Post-perform logic. The parent batch is notified if the job is complete.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 561
def complete(status = :completed)
  return true if reenqueued?

  # Notify the parent batch that a child is complete
  on_complete(status) if complete?

  # Notify the parent that a batch node has completed
  parent_batch&.on_batch_node_complete(self, status)
end
```
#complete? ⇒ Boolean
Return true if all the child workers have completed.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 367
def complete?
  migrate_batch_state_to_redis_hash

  # Check that all child jobs have completed
  redis.hvals(batch_state_gid).all? { |e| COMPLETION_STATUSES.include?(e) }
end
```
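The completion rule can be sketched in isolation, assuming a plain Ruby Hash stands in for the `HGETALL` result on `batch_state_gid` (child job id to status). The standalone `complete?` method below is illustrative, not the library method.

```ruby
COMPLETION_STATUSES = %w[completed dead].freeze

# Sketch of the complete? rule: every child entry must hold a completion
# status ('completed' or 'dead'). 'errored' or 'scheduled' blocks completion.
def complete?(batch_state)
  batch_state.values.all? { |status| COMPLETION_STATUSES.include?(status) }
end

puts complete?('job-1' => 'completed', 'job-2' => 'dead')    # prints true
puts complete?('job-1' => 'completed', 'job-2' => 'errored') # prints false
```

Because `all?` returns true on an empty collection, a batch whose state hash holds no children is considered complete by this rule.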
#enqueued_jobs ⇒ Array<Cloudtasker::Worker>
The list of jobs that have been enqueued as part of the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 232
def enqueued_jobs
  @enqueued_jobs ||= []
end
```
#execute ⇒ Object
Execute the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 574
def execute
  # Update parent batch state
  parent_batch&.update_state(batch_id, :processing)

  # Perform job
  yield

  # Setup batch
  # Only applicable if the batch has pending_jobs
  setup

  # Save parent batch if batch was expanded
  parent_batch&.schedule_pending_jobs

  # Complete batch
  complete(:completed)
rescue DeadWorkerError => e
  complete(:dead)
  raise(e)
rescue StandardError => e
  complete(:errored)
  raise(e)
end
```
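The status flow of `#execute` can be sketched without Redis. In the sketch below, `BatchRun` and the local `DeadWorkerError` class are hypothetical stand-ins: the parent `update_state` call and `complete` are replaced by recording events, so only the ordering of transitions is shown.

```ruby
# Hypothetical stand-in for Cloudtasker's DeadWorkerError.
class DeadWorkerError < StandardError; end

# Sketch of the transitions driven by Job#execute: 'processing' before the
# job body runs, then 'completed', 'dead' or 'errored' depending on outcome.
class BatchRun
  attr_reader :events

  def initialize
    @events = []
  end

  def execute
    @events << :processing # stands in for parent_batch&.update_state(..., :processing)
    yield
    complete(:completed)
  rescue DeadWorkerError => e
    complete(:dead)
    raise(e)
  rescue StandardError => e
    complete(:errored)
    raise(e)
  end

  private

  # Stands in for Job#complete, which notifies parent batches.
  def complete(status)
    @events << status
  end
end

run = BatchRun.new
run.execute { :work }
p run.events # prints [:processing, :completed]
```

Since `DeadWorkerError` subclasses `StandardError` in this sketch, its `rescue` clause must come first; otherwise a dead job would be recorded as errored. In both failure branches the error is re-raised so the job itself still fails.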
#key(val) ⇒ String
Return a namespaced key.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 148
def key(val)
  self.class.key(val)
end
```
#migrate_batch_state_to_redis_hash ⇒ Object
This method migrates the batch state to be a Redis hash instead of a hash stored in a string key.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 280
def migrate_batch_state_to_redis_hash
  return unless redis.type(batch_state_gid) == 'string'

  # Migrate batch state to Redis hash if it is still using a legacy string key
  # We acquire a lock then check again
  redis.with_lock(batch_state_gid, max_wait: BATCH_MAX_LOCK_WAIT) do
    if redis.type(batch_state_gid) == 'string'
      state = redis.fetch(batch_state_gid)
      redis.del(batch_state_gid)
      redis.hset(batch_state_gid, state) if state.any?
    end
  end
end
```
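The migration uses double-checked locking: a cheap type check without the lock, then a second check once the lock is held, so that only one concurrent caller performs the migration. A minimal sketch of that pattern, assuming a hypothetical Hash-backed `FakeStore` and a Ruby `Mutex` in place of Redis and `RedisClient#with_lock`:

```ruby
# Hypothetical in-memory store: values are tagged [type, payload] so that
# #type can report 'string', 'hash' or 'none' like Redis TYPE.
class FakeStore
  attr_reader :migrations

  def initialize
    @data = {}
    @mutex = Mutex.new
    @migrations = 0
  end

  # Legacy format: the whole state serialized under a single string key.
  def write_legacy(key, state)
    @data[key] = [:string, state.dup]
  end

  def type(key)
    @data.key?(key) ? @data[key].first.to_s : 'none'
  end

  def hgetall(key)
    type(key) == 'hash' ? @data[key].last.dup : {}
  end

  def with_lock
    @mutex.synchronize { yield }
  end

  def migrate_to_hash(key)
    return unless type(key) == 'string' # cheap check without the lock

    with_lock do
      # Re-check under the lock: another caller may have migrated already
      next unless type(key) == 'string'

      state = @data.delete(key).last
      @data[key] = [:hash, state] if state.any? # empty state leaves no key
      @migrations += 1
    end
  end
end

store = FakeStore.new
store.write_legacy('states/b1', 'job-1' => 'completed')
Array.new(5) { Thread.new { store.migrate_to_hash('states/b1') } }.each(&:join)
puts store.type('states/b1') # prints hash
puts store.migrations        # prints 1
```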
#migrate_progress_stats_to_redis_counters ⇒ Object
This method initializes the batch job counters if not set already.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 297
def migrate_progress_stats_to_redis_counters
  # Abort if counters have already been set. The 'all' counter acts as a feature flag.
  return if redis.exists?(batch_state_count_gid('all'))

  # Get all job states
  values = batch_state.values

  # Count by value
  redis.multi do |m|
    # Per status
    values.tally.each do |k, v|
      m.set(batch_state_count_gid(k), v)
    end

    # All counter
    m.set(batch_state_count_gid('all'), values.size)
  end
end
```
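The counter initialization boils down to `Enumerable#tally` (Ruby 2.7+) over the child statuses, plus an `all` total that doubles as the "already initialized" flag. A sketch with a plain Hash standing in for the counter keys; `initialize_counters` is a hypothetical name, and the real method performs these writes inside a single `MULTI` transaction:

```ruby
# Sketch: derive per-status counters and the 'all' total from a state hash.
# `counters` stands in for the batch_state_count_gid(...) keys in Redis.
def initialize_counters(batch_state, counters)
  # The 'all' counter acts as the "counters already set" flag
  return counters if counters.key?('all')

  values = batch_state.values
  values.tally.each { |status, count| counters[status] = count }
  counters['all'] = values.size
  counters
end

state = { 'j1' => 'completed', 'j2' => 'completed', 'j3' => 'errored' }
p initialize_counters(state, {})
```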
#on_batch_node_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when any batch in the tree gets completed.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 459
def on_batch_node_complete(child_batch, status = :completed)
  return false unless status == :completed

  # Notify the worker that a batch node worker has completed
  run_worker_callback(:on_batch_node_complete, child_batch.worker)

  # Notify the parent batch that a node is complete
  parent_batch&.on_batch_node_complete(child_batch)
end
```
#on_child_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when a direct child batch is complete.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 422
def on_child_complete(child_batch, status = :completed)
  # Update batch state
  update_state(child_batch.batch_id, status)

  # Notify the worker that a direct batch child worker has completed
  case status
  when :completed
    run_worker_callback(:on_child_complete, child_batch.worker)
  when :errored
    run_worker_callback(:on_child_error, child_batch.worker)
  when :dead
    run_worker_callback(:on_child_dead, child_batch.worker)
  end

  return if status == :errored
  return unless complete?

  # Notify the parent batch that we are done with this batch. Use SETNX to ensure
  # only the first concurrent child to complete triggers on_complete.
  return unless redis.set(batch_completion_gid, true, nx: true, ex: BATCH_COMPLETION_TTL)

  begin
    on_complete
  rescue StandardError
    # Clear key on error so completion can be reattempted
    redis.del(batch_completion_gid)

    # Re-raise
    raise
  end
end
```
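The completion guard relies on `SET ... NX`: only the first caller to set the flag proceeds, and the flag is deleted if `on_complete` raises so a later child can retry. A sketch of that pattern, assuming a hypothetical Mutex-protected `CompletionFlag` in place of Redis; the TTL from `BATCH_COMPLETION_TTL` is not modelled, and `finish_child` is an illustrative name.

```ruby
# Hypothetical stand-in for a Redis key written with SET NX.
class CompletionFlag
  def initialize
    @keys = {}
    @mutex = Mutex.new
  end

  # Returns true only for the first caller, like SET key value NX.
  def set_nx(key)
    @mutex.synchronize do
      return false if @keys.key?(key)

      @keys[key] = true
    end
  end

  def del(key)
    @mutex.synchronize { @keys.delete(key) }
  end
end

# Sketch of the guarded completion: run the block at most once per flag,
# clearing the flag on error so completion can be reattempted.
def finish_child(flag, key, &on_complete)
  return false unless flag.set_nx(key)

  begin
    on_complete.call
    true
  rescue StandardError
    flag.del(key)
    raise
  end
end

flag = CompletionFlag.new
calls = Queue.new
Array.new(10) { Thread.new { finish_child(flag, 'states/b1/completed') { calls << :on_complete } } }.each(&:join)
puts calls.size # prints 1
```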
#on_complete(status = :completed) ⇒ Object
Callback invoked when the batch is complete.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 406
def on_complete(status = :completed)
  # Invoke worker callback
  run_worker_callback(:on_batch_complete) if status == :completed

  # Propagate event
  parent_batch&.on_child_complete(self, status)

  # The batch tree is complete. Cleanup the downstream tree.
  cleanup
end
```
#parent_batch ⇒ Cloudtasker::Batch::Job?
Return the parent batch, if any.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 157
def parent_batch
  return nil unless (parent_id = worker.job_meta.get(key(:parent_id)))

  @parent_batch ||= self.class.find(parent_id)
end
```
#pending_jobs ⇒ Array<Cloudtasker::Worker>
The list of jobs to be enqueued in the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 223
def pending_jobs
  @pending_jobs ||= []
end
```
#progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress
Calculate the progress of the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 495
def progress(depth: 0)
  depth = depth.to_i

  # Initialize counters from batch state. This is only applicable to running batches
  # that started before the counter-based progress was implemented/released.
  migrate_progress_stats_to_redis_counters

  # Return immediately if we do not need to go down the tree
  return BatchProgress.new([self]) if depth <= 0

  # Sum batch progress of current batch and sub-batches up to the specified
  # depth
  batch_state.to_h.reduce(BatchProgress.new([self])) do |memo, (child_id, _)|
    memo + (self.class.find(child_id)&.progress(depth: depth - 1) || BatchProgress.new)
  end
end
```
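The `depth` argument controls how far down the batch tree progress is summed. The sketch below reproduces that recursion with plain hashes, under these assumptions: `COUNTERS` maps a batch id to its own status counters (what `batch_state_count` would return per status), `CHILDREN` maps a batch id to its child ids, and a child without counters (a job that never became a batch, for which `.find` returns nil) contributes nothing. Both constants and the data are hypothetical.

```ruby
# Hypothetical tree: 'root' has children 'a' (completed) and 'b' (scheduled);
# 'a' has children 'a1' (errored) and 'a2' (completed). 'b', 'a1' and 'a2'
# never expanded into batches, so they have no counters of their own.
COUNTERS = {
  'root' => { 'completed' => 1, 'scheduled' => 1, 'all' => 2 },
  'a' => { 'errored' => 1, 'completed' => 1, 'all' => 2 }
}.freeze

CHILDREN = { 'root' => %w[a b], 'a' => %w[a1 a2] }.freeze

# Sum a batch's own counters with those of its sub-batches, `depth` levels down.
def progress(batch_id, depth: 0)
  own = COUNTERS.fetch(batch_id, {}).dup
  return own if depth <= 0

  CHILDREN.fetch(batch_id, []).reduce(own) do |memo, child_id|
    memo.merge(progress(child_id, depth: depth - 1)) { |_status, a, b| a + b }
  end
end

p progress('root')           # direct children only
p progress('root', depth: 1) # direct children and grandchildren
```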
#redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 126
def redis
  self.class.redis
end
```
#reenqueued? ⇒ Boolean
Return true if the worker has been re-enqueued. Post-process logic should be skipped for re-enqueued jobs.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 117
def reenqueued?
  worker.job_reenqueued
end
```
#run_worker_callback(callback, *args) ⇒ any
Run worker callback. The error and dead callbacks get silenced should they raise an error.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 383
def run_worker_callback(callback, *args)
  worker.try(callback, *args).tap do
    # Enqueue pending jobs if batch was expanded in callback
    # A completed batch cannot receive additional jobs
    schedule_pending_jobs if callback.to_sym != :on_batch_complete

    # Schedule pending jobs on parent if batch was expanded
    parent_batch&.schedule_pending_jobs
  end
rescue StandardError => e
  # There is no point in retrying jobs due to failure callbacks failing
  # Only completion callbacks will trigger a re-run of the job because
  # these do matter for batch completion
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  # Log error instead
  worker.logger.error(e)
  worker.logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
end
```
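The error policy of `#run_worker_callback` can be sketched on its own: failures in `on_child_error` and `on_child_dead` are logged and swallowed, while failures in any other callback propagate so that the job is retried. In this sketch, ActiveSupport's `Object#try` is replaced by a `respond_to?` check, pending-job scheduling is omitted, and `FlakyWorker` and the `log:` array are hypothetical.

```ruby
IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze

# Sketch: invoke the callback if the worker defines it; swallow and log
# errors only for the failure callbacks listed above.
def run_worker_callback(worker, callback, *args, log: [])
  worker.public_send(callback, *args) if worker.respond_to?(callback)
rescue StandardError => e
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  log << "Callback #{callback} failed to run. Skipping to preserve error flow."
  nil
end

# Hypothetical worker whose callbacks always fail.
class FlakyWorker
  def on_child_error(_child)
    raise 'callback bug'
  end

  def on_child_complete(_child)
    raise 'callback bug'
  end
end

log = []
run_worker_callback(FlakyWorker.new, :on_child_error, :child, log: log)
puts log.size # prints 1
```

Calling the same worker with `:on_child_complete` re-raises the `RuntimeError`, because completion callbacks matter for batch completion.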
#save ⇒ Object
Save serialized version of the worker.
This is required so that callback methods can be invoked in the context of the worker (i.e. a re-instantiated copy of it) when child workers complete, whether successfully or not.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 323
def save
  redis.write(batch_gid, worker.to_h)
end
```
#schedule_pending_jobs ⇒ Object
Schedule the child workers that were added to the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 515
def schedule_pending_jobs
  ret_list = []

  while (j = pending_jobs.shift)
    # Schedule the job
    # Skip batch registration if the job was not actually scheduled
    # E.g. the job was evicted due to uniqueness requirements
    next unless j.schedule

    # Initialize the batch state unless the job has already started (and taken
    # hold of its own status)
    # The batch state is initialized only after the job is scheduled to avoid
    # having never-ending batches - which could occur if a batch was crashing
    # while enqueuing children due to an OOM error and since 'scheduled' is a
    # blocking status.
    redis.multi do |m|
      m.hsetnx(batch_state_gid, j.job_id, 'scheduled')
      m.incr(batch_state_count_gid('scheduled'))
      m.incr(batch_state_count_gid('all'))
    end

    # Flag job as enqueued
    ret_list << j
    enqueued_jobs << j
  end

  # Return the list of jobs just enqueued
  ret_list
end
```
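The scheduling loop registers a child in the batch state only after its `schedule` call succeeds, and uses HSETNX semantics so that a status already written by the child is not overwritten. A sketch with in-memory stand-ins, under these assumptions: `FakeJob` is a hypothetical job whose `schedule` returns false when the job is dropped (e.g. by a uniqueness lock), `state` stands in for the batch state hash and `counters` for the per-status counter keys. The real method performs the three writes inside one `MULTI` block.

```ruby
# Hypothetical job: `schedulable` decides whether #schedule succeeds.
FakeJob = Struct.new(:job_id, :schedulable) do
  def schedule
    schedulable
  end
end

# Sketch of schedule_pending_jobs against plain Ruby collections.
def schedule_pending_jobs(pending, state, counters)
  enqueued = []
  while (job = pending.shift)
    # Skip registration when the job was not actually scheduled
    next unless job.schedule

    # HSETNX semantics: keep any status the child has already written
    state[job.job_id] ||= 'scheduled'
    counters['scheduled'] += 1
    counters['all'] += 1
    enqueued << job
  end
  enqueued
end

pending = [FakeJob.new('j1', true), FakeJob.new('j2', false)]
state = {}
counters = Hash.new(0)
p schedule_pending_jobs(pending, state, counters).map(&:job_id) # prints ["j1"]
```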
#setup ⇒ Object
Save the batch and enqueue all child workers attached to it.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 548
def setup
  return true if pending_jobs.empty?

  # Save batch
  save

  # Schedule all child workers
  schedule_pending_jobs
end
```
#update_state(batch_id, status, force: false) ⇒ Object
Update the batch state.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 334
def update_state(batch_id, status, force: false)
  migrate_batch_state_to_redis_hash

  # Get stored status and abort if no changes
  stored_status = redis.hget(batch_state_gid, batch_id)
  return if stored_status == status.to_s

  # Abort if the job has already been flagged as completed
  #
  # A job may be duplicated and run concurrently if Cloud Task times out
  # before the job completes.
  #
  # In this case, the original job keeps running in the background while Cloud Task triggers a retry.
  # This retry runs in parallel with the hung job. The retry may complete before the hung job.
  # The hung job may eventually raise an error (e.g. timeout error) after the retried job has completed,
  # which would leave the job status as "errored" instead of "completed" in the batch state without
  # the failsafe below.
  return if COMPLETION_STATUSES.include?(stored_status) && !force

  # Update the batch state batch_id entry with the new status
  # and update counters
  redis.multi do |m|
    m.hset(batch_state_gid, batch_id, status)
    m.decr(batch_state_count_gid(stored_status))
    m.incr(batch_state_count_gid(status))
  end
end
```
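The failsafe and counter bookkeeping of `#update_state` can be sketched with plain hashes: `state` stands in for the batch state hash and `counters` for the per-status counters. One deliberate deviation: the sketch skips the decrement when the child had no stored status, whereas the method above always decrements the counter for the stored status.

```ruby
COMPLETION_STATUSES = %w[completed dead].freeze

# Sketch of update_state against in-memory stand-ins.
def update_state(state, counters, job_id, status, force: false)
  stored = state[job_id]
  return if stored == status.to_s

  # Failsafe: a late 'errored' from a hung duplicate must not overwrite a
  # completion status unless force is set
  return if COMPLETION_STATUSES.include?(stored) && !force

  state[job_id] = status.to_s
  counters[stored] -= 1 if stored # sketch-only guard, see lead-in
  counters[status.to_s] += 1
end

state = { 'j1' => 'scheduled' }
counters = Hash.new(0)
counters['scheduled'] = 1
update_state(state, counters, 'j1', :processing)
update_state(state, counters, 'j1', :completed)
update_state(state, counters, 'j1', :errored) # ignored: already completed
puts state['j1'] # prints completed
```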