Class: Cloudtasker::WorkerHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/worker_handler.rb

Overview

Build, serialize and schedule tasks on the processing backend.

Constant Summary collapse

JWT_ALG =

Alrogith used to sign the verification token

'HS256'
REDIS_PAYLOAD_NAMESPACE =

Sub-namespace to use for redis keys when storing payloads in Redis

'payload'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker) ⇒ WorkerHandler

Prepare a new cloud task.

Parameters:



144
145
146
# File 'lib/cloudtasker/worker_handler.rb', line 144

def initialize(worker)
  @worker = worker
end

Instance Attribute Details

#workerObject (readonly)

Returns the value of attribute worker.



8
9
10
# File 'lib/cloudtasker/worker_handler.rb', line 8

def worker
  @worker
end

Class Method Details

.execute_from_payload!(input_payload) ⇒ Any

Execute a task worker from a task payload

Parameters:

  • input_payload (Hash)

    The Cloud Task payload.

Returns:

  • (Any)

    The return value of the worker perform method.



74
75
76
# File 'lib/cloudtasker/worker_handler.rb', line 74

def self.execute_from_payload!(input_payload)
  with_worker_handling(input_payload, &:execute)
end

.extract_payload(input_payload) ⇒ Hash

Return the argument payload key (if present) along with the actual worker payload.

If the payload was stored in Redis then retrieve it.

Returns:

  • (Hash)

    Hash



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/cloudtasker/worker_handler.rb', line 123

def self.extract_payload(input_payload)
  # Get references
  payload = JSON.parse(input_payload.to_json, symbolize_names: true)
  args_payload_id = payload.delete(:job_args_payload_id)
  args_payload_key = args_payload_id ? key([REDIS_PAYLOAD_NAMESPACE, args_payload_id].join('/')) : nil

  # Retrieve the actual worker args payload
  args_payload = args_payload_key ? redis.fetch(args_payload_key) : payload[:job_args]

  # Return the payload
  {
    args_payload_key: args_payload_key,
    payload: payload.merge(job_args: args_payload)
  }
end

.key(val) ⇒ String

Return a namespaced key

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



24
25
26
27
28
# File 'lib/cloudtasker/worker_handler.rb', line 24

def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end

.log_execution_error(worker, error) ⇒ Object

Log error on execution failure.

Parameters:



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/cloudtasker/worker_handler.rb', line 50

def self.log_execution_error(worker, error)
  # ActiveJob has its own error logging. No need to double log the error.
  # Note: we use string matching instead of class matching as
  # ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper might not be loaded
  return if worker.class.to_s =~ /^ActiveJob::/

  # Do not log error when a retry was specifically requested
  return if error.is_a?(RetryWorkerError)

  # Choose logger to use based on context
  # Worker will be nil on InvalidWorkerError - in that case we use generic logging
  logger = worker&.logger || Cloudtasker.logger

  # Log error
  logger.error(error)
end

.redisCloudtasker::RedisClient

Return the cloudtasker redis client

Returns:



35
36
37
38
39
40
# File 'lib/cloudtasker/worker_handler.rb', line 35

def self.redis
  @redis ||= begin
    require 'cloudtasker/redis_client'
    RedisClient.new
  end
end

.with_worker_handling(input_payload) {|Hash| ... } ⇒ Any

Local middleware used to retrieve the job arg payload from cache if a arg payload reference is present.

Parameters:

  • payload (Hash)

    The full job payload

Yields:

  • (Hash)

    The actual payload to use to process the job.

Returns:

  • (Any)

    The block result



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/cloudtasker/worker_handler.rb', line 88

def self.with_worker_handling(input_payload)
  # Extract payload information
  extracted_payload = extract_payload(input_payload)
  payload = extracted_payload[:payload]
  args_payload_key = extracted_payload[:args_payload_key]

  # Build worker
  worker = Cloudtasker::Worker.from_hash(payload) || raise(InvalidWorkerError)

  # Yied worker
  resp = yield(worker)

  # Delete stored args payload if job has completed
  redis.del(args_payload_key) if args_payload_key && !worker.job_reenqueued

  resp
rescue DeadWorkerError => e
  # Delete stored args payload if job is dead
  redis.del(args_payload_key) if args_payload_key
  log_execution_error(worker, e)
  Cloudtasker.config.on_dead.call(e, worker)
  raise(e)
rescue StandardError => e
  log_execution_error(worker, e)
  Cloudtasker.config.on_error.call(e, worker)
  raise(e)
end

Instance Method Details

#schedule(time_at: nil) ⇒ Cloudtasker::CloudTask

Schedule the task on GCP Cloud Task.

Parameters:

  • time_at (Integer, nil) (defaults to: nil)

    A unix timestamp specifying when to run the job. Leave to ‘nil` to run now.

Returns:



242
243
244
245
246
247
248
# File 'lib/cloudtasker/worker_handler.rb', line 242

def schedule(time_at: nil)
  # Generate task payload
  task = task_payload.merge(schedule_time: time_at).compact

  # Create and return remote task
  CloudTask.create(task)
end

#store_payload_in_redis?Boolean

Return true if the worker args must be stored in Redis.

Returns:

  • (Boolean)

    True if the payload must be stored in redis.



179
180
181
182
# File 'lib/cloudtasker/worker_handler.rb', line 179

def store_payload_in_redis?
  Cloudtasker.config.redis_payload_storage_threshold &&
    worker.job_args.to_json.bytesize > (Cloudtasker.config.redis_payload_storage_threshold * 1024)
end

#task_payloadHash

Return the full task configuration sent to Cloud Task

Returns:

  • (Hash)

    The task body



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/cloudtasker/worker_handler.rb', line 153

def task_payload
  # Generate content
  worker_payload_json = worker_payload.to_json

  # Build payload
  {
    http_request: {
      http_method: 'POST',
      url: Cloudtasker.config.processor_url,
      headers: {
        Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
        Cloudtasker::Config::CT_SIGNATURE_HEADER => Authenticator.sign_payload(worker_payload_json)
      }.compact,
      oidc_token: Cloudtasker.config.oidc,
      body: worker_payload_json
    }.compact,
    dispatch_deadline: worker.dispatch_deadline.to_i,
    queue: worker.job_queue
  }
end

#worker_args_payloadHash

Return the payload to use for job arguments. This payload is merged inside the #worker_payload.

If the argument payload must be stored in Redis then returns: ‘{ job_args_payload_id: <worker_id> }`

If the argument payload must be natively handled by the backend then returns: ‘{ job_args: […] }`

Returns:

  • (Hash)

    The worker args payload.



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/cloudtasker/worker_handler.rb', line 197

def worker_args_payload
  @worker_args_payload ||= if store_payload_in_redis?
                             # Store payload in Redis
                             self.class.redis.write(
                               self.class.key([REDIS_PAYLOAD_NAMESPACE, worker.job_id].join('/')),
                               worker.job_args
                             )

                             # Return reference to args payload
                             { job_args_payload_id: worker.job_id }
                           else
                             # Return regular job args payload
                             { job_args: worker.job_args }
                           end
end

#worker_payloadHash

Return the task payload that Google Task will eventually send to the job processor.

The payload includes the worker name and the arguments to pass to the worker.

The worker arguments should use primitive types as much as possible as all arguments will be serialized to JSON.

Returns:

  • (Hash)

    The job payload



225
226
227
228
229
230
231
232
# File 'lib/cloudtasker/worker_handler.rb', line 225

def worker_payload
  @worker_payload ||= {
    worker: worker.job_class_name,
    job_queue: worker.job_queue,
    job_id: worker.job_id,
    job_meta: worker.job_meta.to_h
  }.merge(worker_args_payload)
end