Class: Cloudtasker::WorkerHandler
- Inherits: Object
  - Object
    - Cloudtasker::WorkerHandler
- Defined in: lib/cloudtasker/worker_handler.rb
Overview
Build, serialize and schedule tasks on the processing backend.
Constant Summary
- JWT_ALG = 'HS256'
  Algorithm used to sign the verification token.
- REDIS_PAYLOAD_NAMESPACE = 'payload'
  Sub-namespace to use for Redis keys when storing payloads in Redis.
Instance Attribute Summary
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .execute_from_payload!(input_payload) ⇒ Any
  Execute a task worker from a task payload.
- .extract_payload(input_payload) ⇒ Hash
  Return the argument payload key (if present) along with the actual worker payload.
- .key(val) ⇒ String
  Return a namespaced key.
- .log_execution_error(worker, error) ⇒ Object
  Log error on execution failure.
- .redis ⇒ Cloudtasker::RedisClient
  Return the cloudtasker redis client.
- .with_worker_handling(input_payload) {|Hash| ... } ⇒ Any
  Local middleware used to retrieve the job arg payload from cache if an arg payload reference is present.
Instance Method Summary
- #initialize(worker) ⇒ WorkerHandler (constructor)
  Prepare a new cloud task.
- #schedule(time_at: nil) ⇒ Cloudtasker::CloudTask
  Schedule the task on Google Cloud Tasks.
- #store_payload_in_redis? ⇒ Boolean
  Return true if the worker args must be stored in Redis.
- #task_payload ⇒ Hash
  Return the full task configuration sent to Cloud Tasks.
- #worker_args_payload ⇒ Hash
  Return the payload to use for job arguments.
- #worker_payload ⇒ Hash
  Return the task payload that Google Cloud Tasks will eventually send to the job processor.
Constructor Details
#initialize(worker) ⇒ WorkerHandler
Prepare a new cloud task.
# File 'lib/cloudtasker/worker_handler.rb', line 144
def initialize(worker)
  @worker = worker
end
Instance Attribute Details
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
# File 'lib/cloudtasker/worker_handler.rb', line 8
def worker
  @worker
end
Class Method Details
.execute_from_payload!(input_payload) ⇒ Any
Execute a task worker from a task payload.
# File 'lib/cloudtasker/worker_handler.rb', line 74
def self.execute_from_payload!(input_payload)
  with_worker_handling(input_payload, &:execute)
end
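The `&:execute` argument is Ruby's symbol-to-proc shorthand, so the worker yielded by `with_worker_handling` simply receives `execute`. A minimal sketch of that equivalence, using hypothetical stand-ins (`DemoWorker`, `with_demo_worker`) rather than the real Cloudtasker classes:

```ruby
# Hypothetical stand-in for a worker; only #execute matters here.
class DemoWorker
  def execute
    :done
  end
end

# Simplified stand-in for with_worker_handling: it only yields a worker.
def with_demo_worker
  yield(DemoWorker.new)
end

# These two calls are equivalent: &:execute expands to a block that
# calls #execute on the yielded object.
short_form = with_demo_worker(&:execute)
long_form  = with_demo_worker { |worker| worker.execute }
puts short_form == long_form
```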
.extract_payload(input_payload) ⇒ Hash
Return the argument payload key (if present) along with the actual worker payload.
If the payload was stored in Redis then retrieve it.
# File 'lib/cloudtasker/worker_handler.rb', line 123
def self.extract_payload(input_payload)
  # Get references
  payload = JSON.parse(input_payload.to_json, symbolize_names: true)
  args_payload_id = payload.delete(:job_args_payload_id)
  args_payload_key = args_payload_id ? key([REDIS_PAYLOAD_NAMESPACE, args_payload_id].join('/')) : nil

  # Retrieve the actual worker args payload
  args_payload = args_payload_key ? redis.fetch(args_payload_key) : payload[:job_args]

  # Return the payload
  {
    args_payload_key: args_payload_key,
    payload: payload.merge(job_args: args_payload)
  }
end
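To illustrate the two cases handled by `extract_payload`, here is a simplified sketch in which a frozen Hash (`DEMO_STORE`) stands in for the Redis client and the key is hard-coded. Both are assumptions made for the example; the real method goes through `redis.fetch` and `key`.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the Redis client (read side only).
DEMO_STORE = {
  'cloudtasker/worker_handler/payload/abc123' => [1, 'two']
}.freeze

# Simplified version of extract_payload using the stand-in store.
def demo_extract_payload(input_payload)
  payload = JSON.parse(input_payload.to_json, symbolize_names: true)
  args_payload_id = payload.delete(:job_args_payload_id)
  args_payload_key = args_payload_id ? "cloudtasker/worker_handler/payload/#{args_payload_id}" : nil

  # Use the stored args when a reference is present, the inline args otherwise.
  args_payload = args_payload_key ? DEMO_STORE.fetch(args_payload_key) : payload[:job_args]

  { args_payload_key: args_payload_key, payload: payload.merge(job_args: args_payload) }
end

# Inline arguments: no key, job_args passed through.
p demo_extract_payload('worker' => 'MyWorker', 'job_args' => [1, 2])
# Referenced arguments: key returned, job_args loaded from the store.
p demo_extract_payload('worker' => 'MyWorker', 'job_args_payload_id' => 'abc123')
```

In both cases the returned `payload` carries a `job_args` entry, so the caller does not need to know where the arguments came from.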
.key(val) ⇒ String
Return a namespaced key.
# File 'lib/cloudtasker/worker_handler.rb', line 24
def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end
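The namespace comes from the class name converted with `underscore`, which is provided by ActiveSupport. The sketch below approximates that conversion with a hypothetical helper (`demo_underscore`) so it runs on plain Ruby; the real `String#underscore` covers more cases.

```ruby
# Rough stand-in for ActiveSupport's String#underscore (assumption: only the
# "::" and camel-case boundaries needed for this class name are handled).
def demo_underscore(name)
  name.gsub('::', '/').gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Simplified version of .key with the class name passed in explicitly.
def demo_key(val)
  return nil if val.nil?

  [demo_underscore('Cloudtasker::WorkerHandler'), val.to_s].join('/')
end

puts demo_key('payload/abc123') # prints "cloudtasker/worker_handler/payload/abc123"
p demo_key(nil)                 # prints nil
```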
.log_execution_error(worker, error) ⇒ Object
Log error on execution failure.
# File 'lib/cloudtasker/worker_handler.rb', line 50
def self.log_execution_error(worker, error)
  # ActiveJob has its own error logging. No need to double log the error.
  # Note: we use string matching instead of class matching as
  # ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper might not be loaded
  return if worker.class.to_s =~ /^ActiveJob::/

  # Do not log error when a retry was specifically requested
  return if error.is_a?(RetryWorkerError)

  # Choose logger to use based on context
  # Worker will be nil on InvalidWorkerError - in that case we use generic logging
  logger = worker&.logger || Cloudtasker.logger

  # Log error
  logger.error(error)
end
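The guard clauses and the logger fallback can be sketched in isolation. The names below (`DemoRetryError`, `DemoLoggedWorker`, `demo_should_log?`, `demo_pick_logger`) are hypothetical stand-ins; only the control flow mirrors the method above.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-ins for RetryWorkerError and a worker exposing #logger.
class DemoRetryError < StandardError; end
DemoLoggedWorker = Struct.new(:logger)

# Mirrors the two early returns: skip ActiveJob wrappers and requested retries.
def demo_should_log?(worker, error)
  return false if worker.class.to_s =~ /^ActiveJob::/
  return false if error.is_a?(DemoRetryError)

  true
end

# Safe navigation makes a nil worker fall through to the generic logger.
def demo_pick_logger(worker, fallback)
  worker&.logger || fallback
end

fallback = Logger.new(StringIO.new)
error = RuntimeError.new('boom')
demo_pick_logger(nil, fallback).error(error) if demo_should_log?(nil, error)
```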
.redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
# File 'lib/cloudtasker/worker_handler.rb', line 35
def self.redis
  @redis ||= begin
    require 'cloudtasker/redis_client'
    RedisClient.new
  end
end
.with_worker_handling(input_payload) {|Hash| ... } ⇒ Any
Local middleware used to retrieve the job arg payload from cache if an arg payload reference is present.
# File 'lib/cloudtasker/worker_handler.rb', line 88
def self.with_worker_handling(input_payload)
  # Extract payload information
  extracted_payload = extract_payload(input_payload)
  payload = extracted_payload[:payload]
  args_payload_key = extracted_payload[:args_payload_key]

  # Build worker
  worker = Cloudtasker::Worker.from_hash(payload) || raise(InvalidWorkerError)

  # Yield worker
  resp = yield(worker)

  # Delete stored args payload if job has completed
  redis.del(args_payload_key) if args_payload_key && !worker.job_reenqueued

  resp
rescue DeadWorkerError => e
  # Delete stored args payload if job is dead
  redis.del(args_payload_key) if args_payload_key
  log_execution_error(worker, e)
  Cloudtasker.config.on_dead.call(e, worker)
  raise(e)
rescue StandardError => e
  log_execution_error(worker, e)
  Cloudtasker.config.on_error.call(e, worker)
  raise(e)
end
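The cleanup rules in this method depend on how the block exits. The sketch below reduces the flow to those rules, with a Hash standing in for Redis and hypothetical names (`DemoDeadWorkerError`, `DemoJob`, `demo_handle`); logging and the `on_dead`/`on_error` callbacks are left out. Keeping the stored args on ordinary errors is presumably what lets a retried task find them again, though the source does not state this.

```ruby
# Hypothetical stand-ins for DeadWorkerError and a worker with #job_reenqueued.
class DemoDeadWorkerError < StandardError; end
DemoJob = Struct.new(:job_reenqueued)

# `store` is a Hash standing in for Redis; `key` is the args payload key (or nil).
def demo_handle(store, key, worker)
  resp = yield(worker)
  # Success: drop the stored args unless the job was re-enqueued.
  store.delete(key) if key && !worker.job_reenqueued
  resp
rescue DemoDeadWorkerError
  # Dead job: the stored args are deleted before re-raising.
  store.delete(key) if key
  raise
rescue StandardError
  # Any other failure: the stored args are left in place.
  raise
end

store = { 'args-key' => [1, 2] }
demo_handle(store, 'args-key', DemoJob.new(false)) { |_worker| :ok }
p store # the key is gone after a successful run
```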
Instance Method Details
#schedule(time_at: nil) ⇒ Cloudtasker::CloudTask
Schedule the task on Google Cloud Tasks.
# File 'lib/cloudtasker/worker_handler.rb', line 242
def schedule(time_at: nil)
  # Generate task payload
  task = task_payload.merge(schedule_time: time_at).compact

  # Create and return remote task
  CloudTask.create(task)
end
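Because `schedule` merges `schedule_time` and then calls `compact`, a nil `time_at` leaves no `schedule_time` key in the task at all. A small sketch of that behaviour, assuming a hypothetical two-key task hash (`DEMO_TASK`) in place of the real `task_payload`:

```ruby
# Hypothetical minimal task payload; the real one is built by #task_payload.
DEMO_TASK = { queue: 'default', dispatch_deadline: 600 }.freeze

# Mirrors the merge-then-compact step of #schedule.
def demo_schedule_args(time_at: nil)
  DEMO_TASK.merge(schedule_time: time_at).compact
end

p demo_schedule_args.keys                         # no :schedule_time key
p demo_schedule_args(time_at: Time.now + 60).keys # includes :schedule_time
```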
#store_payload_in_redis? ⇒ Boolean
Return true if the worker args must be stored in Redis.
# File 'lib/cloudtasker/worker_handler.rb', line 179
def store_payload_in_redis?
  Cloudtasker.config.redis_payload_storage_threshold &&
    worker.job_args.to_json.bytesize > (Cloudtasker.config.redis_payload_storage_threshold * 1024)
end
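The comparison multiplies the configured threshold by 1024 and compares it with the byte size of the JSON-encoded arguments, which suggests the threshold is expressed in kilobytes. A sketch of the same check, with the threshold passed in as a parameter (`threshold_kb`, a hypothetical name) instead of read from the configuration:

```ruby
require 'json'

# threshold_kb stands in for redis_payload_storage_threshold; nil disables the check.
def demo_store_in_redis?(job_args, threshold_kb)
  threshold_kb && job_args.to_json.bytesize > (threshold_kb * 1024)
end

p demo_store_in_redis?(['a' * 2048], 1) # JSON is larger than 1024 bytes
p demo_store_in_redis?(['a'], 1)        # JSON is only a few bytes
p demo_store_in_redis?(['a' * 2048], nil)
```

Note that with no threshold configured the expression evaluates to nil rather than false, which is still falsy in a conditional.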
#task_payload ⇒ Hash
Return the full task configuration sent to Cloud Tasks.
# File 'lib/cloudtasker/worker_handler.rb', line 153
def task_payload
  # Generate content
  worker_payload_json = worker_payload.to_json

  # Build payload
  {
    http_request: {
      http_method: 'POST',
      url: Cloudtasker.config.processor_url,
      headers: {
        Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
        Cloudtasker::Config::CT_SIGNATURE_HEADER => Authenticator.sign_payload(worker_payload_json)
      }.compact,
      oidc_token: Cloudtasker.config.oidc,
      body: worker_payload_json
    }.compact,
    dispatch_deadline: worker.dispatch_deadline.to_i,
    queue: worker.job_queue
  }
end
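Two details of this method are easy to miss: the worker payload is serialized once so that the signature covers exactly the string sent as the body, and `compact` removes `oidc_token` when no OIDC configuration is set. The sketch below illustrates both. The HMAC signer, the secret, and the header names are assumptions made for the example; the actual `Authenticator.sign_payload` and header constants may differ.

```ruby
require 'json'
require 'openssl'

DEMO_SECRET = 'not-a-real-secret' # hypothetical signing secret

# Hypothetical stand-in for Authenticator.sign_payload (HMAC-SHA256 hex digest).
def demo_sign(body)
  OpenSSL::HMAC.hexdigest('SHA256', DEMO_SECRET, body)
end

# Simplified http_request section; header names are placeholders.
def demo_http_request(worker_payload, oidc: nil)
  body = worker_payload.to_json # serialize once, then sign and send the same string
  {
    http_method: 'POST',
    headers: { 'Content-Type' => 'application/json', 'X-Demo-Signature' => demo_sign(body) },
    oidc_token: oidc,
    body: body
  }.compact
end

request = demo_http_request({ worker: 'MyWorker', job_args: [1] })
# A receiver holding the same secret can recompute the signature from the body.
puts demo_sign(request[:body]) == request[:headers]['X-Demo-Signature']
```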
#worker_args_payload ⇒ Hash
Return the payload to use for job arguments. This payload is merged inside the #worker_payload.
If the argument payload must be stored in Redis then returns: `{ job_args_payload_id: <worker_id> }`
If the argument payload must be natively handled by the backend then returns: `{ job_args: […] }`
# File 'lib/cloudtasker/worker_handler.rb', line 197
def worker_args_payload
  @worker_args_payload ||= if store_payload_in_redis?
    # Store payload in Redis
    self.class.redis.write(
      self.class.key([REDIS_PAYLOAD_NAMESPACE, worker.job_id].join('/')),
      worker.job_args
    )

    # Return reference to args payload
    { job_args_payload_id: worker.job_id }
  else
    # Return regular job args payload
    { job_args: worker.job_args }
  end
end
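The two return shapes can be reproduced with a small sketch. Here a Hash stands in for Redis, the size limit is passed in bytes, and `DemoArgsWorker` and `demo_args_payload` are hypothetical names; memoization is omitted.

```ruby
require 'json'

# Hypothetical stand-in for a worker exposing #job_id and #job_args.
DemoArgsWorker = Struct.new(:job_id, :job_args)

# `store` is a Hash standing in for Redis; `threshold_bytes` replaces the config lookup.
def demo_args_payload(worker, store, threshold_bytes)
  if worker.job_args.to_json.bytesize > threshold_bytes
    # Large args: write them to the store and return only a reference.
    store["cloudtasker/worker_handler/payload/#{worker.job_id}"] = worker.job_args
    { job_args_payload_id: worker.job_id }
  else
    # Small args: embed them directly in the payload.
    { job_args: worker.job_args }
  end
end

store = {}
p demo_args_payload(DemoArgsWorker.new('job-1', [1, 2]), store, 1024)
p demo_args_payload(DemoArgsWorker.new('job-2', ['x' * 2000]), store, 1024)
p store.keys
```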
#worker_payload ⇒ Hash
Return the task payload that Google Cloud Tasks will eventually send to the job processor.
The payload includes the worker name and the arguments to pass to the worker.
The worker arguments should use primitive types as much as possible as all arguments will be serialized to JSON.
# File 'lib/cloudtasker/worker_handler.rb', line 225
def worker_payload
  @worker_payload ||= {
    worker: worker.job_class_name,
    job_queue: worker.job_queue,
    job_id: worker.job_id,
    job_meta: worker.job_meta.to_h
  }.merge(worker_args_payload)
end
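The advice about primitive types follows from what a JSON round trip does to Ruby values. The sketch below uses a plain `to_json` / `JSON.parse` round trip as an approximation of what happens between scheduling and processing; `demo_round_trip` is a hypothetical helper.

```ruby
require 'json'

# Serialize and parse back, as happens to job arguments in transit.
def demo_round_trip(args)
  JSON.parse(args.to_json)
end

p demo_round_trip([:high, { retries: 3 }])
# => ["high", {"retries" => 3}]
```

Symbols come back as strings and symbol keys become string keys, so a worker that expects `:high` or `args[:retries]` would not see the values it was given.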