Module: Cloudtasker::Worker

Included in:
ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper, WorkerWrapper
Defined in:
lib/cloudtasker/worker.rb,
lib/cloudtasker/testing.rb

Overview

Add extra methods for testing purpose

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.clear_allObject

Clear all jobs.



99
100
101
# File 'lib/cloudtasker/testing.rb', line 99

def self.clear_all
  Backend::MemoryTask.clear
end

.drain_allArray<any>

Run all the jobs.

Returns:

  • (Array<any>)

    The return values of the workers perform method.



108
109
110
# File 'lib/cloudtasker/testing.rb', line 108

def self.drain_all
  Backend::MemoryTask.drain
end

.from_hash(hash) ⇒ Cloudtasker::Worker?

Return a worker instance from a worker hash description. A worker hash description is typically generated by calling ‘MyWorker#to_h`

Parameters:

  • hash (Hash)

    A worker hash description.

Returns:



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/cloudtasker/worker.rb', line 36

def self.from_hash(hash)
  # Symbolize metadata keys and stringify job arguments
  payload = JSON.parse(hash.to_json, symbolize_names: true)
  payload[:job_args] = JSON.parse(payload[:job_args].to_json)

  # Extract worker parameters
  klass_name = payload&.dig(:worker)
  return nil unless klass_name

  # Check that worker class is a valid worker
  worker_klass = Object.const_get(klass_name)
  return nil unless worker_klass.include?(self)

  # Return instantiated worker
  worker_klass.new(**payload.slice(:job_queue, :job_args, :job_id, :job_meta, :job_retries, :task_id))
rescue NameError
  nil
end

.from_json(json) ⇒ Cloudtasker::Worker?

Return a worker instance from a serialized worker. A worker can be serialized by calling ‘MyWorker#to_json`

Parameters:

  • json (String)

    Worker serialized as json.

Returns:



22
23
24
25
26
# File 'lib/cloudtasker/worker.rb', line 22

def self.from_json(json)
  from_hash(JSON.parse(json))
rescue JSON::ParserError
  nil
end

.included(base) ⇒ Object

Add class method to including class



7
8
9
10
11
12
# File 'lib/cloudtasker/worker.rb', line 7

def self.included(base)
  base.extend(ClassMethods)
  base.attr_writer :job_queue
  base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued, :job_retries,
                     :perform_started_at, :perform_ended_at, :task_id
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



334
335
336
# File 'lib/cloudtasker/worker.rb', line 334

def ==(other)
  other.is_a?(self.class) && other.job_id == job_id
end

#arguments_missing?Boolean

Return true if the job arguments are missing.

This may happen if a job was successfully run but retried due to Cloud Task dispatch deadline exceeded. If the arguments were stored in Redis then they may have been flushed already after the successful completion.

If job arguments are missing then the job will simply be declared dead.

Returns:

  • (Boolean)

    True if the arguments are missing.



386
387
388
# File 'lib/cloudtasker/worker.rb', line 386

def arguments_missing?
  job_args.empty? && ![0, -1].include?(method(:perform).arity)
end

#dispatch_deadlineInteger

Return the Dispatch deadline duration. Cloud Tasks will timeout the job after this duration is elapsed.

Returns:

  • (Integer)

    The value in seconds.



196
197
198
199
200
201
202
203
204
# File 'lib/cloudtasker/worker.rb', line 196

def dispatch_deadline
  @dispatch_deadline ||= begin
    configured_deadline = (
      self.class.cloudtasker_options_hash[:dispatch_deadline] ||
      Cloudtasker.config.dispatch_deadline
    ).to_i
    configured_deadline.clamp(Config::MIN_DISPATCH_DEADLINE, Config::MAX_DISPATCH_DEADLINE)
  end
end

#executeAny

Execute the worker by calling the ‘perform` with the args.

Returns:

  • (Any)

    The result of the perform.



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/cloudtasker/worker.rb', line 220

def execute
  logger.info('Starting job...')

  # Perform job logic
  resp = execute_middleware_chain

  # Log job completion and return result
  logger.info("Job done after #{job_duration}s") { { duration: job_duration * 1000 } }
  resp
rescue DeadWorkerError => e
  logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration * 1000 } }
  raise(e)
rescue RetryWorkerError => e
  logger.info("Job done after #{job_duration}s (retry requested)") { { duration: job_duration * 1000 } }
  raise(e)
rescue StandardError => e
  logger.info("Job failed after #{job_duration}s") { { duration: job_duration * 1000 } }
  raise(e)
end

#initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, task_id: nil) ⇒ Object

Build a new worker instance.

Parameters:

  • job_args (Array<any>) (defaults to: nil)

    The list of perform args.

  • job_id (String) (defaults to: nil)

    A unique ID identifying this job.



163
164
165
166
167
168
169
170
# File 'lib/cloudtasker/worker.rb', line 163

def initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, task_id: nil)
  @job_args = job_args || []
  @job_id = job_id || SecureRandom.uuid
  @job_meta = MetaStore.new(job_meta)
  @job_retries = job_retries || 0
  @job_queue = job_queue
  @task_id = task_id
end

#job_class_nameString

Return the class name of the worker.

Returns:

  • (String)

    The class name.



177
178
179
# File 'lib/cloudtasker/worker.rb', line 177

def job_class_name
  self.class.to_s
end

#job_dead?Boolean

Return true if the job has strictly excceeded its maximum number of retries.

Used a preemptive filter when running the job.

Returns:

  • (Boolean)

    True if the job is dead



370
371
372
# File 'lib/cloudtasker/worker.rb', line 370

def job_dead?
  job_retries > job_max_retries
end

#job_durationFloat

Return the time taken (in seconds) to perform the job. This duration includes the middlewares and the actual perform method.

Returns:

  • (Float)

    The time taken in seconds as a floating point number.



396
397
398
399
400
# File 'lib/cloudtasker/worker.rb', line 396

def job_duration
  return 0.0 unless perform_ended_at && perform_started_at

  @job_duration ||= (perform_ended_at - perform_started_at).ceil(3)
end

#job_max_retriesInteger

Return the max number of retries allowed for this job.

The order of precedence for retry lookup is:

  • Worker ‘max_retries` method

  • Class ‘max_retries` option

  • Cloudtasker ‘max_retries` config option

Returns:

  • (Integer)

    The number of retries



348
349
350
# File 'lib/cloudtasker/worker.rb', line 348

def job_max_retries
  @job_max_retries ||= try(:max_retries, *job_args) || self.class.max_retries
end

#job_must_die?Boolean

Return true if the job must declared dead upon raising an error.

Returns:

  • (Boolean)

    True if the job must die on error.



358
359
360
# File 'lib/cloudtasker/worker.rb', line 358

def job_must_die?
  job_retries >= job_max_retries
end

#job_queueString

Return the queue to use for this worker.

Returns:

  • (String)

    The name of queue.



186
187
188
# File 'lib/cloudtasker/worker.rb', line 186

def job_queue
  (@job_queue ||= self.class.cloudtasker_options_hash[:queue] || Config::DEFAULT_JOB_QUEUE).to_s
end

#loggerLogger, any

Return the Cloudtasker logger instance.

Returns:

  • (Logger, any)

    The cloudtasker logger.



211
212
213
# File 'lib/cloudtasker/worker.rb', line 211

def logger
  @logger ||= WorkerLogger.new(self)
end

#new_instanceCloudtasker::Worker

Return a new instance of the worker with the same args and metadata but with a different id.

Returns:



295
296
297
# File 'lib/cloudtasker/worker.rb', line 295

def new_instance
  self.class.new(job_queue: job_queue, job_args: job_args, job_meta: job_meta)
end

#reenqueue(interval) ⇒ Cloudtasker::CloudTask

Helper method used to re-enqueue the job. Re-enqueued jobs keep the same job_id.

This helper may be useful when jobs must pause activity due to external factors such as when a third-party API is throttling the rate of API calls.

Parameters:

  • interval (Integer)

    Delay to wait before processing the job again (in seconds).

Returns:



284
285
286
287
# File 'lib/cloudtasker/worker.rb', line 284

def reenqueue(interval)
  @job_reenqueued = true
  schedule(interval: interval)
end

#run_callback(callback, *args) ⇒ any

Run worker callback.

Parameters:

  • callback (String, Symbol)

    The callback to run.

  • *args (Array<any>)

    The callback arguments.

Returns:

  • (any)

    The callback return value



410
411
412
# File 'lib/cloudtasker/worker.rb', line 410

def run_callback(callback, *args)
  try(callback, *args)
end

#schedule(**args) ⇒ Cloudtasker::CloudTask?

Enqueue a worker, with or without delay.

Parameters:

  • interval (Integer)

    The delay in seconds.

  • interval (Time, Integer)

    The time at which the job should run

Returns:



263
264
265
266
267
268
269
270
271
# File 'lib/cloudtasker/worker.rb', line 263

def schedule(**args)
  # Evaluate when to schedule the job
  time_at = schedule_time(**args)

  # Schedule job through client middlewares
  Cloudtasker.config.client_middleware.invoke(self, time_at: time_at) do
    WorkerHandler.new(self).schedule(time_at: time_at)
  end
end

#schedule_time(interval: nil, time_at: nil) ⇒ Integer?

Return a unix timestamp specifying when to run the task.

Parameters:

  • interval (Integer, nil) (defaults to: nil)

    The time to wait.

  • time_at (Integer, nil) (defaults to: nil)

    The time at which the job should run.

Returns:

  • (Integer, nil)

    The Unix timestamp.



248
249
250
251
252
253
# File 'lib/cloudtasker/worker.rb', line 248

def schedule_time(interval: nil, time_at: nil)
  return nil unless interval || time_at

  # Generate the complete Unix timestamp
  (time_at || Time.now).to_i + interval.to_i
end

#to_hHash

Return a hash description of the worker.

Returns:

  • (Hash)

    The worker hash description.



304
305
306
307
308
309
310
311
312
313
314
# File 'lib/cloudtasker/worker.rb', line 304

def to_h
  {
    worker: self.class.to_s,
    job_id: job_id,
    job_args: job_args,
    job_meta: job_meta.to_h,
    job_retries: job_retries,
    job_queue: job_queue,
    task_id: task_id
  }
end

#to_json(*args) ⇒ String

Return a json representation of the worker.

Parameters:

  • *args (Array<any>)

    Arguments passed to to_json.

Returns:

  • (String)

    The worker json representation.



323
324
325
# File 'lib/cloudtasker/worker.rb', line 323

def to_json(*args)
  to_h.to_json(*args)
end