Module: Cloudtasker::Worker
Defined in: lib/cloudtasker/worker.rb, lib/cloudtasker/testing.rb
Overview
Add extra methods for testing purposes.
Defined Under Namespace
Modules: ClassMethods
Class Method Summary
- .clear_all ⇒ Object
  Clear all jobs.
- .drain_all ⇒ Array<any>
  Run all the jobs.
- .from_hash(hash) ⇒ Cloudtasker::Worker?
  Return a worker instance from a worker hash description.
- .from_json(json) ⇒ Cloudtasker::Worker?
  Return a worker instance from a serialized worker.
- .included(base) ⇒ Object
  Add class methods to the including class.
Instance Method Summary
- #==(other) ⇒ Boolean
  Equality operator.
- #arguments_missing? ⇒ Boolean
  Return true if the job arguments are missing.
- #dispatch_deadline ⇒ Integer
  Return the dispatch deadline duration.
- #execute ⇒ Any
  Execute the worker by calling `perform` with the args.
- #initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, job_attempts: 0, task_id: nil) ⇒ Object
  Build a new worker instance.
- #job_class_name ⇒ String
  Return the class name of the worker.
- #job_dead? ⇒ Boolean
  Return true if the job has strictly exceeded its maximum number of retries.
- #job_duration ⇒ Float
  Return the time taken (in seconds) to perform the job.
- #job_duration_ms ⇒ Float
  Return the job_duration in milliseconds.
- #job_max_retries ⇒ Integer
  Return the max number of retries allowed for this job.
- #job_must_die? ⇒ Boolean
  Return true if the job must be declared dead upon raising an error.
- #job_queue ⇒ String
  Return the queue to use for this worker.
- #logger ⇒ Logger, any
  Return the Cloudtasker logger instance.
- #new_instance ⇒ Cloudtasker::Worker
  Return a new instance of the worker with the same args and metadata but with a different id.
- #reenqueue(interval) ⇒ Cloudtasker::CloudTask
  Helper method used to re-enqueue the job.
- #run_callback(callback, *args) ⇒ any
  Run worker callback.
- #schedule(**args) ⇒ Cloudtasker::CloudTask?
  Enqueue a worker, with or without delay.
- #schedule_time(interval: nil, time_at: nil) ⇒ Integer?
  Return a Unix timestamp specifying when to run the task.
- #to_h ⇒ Hash
  Return a hash description of the worker.
- #to_json(*args) ⇒ String
  Return a JSON representation of the worker.
Class Method Details
.clear_all ⇒ Object
Clear all jobs.
    # File 'lib/cloudtasker/testing.rb', line 150
    def self.clear_all
      Backend::MemoryTask.clear
    end
.drain_all ⇒ Array<any>
Run all the jobs.
    # File 'lib/cloudtasker/testing.rb', line 159
    def self.drain_all
      Backend::MemoryTask.drain
    end
.from_hash(hash) ⇒ Cloudtasker::Worker?
Return a worker instance from a worker hash description. A worker hash description is typically generated by calling `MyWorker#to_h`.

    # File 'lib/cloudtasker/worker.rb', line 36
    def self.from_hash(hash)
      # Symbolize metadata keys and stringify job arguments
      payload = JSON.parse(hash.to_json, symbolize_names: true)
      payload[:job_args] = JSON.parse(payload[:job_args].to_json)

      # Extract worker parameters
      klass_name = payload&.dig(:worker)
      return nil unless klass_name

      # Check that worker class is a valid worker
      worker_klass = Object.const_get(klass_name)
      return nil unless worker_klass.include?(self)

      # Return instantiated worker
      worker_klass.new(**payload.slice(
        :job_queue, :job_args, :job_id, :job_meta,
        :job_retries, :job_attempts, :task_id
      ))
    rescue NameError
      nil
    end
.from_json(json) ⇒ Cloudtasker::Worker?
Return a worker instance from a serialized worker. A worker can be serialized by calling `MyWorker#to_json`.

    # File 'lib/cloudtasker/worker.rb', line 22
    def self.from_json(json)
      from_hash(JSON.parse(json))
    rescue JSON::ParserError
      nil
    end
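The round trip between `to_json` and `from_json` can be tried without Cloudtasker itself. The following is a minimal sketch, not the library's code: `SketchWorker` and `EchoJob` are hypothetical names, and only `job_id` and `job_args` are carried over. It illustrates the three outcomes described above: a restored worker, `nil` for malformed JSON, and `nil` for a class that is unknown or does not include the worker module.

```ruby
require 'json'
require 'securerandom'

# Hypothetical stand-in for Cloudtasker::Worker, reduced to serialization.
module SketchWorker
  # Rebuild a worker from its JSON form; nil if the JSON is malformed.
  def self.from_json(json)
    from_hash(JSON.parse(json))
  rescue JSON::ParserError
    nil
  end

  # Rebuild a worker from a hash; nil if the class is unknown or not a worker.
  def self.from_hash(hash)
    payload = JSON.parse(hash.to_json, symbolize_names: true)
    klass_name = payload[:worker]
    return nil unless klass_name

    klass = Object.const_get(klass_name)
    return nil unless klass.include?(self)

    klass.new(**payload.slice(:job_id, :job_args))
  rescue NameError
    nil
  end

  attr_reader :job_id, :job_args

  def initialize(job_id: nil, job_args: nil)
    @job_id = job_id || SecureRandom.uuid
    @job_args = job_args || []
  end

  def to_h
    { worker: self.class.to_s, job_id: job_id, job_args: job_args }
  end

  def to_json(*args)
    to_h.to_json(*args)
  end
end

# Hypothetical worker class used only for this illustration.
class EchoJob
  include SketchWorker
end

original = EchoJob.new(job_args: [1, 'two'])
restored = SketchWorker.from_json(original.to_json)
puts restored.job_id == original.job_id
```

Because the restored instance keeps the same `job_id`, it compares equal to the original under an id-based `==` such as the one documented on this page.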
.included(base) ⇒ Object
Add class methods to the including class.

    # File 'lib/cloudtasker/worker.rb', line 7
    def self.included(base)
      base.extend(ClassMethods)
      base.attr_writer :job_queue
      base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued, :job_retries, :job_attempts,
                         :perform_started_at, :perform_ended_at, :task_id
    end
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
    # File 'lib/cloudtasker/worker.rb', line 363
    def ==(other)
      other.is_a?(self.class) && other.job_id == job_id
    end
#arguments_missing? ⇒ Boolean
Return true if the job arguments are missing.
This may happen if a job ran successfully but was retried because the Cloud Tasks dispatch deadline was exceeded. If the arguments were stored in Redis, they may already have been flushed after the successful completion.
If job arguments are missing, the job is simply declared dead.
    # File 'lib/cloudtasker/worker.rb', line 415
    def arguments_missing?
      job_args.empty? && ![0, -1].include?(method(:perform).arity)
    end
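The check relies on `Method#arity`: a `perform` with no parameters has arity 0, and one that takes only optional or splat arguments has arity -1, so an empty argument list is legitimate for both. The sketch below reproduces the test as a standalone helper; the three job classes are hypothetical and exist only to show the different arities.

```ruby
# Hypothetical workers used only to illustrate the arity check.
class NoArgsJob
  def perform; end
end

class SplatJob
  def perform(*args); end
end

class OneArgJob
  def perform(id); end
end

# Same test as arguments_missing?, written as a standalone helper.
def arguments_missing?(worker, job_args)
  job_args.empty? && ![0, -1].include?(worker.method(:perform).arity)
end

puts arguments_missing?(OneArgJob.new, [])   # perform requires an argument
puts arguments_missing?(NoArgsJob.new, [])   # perform takes nothing
```

Only `OneArgJob` with an empty argument list is reported as missing its arguments.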
#dispatch_deadline ⇒ Integer
Return the dispatch deadline duration. Cloud Tasks will time out the job once this duration has elapsed.

    # File 'lib/cloudtasker/worker.rb', line 222
    def dispatch_deadline
      @dispatch_deadline ||= begin
        configured_deadline = (
          self.class.[:dispatch_deadline] ||
          Cloudtasker.config.dispatch_deadline
        ).to_i
        configured_deadline.clamp(Config::MIN_DISPATCH_DEADLINE, Config::MAX_DISPATCH_DEADLINE)
      end
    end
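The effect of `clamp` is easiest to see in isolation. In this sketch the bounds of 15 and 1800 seconds are assumptions chosen for illustration (the real values are whatever `Config::MIN_DISPATCH_DEADLINE` and `Config::MAX_DISPATCH_DEADLINE` define), and `effective_deadline` is a hypothetical helper, not part of Cloudtasker.

```ruby
# Assumed bounds, in seconds; the real values live in Cloudtasker's Config.
MIN_DISPATCH_DEADLINE = 15
MAX_DISPATCH_DEADLINE = 1800

# Hypothetical helper: the worker-level option wins over the global default,
# and the result is forced into the allowed range.
def effective_deadline(worker_option, global_default)
  (worker_option || global_default).to_i.clamp(MIN_DISPATCH_DEADLINE, MAX_DISPATCH_DEADLINE)
end

puts effective_deadline(nil, 600)   # falls back to the global default
puts effective_deadline(5, 600)     # raised to the lower bound
puts effective_deadline(3600, 600)  # lowered to the upper bound
```

A value outside the range is silently adjusted rather than rejected, so a misconfigured deadline does not raise an error.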
#execute ⇒ Any
Execute the worker by calling `perform` with the args.

    # File 'lib/cloudtasker/worker.rb', line 246
    def execute
      logger.info('Starting job...')

      # Perform job logic
      resp = execute_middleware_chain

      # Log job completion and return result
      logger.info("Job done after #{job_duration}s") { { duration: job_duration_ms } }
      resp
    rescue DeadWorkerError => e
      logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration_ms } }
      raise(e)
    rescue RetryWorkerError => e
      logger.info("Job done after #{job_duration}s (retry requested)") do
        { duration: job_duration_ms, reason: e. }
      end
      raise(e)
    rescue StandardError => e
      logger.info("Job failed after #{job_duration}s") { { duration: job_duration_ms } }
      raise(e)
    end
#initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, job_attempts: 0, task_id: nil) ⇒ Object
Build a new worker instance.
    # File 'lib/cloudtasker/worker.rb', line 182
    def initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, job_attempts: 0,
                   task_id: nil)
      @job_args = job_args || []
      @job_id = job_id || SecureRandom.uuid
      @job_meta = MetaStore.new()
      @job_retries = job_retries || 0
      @job_attempts = job_attempts || 0
      @job_queue = job_queue
      @task_id = task_id
    end
#job_class_name ⇒ String
Return the class name of the worker.
    # File 'lib/cloudtasker/worker.rb', line 198
    def job_class_name
      self.class.to_s
    end
#job_dead? ⇒ Boolean
Return true if the job has strictly exceeded its maximum number of retries.
Used as a preemptive filter when running the job.

    # File 'lib/cloudtasker/worker.rb', line 399
    def job_dead?
      job_retries > job_max_retries
    end
#job_duration ⇒ Float
Return the time taken (in seconds) to perform the job. This duration includes the middlewares and the actual perform method.
    # File 'lib/cloudtasker/worker.rb', line 425
    def job_duration
      return 0.0 unless perform_ended_at && perform_started_at

      @job_duration ||= (perform_ended_at - perform_started_at).ceil(3)
    end
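Subtracting two `Time` values yields a `Float` number of seconds, and `ceil(3)` rounds that value up to the next millisecond. The sketch below repeats the computation as a standalone method; the two arguments stand in for `perform_started_at` and `perform_ended_at`, and the timestamps are made up for illustration.

```ruby
# Standalone version of the duration computation; the arguments stand in for
# perform_started_at and perform_ended_at.
def job_duration(started_at, ended_at)
  return 0.0 unless ended_at && started_at

  # Time - Time yields a Float in seconds; ceil(3) rounds up to the millisecond.
  (ended_at - started_at).ceil(3)
end

puts job_duration(Time.at(100.0), Time.at(101.2341))
puts job_duration(nil, nil)
```

Rounding up rather than to nearest means a very short job still reports a non-zero duration.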
#job_duration_ms ⇒ Float
Return the job_duration in milliseconds.

    # File 'lib/cloudtasker/worker.rb', line 436
    def job_duration_ms
      job_duration * 1000
    end
#job_max_retries ⇒ Integer
Return the max number of retries allowed for this job.
The order of precedence for retry lookup is:
- Worker `max_retries` method
- Class `max_retries` option
- Cloudtasker `max_retries` config option

    # File 'lib/cloudtasker/worker.rb', line 377
    def job_max_retries
      @job_max_retries ||= try(:max_retries, *job_args) || self.class.max_retries
    end
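The precedence order above can be sketched in plain Ruby. Everything here is illustrative: `resolve_max_retries`, `PlainJob`, `PickyJob`, and the global default of 25 are assumptions, and `respond_to?` replaces the `try` call used in the listing.

```ruby
# Assumed global default, standing in for the Cloudtasker config option.
GLOBAL_MAX_RETRIES = 25

# Hypothetical resolver following the documented order of precedence:
# worker method first, then class option, then global config.
def resolve_max_retries(worker, class_option, job_args)
  from_method = worker.max_retries(*job_args) if worker.respond_to?(:max_retries)
  from_method || class_option || GLOBAL_MAX_RETRIES
end

# Worker without an instance-level override.
class PlainJob; end

# Worker whose override depends on the job arguments.
class PickyJob
  def max_retries(priority)
    priority == 'high' ? 10 : nil
  end
end

puts resolve_max_retries(PickyJob.new, 5, ['high'])
puts resolve_max_retries(PickyJob.new, 5, ['low'])
puts resolve_max_retries(PlainJob.new, nil, [])
```

Because the worker method receives the job arguments, the retry budget can vary per job; returning `nil` from it defers to the next level.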
#job_must_die? ⇒ Boolean
Return true if the job must be declared dead upon raising an error.

    # File 'lib/cloudtasker/worker.rb', line 387
    def job_must_die?
      job_retries >= job_max_retries
    end
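`job_must_die?` and `job_dead?` differ only in the comparison operator, which matters exactly at the boundary where the retry count equals the limit. The sketch below isolates the two predicates in a hypothetical `RetryState` struct holding nothing but the two counters.

```ruby
# Hypothetical value object holding only the two counters the predicates use.
RetryState = Struct.new(:job_retries, :job_max_retries) do
  # True once the retry count has strictly exceeded the limit.
  def job_dead?
    job_retries > job_max_retries
  end

  # True when the limit has been reached, so the next error is fatal.
  def job_must_die?
    job_retries >= job_max_retries
  end
end

at_limit = RetryState.new(3, 3)
puts at_limit.job_must_die?
puts at_limit.job_dead?
```

At the limit, a job may still run once more, but any error it raises marks it as dead; past the limit, it is filtered out before running.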
#job_queue ⇒ String
Return the queue to use for this worker.
    # File 'lib/cloudtasker/worker.rb', line 207
    def job_queue
      (
        @job_queue ||=
          Thread.current[:cloudtasker_propagated_queue] ||
          self.class.[:queue] ||
          Config::DEFAULT_JOB_QUEUE
      ).to_s
    end
#logger ⇒ Logger, any
Return the Cloudtasker logger instance.
    # File 'lib/cloudtasker/worker.rb', line 237
    def logger
      @logger ||= WorkerLogger.new(self)
    end
#new_instance ⇒ Cloudtasker::Worker
Return a new instance of the worker with the same args and metadata but with a different id.
    # File 'lib/cloudtasker/worker.rb', line 323
    def new_instance
      self.class.new(job_queue: job_queue, job_args: job_args, job_meta: )
    end
#reenqueue(interval) ⇒ Cloudtasker::CloudTask
Helper method used to re-enqueue the job. Re-enqueued jobs keep the same job_id.
This helper may be useful when jobs must pause activity due to external factors such as when a third-party API is throttling the rate of API calls.
    # File 'lib/cloudtasker/worker.rb', line 312
    def reenqueue(interval)
      @job_reenqueued = true
      schedule(interval: interval)
    end
#run_callback(callback, *args) ⇒ any
Run worker callback.
    # File 'lib/cloudtasker/worker.rb', line 448
    def run_callback(callback, *args)
      try(callback, *args)
    end
#schedule(**args) ⇒ Cloudtasker::CloudTask?
Enqueue a worker, with or without delay.
    # File 'lib/cloudtasker/worker.rb', line 291
    def schedule(**args)
      # Evaluate when to schedule the job
      time_at = schedule_time(**args)

      # Schedule job through client middlewares
      Cloudtasker.config.client_middleware.invoke(self, time_at: time_at) do
        WorkerHandler.new(self).schedule(time_at: time_at)
      end
    end
#schedule_time(interval: nil, time_at: nil) ⇒ Integer?
Return a Unix timestamp specifying when to run the task. Return nil when neither an interval nor a time is given.

    # File 'lib/cloudtasker/worker.rb', line 276
    def schedule_time(interval: nil, time_at: nil)
      return nil unless interval || time_at

      # Generate the complete Unix timestamp
      (time_at || Time.now).to_i + interval.to_i
    end
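The two options combine rather than exclude each other: `interval` is added to `time_at` when both are given, and to the current time when only `interval` is given. The sketch below is a standalone copy of the computation for experimentation; the timestamps are arbitrary.

```ruby
# Standalone copy of the computation, for illustration only.
def schedule_time(interval: nil, time_at: nil)
  return nil unless interval || time_at

  # nil.to_i is 0, so a missing interval leaves time_at unchanged.
  (time_at || Time.now).to_i + interval.to_i
end

puts schedule_time(time_at: Time.at(1000), interval: 30)
puts schedule_time(time_at: Time.at(1000))
puts schedule_time.inspect
```

A `nil` result signals that no delay was requested.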
#to_h ⇒ Hash
Return a hash description of the worker.
    # File 'lib/cloudtasker/worker.rb', line 332
    def to_h
      {
        worker: self.class.to_s,
        job_id: job_id,
        job_args: job_args,
        job_meta: .to_h,
        job_retries: job_retries,
        job_attempts: job_attempts,
        job_queue: job_queue,
        task_id: task_id
      }
    end
#to_json(*args) ⇒ String
Return a JSON representation of the worker.

    # File 'lib/cloudtasker/worker.rb', line 352
    def to_json(*args)
      to_h.to_json(*args)
    end