Module: Cloudtasker::Worker
- Defined in:
- lib/cloudtasker/worker.rb,
lib/cloudtasker/testing.rb
Overview
Add extra methods for testing purpose
Defined Under Namespace
Modules: ClassMethods
Class Method Summary collapse
-
.clear_all ⇒ Object
Clear all jobs.
-
.drain_all ⇒ Array<any>
Run all the jobs.
-
.from_hash(hash) ⇒ Cloudtasker::Worker?
Return a worker instance from a worker hash description.
-
.from_json(json) ⇒ Cloudtasker::Worker?
Return a worker instance from a serialized worker.
-
.included(base) ⇒ Object
Add class method to including class.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
Equality operator.
-
#arguments_missing? ⇒ Boolean
Return true if the job arguments are missing.
-
#dispatch_deadline ⇒ Integer
Return the Dispatch deadline duration.
-
#execute ⇒ Any
Execute the worker by calling the ‘perform` with the args.
-
#initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, task_id: nil) ⇒ Object
Build a new worker instance.
-
#job_class_name ⇒ String
Return the class name of the worker.
-
#job_dead? ⇒ Boolean
Return true if the job has strictly excceeded its maximum number of retries.
-
#job_duration ⇒ Float
Return the time taken (in seconds) to perform the job.
-
#job_max_retries ⇒ Integer
Return the max number of retries allowed for this job.
-
#job_must_die? ⇒ Boolean
Return true if the job must declared dead upon raising an error.
-
#job_queue ⇒ String
Return the queue to use for this worker.
-
#logger ⇒ Logger, any
Return the Cloudtasker logger instance.
-
#new_instance ⇒ Cloudtasker::Worker
Return a new instance of the worker with the same args and metadata but with a different id.
-
#reenqueue(interval) ⇒ Cloudtasker::CloudTask
Helper method used to re-enqueue the job.
-
#run_callback(callback, *args) ⇒ any
Run worker callback.
-
#schedule(**args) ⇒ Cloudtasker::CloudTask?
Enqueue a worker, with or without delay.
-
#schedule_time(interval: nil, time_at: nil) ⇒ Integer?
Return a unix timestamp specifying when to run the task.
-
#to_h ⇒ Hash
Return a hash description of the worker.
-
#to_json(*args) ⇒ String
Return a json representation of the worker.
Class Method Details
.clear_all ⇒ Object
Clear all jobs.
99 100 101 |
# File 'lib/cloudtasker/testing.rb', line 99 def self.clear_all Backend::MemoryTask.clear end |
.drain_all ⇒ Array<any>
Run all the jobs.
108 109 110 |
# File 'lib/cloudtasker/testing.rb', line 108 def self.drain_all Backend::MemoryTask.drain end |
.from_hash(hash) ⇒ Cloudtasker::Worker?
Return a worker instance from a worker hash description. A worker hash description is typically generated by calling ‘MyWorker#to_h`
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/cloudtasker/worker.rb', line 36 def self.from_hash(hash) # Symbolize metadata keys and stringify job arguments payload = JSON.parse(hash.to_json, symbolize_names: true) payload[:job_args] = JSON.parse(payload[:job_args].to_json) # Extract worker parameters klass_name = payload&.dig(:worker) return nil unless klass_name # Check that worker class is a valid worker worker_klass = Object.const_get(klass_name) return nil unless worker_klass.include?(self) # Return instantiated worker worker_klass.new(**payload.slice(:job_queue, :job_args, :job_id, :job_meta, :job_retries, :task_id)) rescue NameError nil end |
.from_json(json) ⇒ Cloudtasker::Worker?
Return a worker instance from a serialized worker. A worker can be serialized by calling ‘MyWorker#to_json`
22 23 24 25 26 |
# File 'lib/cloudtasker/worker.rb', line 22 def self.from_json(json) from_hash(JSON.parse(json)) rescue JSON::ParserError nil end |
.included(base) ⇒ Object
Add class method to including class
7 8 9 10 11 12 |
# File 'lib/cloudtasker/worker.rb', line 7 def self.included(base) base.extend(ClassMethods) base.attr_writer :job_queue base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued, :job_retries, :perform_started_at, :perform_ended_at, :task_id end |
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
334 335 336 |
# File 'lib/cloudtasker/worker.rb', line 334 def ==(other) other.is_a?(self.class) && other.job_id == job_id end |
#arguments_missing? ⇒ Boolean
Return true if the job arguments are missing.
This may happen if a job was successfully run but retried due to Cloud Task dispatch deadline exceeded. If the arguments were stored in Redis then they may have been flushed already after the successful completion.
If job arguments are missing then the job will simply be declared dead.
386 387 388 |
# File 'lib/cloudtasker/worker.rb', line 386 def arguments_missing? job_args.empty? && ![0, -1].include?(method(:perform).arity) end |
#dispatch_deadline ⇒ Integer
Return the Dispatch deadline duration. Cloud Tasks will timeout the job after this duration is elapsed.
196 197 198 199 200 201 202 203 204 |
# File 'lib/cloudtasker/worker.rb', line 196 def dispatch_deadline @dispatch_deadline ||= begin configured_deadline = ( self.class.[:dispatch_deadline] || Cloudtasker.config.dispatch_deadline ).to_i configured_deadline.clamp(Config::MIN_DISPATCH_DEADLINE, Config::MAX_DISPATCH_DEADLINE) end end |
#execute ⇒ Any
Execute the worker by calling the ‘perform` with the args.
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/cloudtasker/worker.rb', line 220 def execute logger.info('Starting job...') # Perform job logic resp = execute_middleware_chain # Log job completion and return result logger.info("Job done after #{job_duration}s") { { duration: job_duration * 1000 } } resp rescue DeadWorkerError => e logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration * 1000 } } raise(e) rescue RetryWorkerError => e logger.info("Job done after #{job_duration}s (retry requested)") { { duration: job_duration * 1000 } } raise(e) rescue StandardError => e logger.info("Job failed after #{job_duration}s") { { duration: job_duration * 1000 } } raise(e) end |
#initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, task_id: nil) ⇒ Object
Build a new worker instance.
163 164 165 166 167 168 169 170 |
# File 'lib/cloudtasker/worker.rb', line 163 def initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, task_id: nil) @job_args = job_args || [] @job_id = job_id || SecureRandom.uuid @job_meta = MetaStore.new() @job_retries = job_retries || 0 @job_queue = job_queue @task_id = task_id end |
#job_class_name ⇒ String
Return the class name of the worker.
177 178 179 |
# File 'lib/cloudtasker/worker.rb', line 177 def job_class_name self.class.to_s end |
#job_dead? ⇒ Boolean
Return true if the job has strictly excceeded its maximum number of retries.
Used a preemptive filter when running the job.
370 371 372 |
# File 'lib/cloudtasker/worker.rb', line 370 def job_dead? job_retries > job_max_retries end |
#job_duration ⇒ Float
Return the time taken (in seconds) to perform the job. This duration includes the middlewares and the actual perform method.
396 397 398 399 400 |
# File 'lib/cloudtasker/worker.rb', line 396 def job_duration return 0.0 unless perform_ended_at && perform_started_at @job_duration ||= (perform_ended_at - perform_started_at).ceil(3) end |
#job_max_retries ⇒ Integer
Return the max number of retries allowed for this job.
The order of precedence for retry lookup is:
-
Worker ‘max_retries` method
-
Class ‘max_retries` option
-
Cloudtasker ‘max_retries` config option
348 349 350 |
# File 'lib/cloudtasker/worker.rb', line 348 def job_max_retries @job_max_retries ||= try(:max_retries, *job_args) || self.class.max_retries end |
#job_must_die? ⇒ Boolean
Return true if the job must declared dead upon raising an error.
358 359 360 |
# File 'lib/cloudtasker/worker.rb', line 358 def job_must_die? job_retries >= job_max_retries end |
#job_queue ⇒ String
Return the queue to use for this worker.
186 187 188 |
# File 'lib/cloudtasker/worker.rb', line 186 def job_queue (@job_queue ||= self.class.[:queue] || Config::DEFAULT_JOB_QUEUE).to_s end |
#logger ⇒ Logger, any
Return the Cloudtasker logger instance.
211 212 213 |
# File 'lib/cloudtasker/worker.rb', line 211 def logger @logger ||= WorkerLogger.new(self) end |
#new_instance ⇒ Cloudtasker::Worker
Return a new instance of the worker with the same args and metadata but with a different id.
295 296 297 |
# File 'lib/cloudtasker/worker.rb', line 295 def new_instance self.class.new(job_queue: job_queue, job_args: job_args, job_meta: ) end |
#reenqueue(interval) ⇒ Cloudtasker::CloudTask
Helper method used to re-enqueue the job. Re-enqueued jobs keep the same job_id.
This helper may be useful when jobs must pause activity due to external factors such as when a third-party API is throttling the rate of API calls.
284 285 286 287 |
# File 'lib/cloudtasker/worker.rb', line 284 def reenqueue(interval) @job_reenqueued = true schedule(interval: interval) end |
#run_callback(callback, *args) ⇒ any
Run worker callback.
410 411 412 |
# File 'lib/cloudtasker/worker.rb', line 410 def run_callback(callback, *args) try(callback, *args) end |
#schedule(**args) ⇒ Cloudtasker::CloudTask?
Enqueue a worker, with or without delay.
263 264 265 266 267 268 269 270 271 |
# File 'lib/cloudtasker/worker.rb', line 263 def schedule(**args) # Evaluate when to schedule the job time_at = schedule_time(**args) # Schedule job through client middlewares Cloudtasker.config.client_middleware.invoke(self, time_at: time_at) do WorkerHandler.new(self).schedule(time_at: time_at) end end |
#schedule_time(interval: nil, time_at: nil) ⇒ Integer?
Return a unix timestamp specifying when to run the task.
248 249 250 251 252 253 |
# File 'lib/cloudtasker/worker.rb', line 248 def schedule_time(interval: nil, time_at: nil) return nil unless interval || time_at # Generate the complete Unix timestamp (time_at || Time.now).to_i + interval.to_i end |
#to_h ⇒ Hash
Return a hash description of the worker.
304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/cloudtasker/worker.rb', line 304 def to_h { worker: self.class.to_s, job_id: job_id, job_args: job_args, job_meta: .to_h, job_retries: job_retries, job_queue: job_queue, task_id: task_id } end |
#to_json(*args) ⇒ String
Return a json representation of the worker.
323 324 325 |
# File 'lib/cloudtasker/worker.rb', line 323 def to_json(*args) to_h.to_json(*args) end |