Class: Cloudtasker::Batch::Job
- Inherits:
- Object
- Defined in:
- lib/cloudtasker/batch/job.rb
Overview
Handles batch management.
Constant Summary
- JOBS_NAMESPACE =
Key namespace used for objects saved under this class
'jobs'
- STATES_NAMESPACE =
'states'
- BATCH_STATUSES =
List of sub-job statuses taken into account when evaluating whether the batch is complete.
Batch jobs go through the following states:
- scheduled: the parent batch has enqueued a worker for the child job
- processing: the child job is running
- completed: the child job has completed successfully
- errored: the child job has encountered an error and must retry
- dead: the child job has exceeded its max number of retries
The ‘dead’ status is considered a completion status because it means that the job will never succeed. There is no point in blocking the batch forever, so the batch eventually proceeds.
%w[scheduled processing completed errored dead all].freeze
- COMPLETION_STATUSES =
%w[completed dead].freeze
- IGNORED_ERRORED_CALLBACKS =
These callbacks do not need to raise errors on their own because the jobs will be either retried or dropped.
%i[on_child_error on_child_dead].freeze
- BATCH_MAX_LOCK_WAIT =
The maximum number of seconds to wait for a batch state lock to be acquired.
60
Instance Attribute Summary
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Class Method Summary
-
.find(worker_id) ⇒ Cloudtasker::Batch::Job?
Find a batch by id.
-
.for(worker) ⇒ Cloudtasker::Batch::Job
Attach a batch to a worker.
-
.key(val) ⇒ String
Return a namespaced key.
-
.redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
Instance Method Summary
-
#==(other) ⇒ Boolean
Equality operator.
-
#add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch.
-
#add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch using a specific queue.
-
#batch_gid ⇒ String
Return the namespaced worker id.
-
#batch_id ⇒ String
Return the worker id.
-
#batch_state ⇒ Hash
Return the batch state.
-
#batch_state_count(state) ⇒ Integer
Return the number of jobs in a given state.
-
#batch_state_count_gid(state) ⇒ String
Return the key under which the batch progress is stored for a specific state.
-
#batch_state_gid ⇒ String
Return the key under which the batch state is stored.
-
#cleanup ⇒ Object
Remove all batch and sub-batch keys from Redis.
-
#complete(status = :completed) ⇒ Object
Post-perform logic.
-
#complete? ⇒ Boolean
Return true if all the child workers have completed.
-
#enqueued_jobs ⇒ Array<Cloudtasker::Worker>
The list of jobs that have been enqueued as part of the batch.
-
#execute ⇒ Object
Execute the batch.
-
#initialize(worker) ⇒ Job
constructor
Build a new instance of the class.
-
#key(val) ⇒ String
Return a namespaced key.
-
#migrate_batch_state_to_redis_hash ⇒ Object
This method migrates the batch state to be a Redis hash instead of a hash stored in a string key.
-
#migrate_progress_stats_to_redis_counters ⇒ Object
This method initializes the batch job counters if not set already.
-
#on_batch_node_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when any batch in the tree gets completed.
-
#on_child_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when a direct child batch is complete.
-
#on_complete(status = :completed) ⇒ Object
Callback invoked when the batch is complete.
-
#parent_batch ⇒ Cloudtasker::Batch::Job?
Return the parent batch, if any.
-
#pending_jobs ⇒ Array<Cloudtasker::Worker>
The list of jobs to be enqueued in the batch.
-
#progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress
Calculate the progress of the batch.
-
#redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
-
#reenqueued? ⇒ Boolean
Return true if the worker has been re-enqueued.
-
#run_worker_callback(callback, *args) ⇒ any
Run worker callback.
-
#save ⇒ Object
Save serialized version of the worker.
-
#schedule_pending_jobs ⇒ Object
Schedule the child workers that were added to the batch.
-
#setup ⇒ Object
Save the batch and enqueue all child workers attached to it.
-
#update_state(batch_id, status) ⇒ Object
Update the batch state.
Constructor Details
#initialize(worker) ⇒ Job
Build a new instance of the class.
# File 'lib/cloudtasker/batch/job.rb', line 103
def initialize(worker)
  @worker = worker
end
Instance Attribute Details
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
# File 'lib/cloudtasker/batch/job.rb', line 7
def worker
  @worker
end
Class Method Details
.find(worker_id) ⇒ Cloudtasker::Batch::Job?
Find a batch by id.
# File 'lib/cloudtasker/batch/job.rb', line 54
def self.find(worker_id)
  return nil unless worker_id

  # Retrieve related worker
  payload = redis.fetch(key("#{JOBS_NAMESPACE}/#{worker_id}"))
  worker = Cloudtasker::Worker.from_hash(payload)
  return nil unless worker

  # Build batch job
  self.for(worker)
end
.for(worker) ⇒ Cloudtasker::Batch::Job
Attach a batch to a worker.
# File 'lib/cloudtasker/batch/job.rb', line 86
def self.for(worker)
  # Load extension if not loaded already on the worker class
  worker.class.include(Extension::Worker) unless worker.class <= Extension::Worker

  # Add batch and parent batch to worker
  worker.batch = new(worker)
  worker.parent_batch = worker.batch.parent_batch

  # Return the batch
  worker.batch
end
.key(val) ⇒ String
Return a namespaced key.
# File 'lib/cloudtasker/batch/job.rb', line 73
def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end
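To make the key format concrete, here is a minimal sketch of the same joining logic outside the class. The `underscore` and `namespaced_key` helpers are hypothetical: `underscore` is a simplified stand-in for ActiveSupport's `String#underscore` and only handles the cases needed for this class name.

```ruby
# Hypothetical stand-in for ActiveSupport's String#underscore, covering only
# the "::" separator and simple CamelCase boundaries.
def underscore(name)
  name.gsub('::', '/')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

# Mirrors the documented .key logic for an explicit class name.
def namespaced_key(class_name, val)
  return nil if val.nil?

  [underscore(class_name), val.to_s].join('/')
end

puts namespaced_key('Cloudtasker::Batch::Job', 'jobs/123')
# prints "cloudtasker/batch/job/jobs/123"
puts namespaced_key('Cloudtasker::Batch::Job', :parent_id)
# prints "cloudtasker/batch/job/parent_id"
```

Under these assumptions, every batch key shares the `cloudtasker/batch/job` prefix, and a nil value yields nil rather than a dangling prefix.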
.redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
# File 'lib/cloudtasker/batch/job.rb', line 43
def self.redis
  @redis ||= RedisClient.new
end
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
# File 'lib/cloudtasker/batch/job.rb', line 133
def ==(other)
  other.is_a?(self.class) && other.batch_id == batch_id
end
#add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch.
# File 'lib/cloudtasker/batch/job.rb', line 242
def add(worker_klass, *args)
  add_to_queue(worker.job_queue, worker_klass, *args)
end
#add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch using a specific queue.
# File 'lib/cloudtasker/batch/job.rb', line 255
def add_to_queue(queue, worker_klass, *args)
  pending_jobs << worker_klass.new(
    job_args: args,
    job_meta: { key(:parent_id) => batch_id },
    job_queue: queue
  )
end
#batch_gid ⇒ String
Return the namespaced worker id.
# File 'lib/cloudtasker/batch/job.rb', line 173
def batch_gid
  key("#{JOBS_NAMESPACE}/#{batch_id}")
end
#batch_id ⇒ String
Return the worker id.
# File 'lib/cloudtasker/batch/job.rb', line 164
def batch_id
  worker&.job_id
end
#batch_state ⇒ Hash
Return the batch state.
# File 'lib/cloudtasker/batch/job.rb', line 228
def batch_state
  migrate_batch_state_to_redis_hash

  redis.hgetall(batch_state_gid)
end
#batch_state_count(state) ⇒ Integer
Return the number of jobs in a given state.
# File 'lib/cloudtasker/batch/job.rb', line 201
def batch_state_count(state)
  redis.get(batch_state_count_gid(state)).to_i
end
#batch_state_count_gid(state) ⇒ String
Return the key under which the batch progress is stored for a specific state.
# File 'lib/cloudtasker/batch/job.rb', line 192
def batch_state_count_gid(state)
  "#{batch_state_gid}/state_count/#{state}"
end
#batch_state_gid ⇒ String
Return the key under which the batch state is stored.
# File 'lib/cloudtasker/batch/job.rb', line 182
def batch_state_gid
  key("#{STATES_NAMESPACE}/#{batch_id}")
end
#cleanup ⇒ Object
Remove all batch and sub-batch keys from Redis.
# File 'lib/cloudtasker/batch/job.rb', line 432
def cleanup
  migrate_batch_state_to_redis_hash

  # Delete child batches recursively
  redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup }

  # Delete batch redis entries
  redis.multi do |m|
    m.del(batch_gid)
    m.del(batch_state_gid)
    BATCH_STATUSES.each { |e| m.del(batch_state_count_gid(e)) }
  end
end
#complete(status = :completed) ⇒ Object
Post-perform logic. The parent batch is notified if the job is complete.
# File 'lib/cloudtasker/batch/job.rb', line 520
def complete(status = :completed)
  return true if reenqueued?

  # Notify the parent batch that a child is complete
  on_complete(status) if complete?

  # Notify the parent that a batch node has completed
  parent_batch&.on_batch_node_complete(self, status)
end
#complete? ⇒ Boolean
Return true if all the child workers have completed.
# File 'lib/cloudtasker/batch/job.rb', line 341
def complete?
  migrate_batch_state_to_redis_hash

  # Check that all child jobs have completed
  redis.hvals(batch_state_gid).all? { |e| COMPLETION_STATUSES.include?(e) }
end
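The completion rule can be illustrated on a plain Hash instead of a Redis hash. This is only a sketch: `batch_complete?` is a hypothetical helper, and the job ids are made up for the example.

```ruby
# Statuses copied from the COMPLETION_STATUSES constant documented on this page.
COMPLETION_STATUSES = %w[completed dead].freeze

# Hypothetical helper: applies the same all-values check as #complete?,
# but to an in-memory Hash of job id => status.
def batch_complete?(state)
  state.values.all? { |status| COMPLETION_STATUSES.include?(status) }
end

finished = { 'job-1' => 'completed', 'job-2' => 'dead' }
retrying = { 'job-1' => 'completed', 'job-2' => 'errored' }

puts batch_complete?(finished) # prints "true"
puts batch_complete?(retrying) # prints "false"
```

A dead child does not block the batch, while an errored child does, since it is still expected to retry.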
#enqueued_jobs ⇒ Array<Cloudtasker::Worker>
The list of jobs that have been enqueued as part of the batch.
# File 'lib/cloudtasker/batch/job.rb', line 219
def enqueued_jobs
  @enqueued_jobs ||= []
end
#execute ⇒ Object
Execute the batch.
# File 'lib/cloudtasker/batch/job.rb', line 533
def execute
  # Update parent batch state
  parent_batch&.update_state(batch_id, :processing)

  # Perform job
  yield

  # Setup batch
  # Only applicable if the batch has pending_jobs
  setup

  # Save parent batch if batch was expanded
  parent_batch&.schedule_pending_jobs

  # Complete batch
  complete(:completed)
rescue DeadWorkerError => e
  complete(:dead)
  raise(e)
rescue StandardError => e
  complete(:errored)
  raise(e)
end
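The mapping from the block's outcome to a batch status can be sketched in isolation. The `final_status` helper is hypothetical, and the local `DeadWorkerError` is assumed to be a `StandardError` subclass, which is why its `rescue` clause must come first. Unlike `#execute`, this sketch returns the status instead of re-raising the error.

```ruby
# Local stand-in for the library's error class (assumed to descend from StandardError).
class DeadWorkerError < StandardError; end

# Hypothetical helper: runs the block and reports which status #execute
# would pass to #complete for that outcome.
def final_status
  yield
  :completed
rescue DeadWorkerError
  # The more specific error is rescued first, otherwise StandardError would catch it.
  :dead
rescue StandardError
  :errored
end

puts final_status { 1 + 1 }                    # prints "completed"
puts final_status { raise DeadWorkerError }    # prints "dead"
puts final_status { raise ArgumentError, 'x' } # prints "errored"
```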
#key(val) ⇒ String
Return a namespaced key.
# File 'lib/cloudtasker/batch/job.rb', line 144
def key(val)
  self.class.key(val)
end
#migrate_batch_state_to_redis_hash ⇒ Object
This method migrates the batch state to be a Redis hash instead of a hash stored in a string key.
# File 'lib/cloudtasker/batch/job.rb', line 267
def migrate_batch_state_to_redis_hash
  return unless redis.type(batch_state_gid) == 'string'

  # Migrate batch state to Redis hash if it is still using a legacy string key
  # We acquire a lock then check again
  redis.with_lock(batch_state_gid, max_wait: BATCH_MAX_LOCK_WAIT) do
    if redis.type(batch_state_gid) == 'string'
      state = redis.fetch(batch_state_gid)
      redis.del(batch_state_gid)
      redis.hset(batch_state_gid, state) if state.any?
    end
  end
end
#migrate_progress_stats_to_redis_counters ⇒ Object
This method initializes the batch job counters if not set already.
# File 'lib/cloudtasker/batch/job.rb', line 284
def migrate_progress_stats_to_redis_counters
  # Abort if counters have already been set. The 'all' counter acts as a feature flag.
  return if redis.exists?(batch_state_count_gid('all'))

  # Get all job states
  values = batch_state.values

  # Count by value
  redis.multi do |m|
    # Per status
    values.tally.each do |k, v|
      m.set(batch_state_count_gid(k), v)
    end

    # All counter
    m.set(batch_state_count_gid('all'), values.size)
  end
end
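As a rough illustration of how the counters are derived from the batch state, the following sketch applies the same `tally` step to an in-memory Hash. The job ids and the `counters` Hash are made up for the example; the real method writes each count to its own Redis key. `Enumerable#tally` requires Ruby 2.7 or later.

```ruby
# Example batch state: job id => status (values are illustrative).
state = {
  'job-1' => 'completed',
  'job-2' => 'scheduled',
  'job-3' => 'completed'
}

values = state.values

# One counter per status, plus the 'all' counter holding the total.
counters = values.tally.merge('all' => values.size)

p counters
# prints {"completed"=>2, "scheduled"=>1, "all"=>3}
```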
#on_batch_node_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when any batch in the tree gets completed.
# File 'lib/cloudtasker/batch/job.rb', line 419
def on_batch_node_complete(child_batch, status = :completed)
  return false unless status == :completed

  # Notify the worker that a batch node worker has completed
  run_worker_callback(:on_batch_node_complete, child_batch.worker)

  # Notify the parent batch that a node is complete
  parent_batch&.on_batch_node_complete(child_batch)
end
#on_child_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when a direct child batch is complete.
# File 'lib/cloudtasker/batch/job.rb', line 396
def on_child_complete(child_batch, status = :completed)
  # Update batch state
  update_state(child_batch.batch_id, status)

  # Notify the worker that a direct batch child worker has completed
  case status
  when :completed
    run_worker_callback(:on_child_complete, child_batch.worker)
  when :errored
    run_worker_callback(:on_child_error, child_batch.worker)
  when :dead
    run_worker_callback(:on_child_dead, child_batch.worker)
  end

  # Notify the parent batch that we are done with this batch
  on_complete if status != :errored && complete?
end
#on_complete(status = :completed) ⇒ Object
Callback invoked when the batch is complete.
# File 'lib/cloudtasker/batch/job.rb', line 380
def on_complete(status = :completed)
  # Invoke worker callback
  run_worker_callback(:on_batch_complete) if status == :completed

  # Propagate event
  parent_batch&.on_child_complete(self, status)

  # The batch tree is complete. Cleanup the downstream tree.
  cleanup
end
#parent_batch ⇒ Cloudtasker::Batch::Job?
Return the parent batch, if any.
# File 'lib/cloudtasker/batch/job.rb', line 153
def parent_batch
  return nil unless (parent_id = worker.job_meta.get(key(:parent_id)))

  @parent_batch ||= self.class.find(parent_id)
end
#pending_jobs ⇒ Array<Cloudtasker::Worker>
The list of jobs to be enqueued in the batch.
# File 'lib/cloudtasker/batch/job.rb', line 210
def pending_jobs
  @pending_jobs ||= []
end
#progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress
Calculate the progress of the batch.
# File 'lib/cloudtasker/batch/job.rb', line 454
def progress(depth: 0)
  depth = depth.to_i

  # Initialize counters from batch state. This is only applicable to running batches
  # that started before the counter-based progress was implemented/released.
  migrate_progress_stats_to_redis_counters

  # Return immediately if we do not need to go down the tree
  return BatchProgress.new([self]) if depth <= 0

  # Sum batch progress of current batch and sub-batches up to the specified
  # depth
  batch_state.to_h.reduce(BatchProgress.new([self])) do |memo, (child_id, _)|
    memo + (self.class.find(child_id)&.progress(depth: depth - 1) || BatchProgress.new)
  end
end
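The depth-limited traversal can be sketched on a small in-memory tree. `Node` and `count_nodes` are hypothetical: the sketch counts visited nodes where the real method sums `BatchProgress` objects, but the `reduce` and `depth - 1` recursion follow the same shape.

```ruby
# Hypothetical tree node standing in for a batch and its child batches.
Node = Struct.new(:name, :children)

# Counts the nodes visited when descending at most `depth` levels below `node`.
def count_nodes(node, depth)
  # Stop descending once the requested depth is exhausted.
  return 1 if depth <= 0

  node.children.reduce(1) do |memo, child|
    memo + count_nodes(child, depth - 1)
  end
end

leaf = Node.new('grandchild', [])
tree = Node.new('root', [Node.new('child-a', [leaf]), Node.new('child-b', [])])

puts count_nodes(tree, 0) # prints "1"
puts count_nodes(tree, 1) # prints "3"
puts count_nodes(tree, 2) # prints "4"
```

With the default depth of 0 only the batch itself is considered, which keeps the common case to a single lookup.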
#redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
# File 'lib/cloudtasker/batch/job.rb', line 122
def redis
  self.class.redis
end
#reenqueued? ⇒ Boolean
Return true if the worker has been re-enqueued. Post-process logic should be skipped for re-enqueued jobs.
# File 'lib/cloudtasker/batch/job.rb', line 113
def reenqueued?
  worker.job_reenqueued
end
#run_worker_callback(callback, *args) ⇒ any
Run worker callback. The error and dead callbacks get silenced should they raise an error.
# File 'lib/cloudtasker/batch/job.rb', line 357
def run_worker_callback(callback, *args)
  worker.try(callback, *args).tap do
    # Enqueue pending jobs if batch was expanded in callback
    # A completed batch cannot receive additional jobs
    schedule_pending_jobs if callback.to_sym != :on_batch_complete

    # Schedule pending jobs on parent if batch was expanded
    parent_batch&.schedule_pending_jobs
  end
rescue StandardError => e
  # There is no point in retrying jobs due to failure callbacks failing
  # Only completion callbacks will trigger a re-run of the job because
  # these do matter for batch completion
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  # Log error instead
  worker.logger.error(e)
  worker.logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
end
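The silencing rule can be shown without a worker or Redis. In this sketch `run_callback` is a hypothetical helper and a plain Array replaces the worker logger; only the `IGNORED_ERRORED_CALLBACKS` check is taken from the listing above.

```ruby
# Callbacks copied from the IGNORED_ERRORED_CALLBACKS constant documented on this page.
IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze

# Hypothetical helper: runs the block on behalf of the named callback and
# swallows errors only for the failure callbacks listed above.
def run_callback(callback, log)
  yield
rescue StandardError => e
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  # Record the failure instead of propagating it.
  log << "Callback #{callback} failed: #{e.message}"
  nil
end

log = []

# A failing failure callback is logged and silenced.
run_callback(:on_child_error, log) { raise 'boom' }

# A failing completion callback is re-raised to the caller.
begin
  run_callback(:on_child_complete, log) { raise 'boom' }
rescue StandardError => e
  log << "re-raised: #{e.message}"
end

puts log
```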
#save ⇒ Object
Save serialized version of the worker.
This is required to be able to invoke callback methods in the context of the worker (= instantiated worker) when child workers complete (success or failure).
# File 'lib/cloudtasker/batch/job.rb', line 310
def save
  redis.write(batch_gid, worker.to_h)
end
#schedule_pending_jobs ⇒ Object
Schedule the child workers that were added to the batch.
# File 'lib/cloudtasker/batch/job.rb', line 474
def schedule_pending_jobs
  ret_list = []

  while (j = pending_jobs.shift)
    # Schedule the job
    # Skip batch registration if the job was not actually scheduled
    # E.g. the job was evicted due to uniqueness requirements
    next unless j.schedule

    # Initialize the batch state unless the job has already started (and taken
    # hold of its own status)
    # The batch state is initialized only after the job is scheduled to avoid
    # having never-ending batches - which could occur if a batch was crashing
    # while enqueuing children due to a OOM error and since 'scheduled' is a
    # blocking status.
    redis.multi do |m|
      m.hsetnx(batch_state_gid, j.job_id, 'scheduled')
      m.incr(batch_state_count_gid('scheduled'))
      m.incr(batch_state_count_gid('all'))
    end

    # Flag job as enqueued
    ret_list << j
    enqueued_jobs << j
  end

  # Return the list of jobs just enqueued
  ret_list
end
#setup ⇒ Object
Save the batch and enqueue all child workers attached to it.
# File 'lib/cloudtasker/batch/job.rb', line 507
def setup
  return true if pending_jobs.empty?

  # Save batch
  save

  # Schedule all child workers
  schedule_pending_jobs
end
#update_state(batch_id, status) ⇒ Object
Update the batch state.
# File 'lib/cloudtasker/batch/job.rb', line 320
def update_state(batch_id, status)
  migrate_batch_state_to_redis_hash

  # Get current status
  current_status = redis.hget(batch_state_gid, batch_id)
  return if current_status == status.to_s

  # Update the batch state batch_id entry with the new status
  # and update counters
  redis.multi do |m|
    m.hset(batch_state_gid, batch_id, status)
    m.decr(batch_state_count_gid(current_status))
    m.incr(batch_state_count_gid(status))
  end
end
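The effect of a status transition on the per-status counters can be sketched with in-memory Hashes. The `transition` helper is hypothetical, the `state` and `counters` Hashes stand in for the Redis state hash and counter keys, and the guard for a missing previous status is an addition of this sketch rather than something shown in the listing above.

```ruby
# In-memory stand-ins (assumptions) for the Redis state hash and counters.
state = { 'job-1' => 'scheduled' }
counters = Hash.new(0)
counters['scheduled'] = 1
counters['all'] = 1

# Hypothetical helper: moves a job to a new status and shifts the counters.
# Returns false when the status is unchanged, true otherwise.
def transition(state, counters, job_id, status)
  current = state[job_id]
  return false if current == status.to_s

  state[job_id] = status.to_s
  counters[current] -= 1 if current # guard added by this sketch
  counters[status.to_s] += 1
  true
end

transition(state, counters, 'job-1', :processing)
p counters
# prints {"scheduled"=>0, "all"=>1, "processing"=>1}
```

The 'all' counter is untouched by transitions: one status counter is decremented and another incremented, so the total stays constant.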