Module: Cloudtasker::Worker

Included in:
ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper, WorkerWrapper
Defined in:
lib/cloudtasker/worker.rb,
lib/cloudtasker/testing.rb

Overview

Add extra methods for testing purpose

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.clear_allObject

Clear all jobs.



150
151
152
# File 'lib/cloudtasker/testing.rb', line 150

def self.clear_all
  Backend::MemoryTask.clear
end

.drain_allArray<any>

Run all the jobs.

Returns:

  • (Array<any>)

    The return values of the workers perform method.



159
160
161
# File 'lib/cloudtasker/testing.rb', line 159

def self.drain_all
  Backend::MemoryTask.drain
end

.from_hash(hash) ⇒ Cloudtasker::Worker?

Return a worker instance from a worker hash description. A worker hash description is typically generated by calling ‘MyWorker#to_h`

Parameters:

  • hash (Hash)

    A worker hash description.

Returns:



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/cloudtasker/worker.rb', line 36

def self.from_hash(hash)
  # Symbolize metadata keys and stringify job arguments
  payload = JSON.parse(hash.to_json, symbolize_names: true)
  payload[:job_args] = JSON.parse(payload[:job_args].to_json)

  # Extract worker parameters
  klass_name = payload&.dig(:worker)
  return nil unless klass_name

  # Check that worker class is a valid worker
  worker_klass = Object.const_get(klass_name)
  return nil unless worker_klass.include?(self)

  # Return instantiated worker
  worker_klass.new(**payload.slice(
    :job_queue, :job_args, :job_id, :job_meta,
    :job_retries, :job_attempts, :task_id
  ))
rescue NameError
  nil
end

.from_json(json) ⇒ Cloudtasker::Worker?

Return a worker instance from a serialized worker. A worker can be serialized by calling ‘MyWorker#to_json`

Parameters:

  • json (String)

    Worker serialized as json.

Returns:



22
23
24
25
26
# File 'lib/cloudtasker/worker.rb', line 22

def self.from_json(json)
  from_hash(JSON.parse(json))
rescue JSON::ParserError
  nil
end

.included(base) ⇒ Object

Add class method to including class



7
8
9
10
11
12
# File 'lib/cloudtasker/worker.rb', line 7

def self.included(base)
  base.extend(ClassMethods)
  base.attr_writer :job_queue
  base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued, :job_retries,
                     :job_attempts, :perform_started_at, :perform_ended_at, :task_id
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



363
364
365
# File 'lib/cloudtasker/worker.rb', line 363

def ==(other)
  other.is_a?(self.class) && other.job_id == job_id
end

#arguments_missing?Boolean

Return true if the job arguments are missing.

This may happen if a job was successfully run but retried due to Cloud Task dispatch deadline exceeded. If the arguments were stored in Redis then they may have been flushed already after the successful completion.

If job arguments are missing then the job will simply be declared dead.

Returns:

  • (Boolean)

    True if the arguments are missing.



415
416
417
# File 'lib/cloudtasker/worker.rb', line 415

def arguments_missing?
  job_args.empty? && ![0, -1].include?(method(:perform).arity)
end

#dispatch_deadlineInteger

Return the Dispatch deadline duration. Cloud Tasks will timeout the job after this duration is elapsed.

Returns:

  • (Integer)

    The value in seconds.



222
223
224
225
226
227
228
229
230
# File 'lib/cloudtasker/worker.rb', line 222

def dispatch_deadline
  @dispatch_deadline ||= begin
    configured_deadline = (
      self.class.cloudtasker_options_hash[:dispatch_deadline] ||
      Cloudtasker.config.dispatch_deadline
    ).to_i
    configured_deadline.clamp(Config::MIN_DISPATCH_DEADLINE, Config::MAX_DISPATCH_DEADLINE)
  end
end

#executeAny

Execute the worker by calling the ‘perform` with the args.

Returns:

  • (Any)

    The result of the worker perform method.



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/cloudtasker/worker.rb', line 246

def execute
  logger.info('Starting job...')

  # Perform job logic
  resp = execute_middleware_chain

  # Log job completion and return result
  logger.info("Job done after #{job_duration}s") { { duration: job_duration_ms } }
  resp
rescue DeadWorkerError => e
  logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration_ms } }
  raise(e)
rescue RetryWorkerError => e
  logger.info("Job done after #{job_duration}s (retry requested)") do
    { duration: job_duration_ms, reason: e.message }
  end
  raise(e)
rescue StandardError => e
  logger.info("Job failed after #{job_duration}s") { { duration: job_duration_ms } }
  raise(e)
end

#initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, job_attempts: 0, task_id: nil) ⇒ Object

Build a new worker instance.

Parameters:

  • job_args (Array<any>) (defaults to: nil)

    The list of perform args.

  • job_id (String) (defaults to: nil)

    A unique ID identifying this job.



182
183
184
185
186
187
188
189
190
191
# File 'lib/cloudtasker/worker.rb', line 182

def initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, job_attempts: 0,
               task_id: nil)
  @job_args = job_args || []
  @job_id = job_id || SecureRandom.uuid
  @job_meta = MetaStore.new(job_meta)
  @job_retries = job_retries || 0
  @job_attempts = job_attempts || 0
  @job_queue = job_queue
  @task_id = task_id
end

#job_class_nameString

Return the class name of the worker.

Returns:

  • (String)

    The class name.



198
199
200
# File 'lib/cloudtasker/worker.rb', line 198

def job_class_name
  self.class.to_s
end

#job_dead?Boolean

Return true if the job has strictly excceeded its maximum number of retries.

Used a preemptive filter when running the job.

Returns:

  • (Boolean)

    True if the job is dead



399
400
401
# File 'lib/cloudtasker/worker.rb', line 399

def job_dead?
  job_retries > job_max_retries
end

#job_durationFloat

Return the time taken (in seconds) to perform the job. This duration includes the middlewares and the actual perform method.

Returns:

  • (Float)

    The time taken in seconds as a floating point number.



425
426
427
428
429
# File 'lib/cloudtasker/worker.rb', line 425

def job_duration
  return 0.0 unless perform_ended_at && perform_started_at

  @job_duration ||= (perform_ended_at - perform_started_at).ceil(3)
end

#job_duration_msFloat

Return the job_duration in milliseconds

Returns:

  • (Float)

    The time taken in milliseconds as a floating point number.



436
437
438
# File 'lib/cloudtasker/worker.rb', line 436

def job_duration_ms
  job_duration * 1000
end

#job_max_retriesInteger

Return the max number of retries allowed for this job.

The order of precedence for retry lookup is:

  • Worker ‘max_retries` method

  • Class ‘max_retries` option

  • Cloudtasker ‘max_retries` config option

Returns:

  • (Integer)

    The number of retries



377
378
379
# File 'lib/cloudtasker/worker.rb', line 377

def job_max_retries
  @job_max_retries ||= try(:max_retries, *job_args) || self.class.max_retries
end

#job_must_die?Boolean

Return true if the job must declared dead upon raising an error.

Returns:

  • (Boolean)

    True if the job must die on error.



387
388
389
# File 'lib/cloudtasker/worker.rb', line 387

def job_must_die?
  job_retries >= job_max_retries
end

#job_queueString

Return the queue to use for this worker.

Returns:

  • (String)

    The name of queue.



207
208
209
210
211
212
213
214
# File 'lib/cloudtasker/worker.rb', line 207

def job_queue
  (
    @job_queue ||=
      Thread.current[:cloudtasker_propagated_queue] ||
      self.class.cloudtasker_options_hash[:queue] ||
      Config::DEFAULT_JOB_QUEUE
  ).to_s
end

#loggerLogger, any

Return the Cloudtasker logger instance.

Returns:

  • (Logger, any)

    The cloudtasker logger.



237
238
239
# File 'lib/cloudtasker/worker.rb', line 237

def logger
  @logger ||= WorkerLogger.new(self)
end

#new_instanceCloudtasker::Worker

Return a new instance of the worker with the same args and metadata but with a different id.

Returns:



323
324
325
# File 'lib/cloudtasker/worker.rb', line 323

def new_instance
  self.class.new(job_queue: job_queue, job_args: job_args, job_meta: job_meta)
end

#reenqueue(interval) ⇒ Cloudtasker::CloudTask

Helper method used to re-enqueue the job. Re-enqueued jobs keep the same job_id.

This helper may be useful when jobs must pause activity due to external factors such as when a third-party API is throttling the rate of API calls.

Parameters:

  • interval (Integer)

    Delay to wait before processing the job again (in seconds).

Returns:



312
313
314
315
# File 'lib/cloudtasker/worker.rb', line 312

def reenqueue(interval)
  @job_reenqueued = true
  schedule(interval: interval)
end

#run_callback(callback, *args) ⇒ any

Run worker callback.

Parameters:

  • callback (String, Symbol)

    The callback to run.

  • *args (Array<any>)

    The callback arguments.

Returns:

  • (any)

    The callback return value



448
449
450
# File 'lib/cloudtasker/worker.rb', line 448

def run_callback(callback, *args)
  try(callback, *args)
end

#schedule(**args) ⇒ Cloudtasker::CloudTask?

Enqueue a worker, with or without delay.

Parameters:

  • interval (Integer)

    The delay in seconds.

  • interval (Time, Integer)

    The time at which the job should run

Returns:



291
292
293
294
295
296
297
298
299
# File 'lib/cloudtasker/worker.rb', line 291

def schedule(**args)
  # Evaluate when to schedule the job
  time_at = schedule_time(**args)

  # Schedule job through client middlewares
  Cloudtasker.config.client_middleware.invoke(self, time_at: time_at) do
    WorkerHandler.new(self).schedule(time_at: time_at)
  end
end

#schedule_time(interval: nil, time_at: nil) ⇒ Integer?

Return a unix timestamp specifying when to run the task.

Parameters:

  • interval (Integer, nil) (defaults to: nil)

    The time to wait.

  • time_at (Integer, nil) (defaults to: nil)

    The time at which the job should run.

Returns:

  • (Integer, nil)

    The Unix timestamp.



276
277
278
279
280
281
# File 'lib/cloudtasker/worker.rb', line 276

def schedule_time(interval: nil, time_at: nil)
  return nil unless interval || time_at

  # Generate the complete Unix timestamp
  (time_at || Time.now).to_i + interval.to_i
end

#to_hHash

Return a hash description of the worker.

Returns:

  • (Hash)

    The worker hash description.



332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/cloudtasker/worker.rb', line 332

def to_h
  {
    worker: self.class.to_s,
    job_id: job_id,
    job_args: job_args,
    job_meta: job_meta.to_h,
    job_retries: job_retries,
    job_attempts: job_attempts,
    job_queue: job_queue,
    task_id: task_id
  }
end

#to_json(*args) ⇒ String

Return a json representation of the worker.

Parameters:

  • *args (Array<any>)

    Arguments passed to to_json.

Returns:

  • (String)

    The worker json representation.



352
353
354
# File 'lib/cloudtasker/worker.rb', line 352

def to_json(*args)
  to_h.to_json(*args)
end