Class: Cloudtasker::Backend::GoogleCloudTaskV1
- Inherits:
-
Object
- Object
- Cloudtasker::Backend::GoogleCloudTaskV1
- Defined in:
- lib/cloudtasker/backend/google_cloud_task_v1.rb
Overview
Manage tasks pushed to GCP Cloud Task
Instance Attribute Summary collapse
-
#gcp_task ⇒ Object
Returns the value of attribute gcp_task.
Class Method Summary collapse
-
.client ⇒ Google::Cloud::Tasks
Return the Google Cloud Task client.
-
.config ⇒ Cloudtasker::Config
Return the cloudtasker configuration.
-
.create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
Create a new task.
-
.delete(id) ⇒ Object
Delete a task by id.
-
.find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
Find a task by id.
-
.format_protobuf_duration(duration) ⇒ Google::Protobuf::Duration?
Return a protobuf duration.
-
.format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?
Return a protobuf timestamp for the time at which a task is scheduled to run.
-
.format_task_payload(payload) ⇒ Hash
Format the job payload sent to Cloud Tasks.
-
.queue_path(queue_name) ⇒ String
Return the fully qualified path for the Cloud Task queue.
-
.setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2beta3::Queue
Create the queue configured in Cloudtasker if it does not already exist.
-
.with_gax_retries(&block) ⇒ Object
Helper method encapsulating the retry strategy for GAX calls.
Instance Method Summary collapse
-
#initialize(gcp_task) ⇒ GoogleCloudTaskV1
constructor
Build a new instance of the class.
-
#relative_queue ⇒ String
Return the relative queue (queue name minus prefix) the task is in.
-
#to_h ⇒ Hash
Return a hash description of the task.
Constructor Details
#initialize(gcp_task) ⇒ GoogleCloudTaskV1
Build a new instance of the class.
189 190 191 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 189
#
# Build a new backend task wrapping the given task object.
#
# @param gcp_task [Object] the underlying task; stored as-is and exposed
#   through #gcp_task (presumably a Google Cloud Tasks task message —
#   confirm against callers).
def initialize(gcp_task)
  @gcp_task = gcp_task
end
Instance Attribute Details
#gcp_task ⇒ Object
Returns the value of attribute gcp_task.
10 11 12 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 10
#
# Reader for the wrapped task object handed to the constructor.
#
# @return [Object] the value of the gcp_task attribute.
def gcp_task
  @gcp_task
end
Class Method Details
.client ⇒ Google::Cloud::Tasks
Return the Google Cloud Task client.
49 50 51 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 49
#
# Return the Google Cloud Task client, building it on first use and
# reusing the same instance afterwards.
#
# @return [Google::Cloud::Tasks] the memoized v2beta3 client.
def self.client
  return @client if @client

  @client = ::Google::Cloud::Tasks.new(version: :v2beta3)
end
.config ⇒ Cloudtasker::Config
Return the cloudtasker configuration. See Cloudtasker#configure.
58 59 60 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 58
#
# Shortcut to the global cloudtasker configuration (see Cloudtasker#configure).
#
# @return [Cloudtasker::Config] whatever Cloudtasker.config returns.
def self.config
  Cloudtasker.config
end
.create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
Create a new task.
154 155 156 157 158 159 160 161 162 163 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 154
#
# Create a new task from a job payload.
#
# @param payload [Hash] the task attributes; formatted through
#   .format_task_payload before being sent.
#
# @return [Cloudtasker::Backend::GoogleCloudTaskV1, nil] the wrapped task,
#   or nil when the client returned nothing.
def self.create(payload)
  formatted = format_task_payload(payload)

  # The relative queue name is removed from the formatted payload and used
  # to build the queue path instead.
  target_queue = formatted.delete(:queue)

  # Push the task, retrying according to .with_gax_retries
  created = with_gax_retries { client.create_task(queue_path(target_queue), formatted) }
  return nil unless created

  new(created)
end
.delete(id) ⇒ Object
Delete a task by id.
170 171 172 173 174 175 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 170
#
# Delete a task by id.
#
# @param id [String] the id of the task to delete.
#
# @return [Object, nil] the result of the delete call, or nil when one of
#   the rescued errors below was raised.
def self.delete(id)
  with_gax_retries do
    client.delete_task(id)
  end
rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound, Google::Gax::PermissionDeniedError
  # The ID does not exist (permission-denied errors are swallowed the same way)
  nil
end
.find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
Find a task by id.
139 140 141 142 143 144 145 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 139
#
# Find a task by id.
#
# @param id [String] the id of the task to look up.
#
# @return [Cloudtasker::Backend::GoogleCloudTaskV1, nil] the wrapped task,
#   or nil when the client returned nothing or a rescued lookup error
#   was raised.
def self.find(id)
  found = with_gax_retries { client.get_task(id) }
  return nil unless found

  new(found)
rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound
  # The ID does not exist
  nil
end
.format_protobuf_duration(duration) ⇒ Google::Protobuf::Duration?
Return a protobuf duration.
99 100 101 102 103 104 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 99
#
# Return a protobuf duration.
#
# @param duration [#to_i, nil] the duration; its integer value is stored
#   in the seconds field.
#
# @return [Google::Protobuf::Duration, nil] the protobuf duration, or nil
#   when no duration was given.
def self.format_protobuf_duration(duration)
  return nil unless duration

  # Generate protobuf duration
  proto = Google::Protobuf::Duration.new
  proto.seconds = duration.to_i
  proto
end
.format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?
Return a protobuf timestamp for the time at which a task is scheduled to run.
85 86 87 88 89 90 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 85
#
# Return a protobuf timestamp built from the task schedule time.
#
# @param schedule_time [#to_i, nil] the schedule time; its integer value
#   is stored in the seconds field.
#
# @return [Google::Protobuf::Timestamp, nil] the protobuf timestamp, or nil
#   when no schedule time was given.
def self.format_protobuf_time(schedule_time)
  return nil unless schedule_time

  # Generate protobuf timestamp
  proto = Google::Protobuf::Timestamp.new
  proto.seconds = schedule_time.to_i
  proto
end
.format_task_payload(payload) ⇒ Hash
Format the job payload sent to Cloud Tasks.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 113
#
# Format the job payload sent to Cloud Tasks.
#
# The input is never mutated: it is round-tripped through JSON first,
# which also symbolizes every key.
#
# @param payload [Hash] the task attributes; must contain an :http_request
#   entry with a :body.
#
# @return [Hash] the formatted payload with nil-valued top-level entries removed.
def self.format_task_payload(payload)
  formatted = JSON.parse(payload.to_json, symbolize_names: true) # deep dup

  # Format schedule time to Google::Protobuf::Timestamp
  formatted[:schedule_time] = format_protobuf_time(formatted[:schedule_time])

  # Format dispatch_deadline to Google::Protobuf::Duration
  formatted[:dispatch_deadline] = format_protobuf_duration(formatted[:dispatch_deadline])

  # Encode job content to support UTF-8. Google Cloud Task
  # expect content to be ASCII-8BIT compatible (binary)
  request = formatted[:http_request]
  headers = (request[:headers] ||= {})
  headers[Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json'
  headers[Cloudtasker::Config::ENCODING_HEADER] = 'Base64'
  request[:body] = Base64.encode64(request[:body])

  formatted.compact
end
.queue_path(queue_name) ⇒ String
Return the fully qualified path for the Cloud Task queue.
69 70 71 72 73 74 75 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 69
#
# Return the fully qualified path for the Cloud Task queue.
#
# @param queue_name [String] the relative name of the queue.
#
# @return [String] the queue path built by the client from the configured
#   project, location and prefixed queue id.
def self.queue_path(queue_name)
  tasks_client = client
  project_id = config.gcp_project_id
  location_id = config.gcp_location_id

  # Blank parts are dropped so a missing prefix does not leave a leading '-'
  id_parts = [config.gcp_queue_prefix, queue_name].map(&:presence).compact
  queue_id = id_parts.join('-')

  tasks_client.queue_path(project_id, location_id, queue_id)
end
.setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2beta3::Queue
Create the queue configured in Cloudtasker if it does not already exist.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 21
#
# Create the queue configured in Cloudtasker if it does not already exist.
#
# @param name [String, nil] the relative queue name; falls back to
#   Cloudtasker::Config::DEFAULT_JOB_QUEUE when nil.
# @param concurrency [Integer, nil] the max number of concurrent dispatches;
#   falls back to Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY when nil.
# @param retries [Integer, nil] the max number of attempts; falls back to
#   Cloudtasker::Config::DEFAULT_QUEUE_RETRIES when nil.
#
# @return [Google::Cloud::Tasks::V2beta3::Queue] the existing queue, or the
#   newly created one.
def self.setup_queue(name: nil, concurrency: nil, retries: nil)
  # Build full queue path
  queue_name = name || Cloudtasker::Config::DEFAULT_JOB_QUEUE
  full_queue_name = queue_path(queue_name)

  # Try to get existing queue
  client.get_queue(full_queue_name)
rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound
  # Treat the same error classes as 'not found' that .find and .delete do,
  # so a missing queue is created whichever form the error takes.

  # Extract options
  queue_concurrency = (concurrency || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i
  queue_retries = (retries || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i

  # Create queue on 'not found' error
  client.create_queue(
    client.location_path(config.gcp_project_id, config.gcp_location_id),
    {
      name: full_queue_name,
      retry_config: { max_attempts: queue_retries },
      rate_limits: { max_concurrent_dispatches: queue_concurrency }
    }
  )
end
.with_gax_retries(&block) ⇒ Object
Helper method encapsulating the retry strategy for GAX calls
180 181 182 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 180
#
# Helper method encapsulating the retry strategy for GAX calls.
#
# The block is handed to Retriable, configured with `tries: 3` and
# restricted to Google::Gax::UnavailableError.
#
# @yield the GAX call to run.
#
# @return [Object] whatever Retriable.retriable returns for the block.
def self.with_gax_retries(&block)
  retry_options = { on: [Google::Gax::UnavailableError], tries: 3 }
  Retriable.retriable(**retry_options, &block)
end
Instance Method Details
#relative_queue ⇒ String
Return the relative queue (queue name minus prefix) the task is in.
198 199 200 201 202 203 204 205 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 198
#
# Return the relative queue (queue name minus prefix) the task is in.
#
# The queue id is the path segment following "/queues/" in the task name.
# The configured prefix is matched literally (regexp metacharacters are
# escaped) and only at the very start of the queue id.
#
# @return [String, nil] the queue id without the leading "<prefix>-", or nil
#   when the task name contains no "/queues/<id>" segment.
def relative_queue
  queue_id = gcp_task.name[%r{/queues/([^/]+)}, 1]
  return nil unless queue_id

  # to_s keeps a nil prefix from raising inside Regexp.escape
  prefix = Regexp.escape(self.class.config.gcp_queue_prefix.to_s)
  queue_id.sub(/\A#{prefix}-/, '')
end
#to_h ⇒ Hash
Return a hash description of the task.
212 213 214 215 216 217 218 219 220 221 |
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 212
#
# Return a hash description of the task.
#
# The wrapped task is converted to a hash once and every field is read
# from that single conversion.
#
# @return [Hash] with keys :id (task name), :http_request, :schedule_time
#   and :dispatch_deadline (integer seconds, 0 when absent), :retries
#   (the task's :response_count) and :queue (see #relative_queue).
def to_h
  task_attrs = gcp_task.to_h

  {
    id: gcp_task.name,
    http_request: task_attrs[:http_request],
    # nil.to_i is 0, so a missing time or deadline is reported as 0
    schedule_time: task_attrs.dig(:schedule_time, :seconds).to_i,
    dispatch_deadline: task_attrs.dig(:dispatch_deadline, :seconds).to_i,
    retries: task_attrs[:response_count],
    queue: relative_queue
  }
end