Class: Cloudtasker::Backend::GoogleCloudTaskV1
- Inherits: Object
- Defined in: lib/cloudtasker/backend/google_cloud_task_v1.rb
Overview
Manage tasks pushed to Google Cloud Tasks.
Instance Attribute Summary
- #gcp_task ⇒ Object
  Returns the value of attribute gcp_task.
Class Method Summary
- .client ⇒ Google::Cloud::Tasks
  Return the Google Cloud Task client.
- .config ⇒ Cloudtasker::Config
  Return the cloudtasker configuration.
- .create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
  Create a new task.
- .delete(id) ⇒ Object
  Delete a task by id.
- .find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
  Find a task by id.
- .format_protobuf_duration(duration) ⇒ Google::Protobuf::Duration?
  Return a protobuf duration.
- .format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?
  Return a protobuf timestamp specifying when a task should run.
- .format_task_payload(payload) ⇒ Hash
  Format the job payload sent to Cloud Tasks.
- .queue_path(queue_name) ⇒ String
  Return the fully qualified path for the Cloud Task queue.
- .setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2beta3::Queue
  Create the queue configured in Cloudtasker if it does not already exist.
- .with_gax_retries(&block) ⇒ Object
  Helper method encapsulating the retry strategy for GAX calls.
Instance Method Summary
- #initialize(gcp_task) ⇒ GoogleCloudTaskV1 (constructor)
  Build a new instance of the class.
- #relative_queue ⇒ String
  Return the relative queue (queue name minus prefix) the task is in.
- #to_h ⇒ Hash
  Return a hash description of the task.
Constructor Details
#initialize(gcp_task) ⇒ GoogleCloudTaskV1
Build a new instance of the class.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 193
    def initialize(gcp_task)
      @gcp_task = gcp_task
    end
Instance Attribute Details
#gcp_task ⇒ Object
Returns the value of attribute gcp_task.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 10
    def gcp_task
      @gcp_task
    end
Class Method Details
.client ⇒ Google::Cloud::Tasks
Return the Google Cloud Task client.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 49
    def self.client
      @client ||= ::Google::Cloud::Tasks.new(version: :v2beta3)
    end
.config ⇒ Cloudtasker::Config
Return the cloudtasker configuration. See Cloudtasker#configure.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 58
    def self.config
      Cloudtasker.config
    end
.create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
Create a new task.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 158
    def self.create(payload)
      payload = format_task_payload(payload)

      # Extract relative queue name
      relative_queue = payload.delete(:queue)

      # Create task
      resp = with_gax_retries { client.create_task(queue_path(relative_queue), payload) }
      resp ? new(resp) : nil
    end
.delete(id) ⇒ Object
Delete a task by id.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 174
    def self.delete(id)
      with_gax_retries { client.delete_task(id) }
    rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound, Google::Gax::PermissionDeniedError
      # The ID does not exist
      nil
    end
.find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?
Find a task by id.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 143
    def self.find(id)
      resp = with_gax_retries { client.get_task(id) }
      resp ? new(resp) : nil
    rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound
      # The ID does not exist
      nil
    end
.format_protobuf_duration(duration) ⇒ Google::Protobuf::Duration?
Return a protobuf duration.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 99
    def self.format_protobuf_duration(duration)
      return nil unless duration

      # Generate protobuf duration
      Google::Protobuf::Duration.new.tap { |e| e.seconds = duration.to_i }
    end
.format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?
Return a protobuf timestamp specifying when a task should run.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 85
    def self.format_protobuf_time(schedule_time)
      return nil unless schedule_time

      # Generate protobuf timestamp
      Google::Protobuf::Timestamp.new.tap { |e| e.seconds = schedule_time.to_i }
    end
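Both formatting helpers reduce their input to whole seconds with `to_i` and return nil when no value is given. The sketch below illustrates that behaviour in plain Ruby. `FakeTimestamp`, `FakeDuration`, `to_fake_timestamp` and `to_fake_duration` are hypothetical stand-ins for the protobuf classes and the class methods, used only so the example runs without the google-protobuf gem.

```ruby
# Hypothetical stand-ins for Google::Protobuf::Timestamp and
# Google::Protobuf::Duration; only the `seconds` field is modelled.
FakeTimestamp = Struct.new(:seconds)
FakeDuration = Struct.new(:seconds)

# Mirrors the shape of format_protobuf_time: nil in, nil out.
def to_fake_timestamp(schedule_time)
  return nil unless schedule_time

  FakeTimestamp.new.tap { |e| e.seconds = schedule_time.to_i }
end

# Mirrors the shape of format_protobuf_duration: nil in, nil out.
def to_fake_duration(duration)
  return nil unless duration

  FakeDuration.new.tap { |e| e.seconds = duration.to_i }
end

run_at = Time.at(1_700_000_000.75)
puts to_fake_timestamp(run_at).seconds # fractional seconds are dropped by to_i
puts to_fake_duration(90.9).seconds    # truncated, not rounded
puts to_fake_timestamp(nil).inspect
```

Because `to_i` truncates, sub-second precision in a schedule time or deadline does not survive the conversion in this sketch.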
.format_task_payload(payload) ⇒ Hash
Format the job payload sent to Cloud Tasks.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 113
    def self.format_task_payload(payload)
      payload = JSON.parse(payload.to_json, symbolize_names: true) # deep dup

      # Format schedule time to Google::Protobuf::Timestamp
      payload[:schedule_time] = format_protobuf_time(payload[:schedule_time])

      # Format dispatch_deadline to Google::Protobuf::Duration
      payload[:dispatch_deadline] = format_protobuf_duration(payload[:dispatch_deadline])

      # Setup headers
      payload[:http_request][:headers] ||= {}
      payload[:http_request][:headers][Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json'

      # Conditionally encode job content to support UTF-8.
      # Google Cloud Tasks expects content to be ASCII-8BIT compatible (binary)
      if config.base64_encode_body
        payload[:http_request][:headers][Cloudtasker::Config::ENCODING_HEADER] = 'Base64'
        payload[:http_request][:body] = Base64.encode64(payload[:http_request][:body])
      end

      payload.compact
    end
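The payload formatting relies on three plain-Ruby steps: a JSON round-trip to obtain an independent copy with symbol keys, Base64 encoding of the body, and `compact` to drop unset top-level fields. The following is a minimal sketch of those steps only. The payload contents, the URL and the literal `'Content-Type'` header name are assumptions made for illustration, and the protobuf conversions are left out.

```ruby
require 'json'

# Hypothetical job payload; the keys mirror those read by format_task_payload.
original = {
  http_request: {
    url: 'https://example.com/run',
    body: { worker: "Caf\u00e9" }.to_json
  },
  schedule_time: nil
}

# A JSON round-trip yields a deep copy, so the caller's hash is not mutated.
copy = JSON.parse(original.to_json, symbolize_names: true)
copy[:http_request][:headers] ||= {}
copy[:http_request][:headers]['Content-Type'] = 'text/json' # assumed header name

# Base64.encode64(str) is implemented as [str].pack('m'); pack is used
# directly here so the sketch depends on nothing beyond json.
copy[:http_request][:body] = [copy[:http_request][:body]].pack('m')

# compact removes top-level nil values such as the unset schedule_time.
formatted = copy.compact

decoded = formatted[:http_request][:body].unpack1('m').force_encoding('UTF-8')

puts original[:http_request].key?(:headers) # false: the original is untouched
puts formatted.key?(:schedule_time)         # false: nil entries are dropped
puts decoded == original[:http_request][:body]
```

Note that `compact` only inspects the top level of the hash, so nil values nested under `:http_request` would remain.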
.queue_path(queue_name) ⇒ String
Return the fully qualified path for the Cloud Task queue.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 69
    def self.queue_path(queue_name)
      client.queue_path(
        config.gcp_project_id,
        config.gcp_location_id,
        [config.gcp_queue_prefix, queue_name].map(&:presence).compact.join('-')
      )
    end
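To make the naming rule concrete, here is a rough plain-Ruby sketch of how the prefix and queue name combine into a full path. `prefixed_queue_name` and `sketch_queue_path` are hypothetical helpers, the blank check approximates the ActiveSupport `presence` call used in the source, and the string template follows the Cloud Tasks resource name layout `projects/PROJECT/locations/LOCATION/queues/QUEUE` rather than calling the real client.

```ruby
# Join the optional prefix and the queue name with '-', skipping blank parts.
# Approximates `.map(&:presence).compact` without ActiveSupport.
def prefixed_queue_name(prefix, queue_name)
  [prefix, queue_name].reject { |part| part.nil? || part.strip.empty? }.join('-')
end

# Hypothetical stand-in for client.queue_path(project, location, queue).
def sketch_queue_path(project, location, prefix, queue_name)
  queue = prefixed_queue_name(prefix, queue_name)
  "projects/#{project}/locations/#{location}/queues/#{queue}"
end

puts sketch_queue_path('my-project', 'us-east1', 'my-app', 'critical')
puts sketch_queue_path('my-project', 'us-east1', nil, 'critical')
```

With a blank prefix the queue name is used as-is, with no leading dash.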
.setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2beta3::Queue
Create the queue configured in Cloudtasker if it does not already exist.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 21
    def self.setup_queue(name: nil, concurrency: nil, retries: nil)
      # Build full queue path
      queue_name = name || Cloudtasker::Config::DEFAULT_JOB_QUEUE
      full_queue_name = queue_path(queue_name)

      # Try to get existing queue
      client.get_queue(full_queue_name)
    rescue Google::Gax::RetryError
      # Extract options
      queue_concurrency = (concurrency || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i
      queue_retries = (retries || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i

      # Create queue on 'not found' error
      client.create_queue(
        client.location_path(config.gcp_project_id, config.gcp_location_id),
        {
          name: full_queue_name,
          retry_config: { max_attempts: queue_retries },
          rate_limits: { max_concurrent_dispatches: queue_concurrency }
        }
      )
    end
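The method follows a get-or-create pattern: look the queue up first and create it only when the lookup fails. The sketch below shows that control flow against an in-memory fake. `FakeNotFound`, `FakeQueueClient` and `ensure_queue` are hypothetical names invented for this example and are not part of Cloudtasker or any Google library.

```ruby
# Hypothetical error raised by the fake client when a queue is missing.
class FakeNotFound < StandardError; end

# Hypothetical in-memory client exposing only the two calls the pattern needs.
class FakeQueueClient
  attr_reader :created

  def initialize
    @queues = {}
    @created = 0
  end

  def get_queue(path)
    @queues.fetch(path) { raise FakeNotFound, path }
  end

  def create_queue(config)
    @created += 1
    @queues[config[:name]] = config
  end
end

# Return the existing queue, or create it with the given limits.
def ensure_queue(client, path, concurrency:, retries:)
  client.get_queue(path)
rescue FakeNotFound
  config = {
    name: path,
    retry_config: { max_attempts: retries.to_i },
    rate_limits: { max_concurrent_dispatches: concurrency.to_i }
  }
  client.create_queue(config)
end

client = FakeQueueClient.new
path = 'projects/my-project/locations/us-east1/queues/my-app-default'
ensure_queue(client, path, concurrency: 5, retries: 3)
second = ensure_queue(client, path, concurrency: 50, retries: 30)

puts client.created
puts second[:rate_limits][:max_concurrent_dispatches]
```

In this sketch the second call finds the existing queue and returns it unchanged, so the new limits are ignored. The source shown above has the same shape: it only calls `create_queue` inside the rescue branch.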
.with_gax_retries(&block) ⇒ Object
Helper method encapsulating the retry strategy for GAX calls.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 184
    def self.with_gax_retries(&block)
      Retriable.retriable(on: [Google::Gax::UnavailableError], tries: 3, &block)
    end
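The retry strategy amounts to running the block up to three times and retrying only on one specific error class. A simplified plain-Ruby sketch of that idea follows. It is not the Retriable gem's implementation (in particular, it does not wait between attempts), and `FakeUnavailable` and `with_sketch_retries` are hypothetical names.

```ruby
# Hypothetical stand-in for Google::Gax::UnavailableError.
class FakeUnavailable < StandardError; end

# Run the block up to `tries` times, retrying only on FakeUnavailable.
# Any other error, or a failure on the last attempt, propagates to the caller.
def with_sketch_retries(tries: 3)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue FakeUnavailable
    retry if attempts < tries
    raise
  end
end

calls = 0
result = with_sketch_retries do
  calls += 1
  raise FakeUnavailable, 'backend busy' if calls < 3

  "ok after #{calls} calls"
end

puts result
```

Errors outside the listed class are not retried, which is why `.find` and `.delete` add their own rescue clauses around the helper.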
Instance Method Details
#relative_queue ⇒ String
Return the relative queue (queue name minus prefix) the task is in.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 202
    def relative_queue
      gcp_task
        .name
        .match(%r{/queues/([^/]+)})
        &.captures
        &.first
        &.sub(/^#{self.class.config.gcp_queue_prefix}-/, '')
    end
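As an illustration of the extraction, the sketch below applies the same regular expression to a sample task name. `sketch_relative_queue` is a hypothetical standalone helper that takes the name and prefix as arguments, the task name is made up, and `Regexp.escape` is an addition not present in the source.

```ruby
# Extract the queue segment from a task name and strip the "<prefix>-" part.
# Returns nil when the name contains no /queues/ segment.
def sketch_relative_queue(task_name, prefix)
  task_name
    .match(%r{/queues/([^/]+)})
    &.captures
    &.first
    &.sub(/^#{Regexp.escape(prefix.to_s)}-/, '')
end

name = 'projects/my-project/locations/us-east1/queues/my-app-critical/tasks/123'
puts sketch_relative_queue(name, 'my-app')
puts sketch_relative_queue('not-a-task-name', 'my-app').inspect
```

The safe navigation operators mean a name without a `/queues/` segment yields nil instead of raising.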
#to_h ⇒ Hash
Return a hash description of the task.
    # File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 216
    def to_h
      {
        id: gcp_task.name,
        http_request: gcp_task.to_h[:http_request],
        schedule_time: gcp_task.to_h.dig(:schedule_time, :seconds).to_i,
        dispatch_deadline: gcp_task.to_h.dig(:dispatch_deadline, :seconds).to_i,
        retries: gcp_task.to_h[:response_count],
        queue: relative_queue
      }
    end
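One consequence of the `dig(...).to_i` calls is that a missing schedule time or deadline is reported as 0 rather than nil. The sketch below shows this with plain hashes. `raw` is a hypothetical stand-in for `gcp_task.to_h`, `summarize` is an invented helper, and the `queue` key is omitted for brevity.

```ruby
# Build the same kind of summary from a plain hash instead of a protobuf task.
def summarize(raw)
  {
    id: raw[:name],
    http_request: raw[:http_request],
    schedule_time: raw.dig(:schedule_time, :seconds).to_i,
    dispatch_deadline: raw.dig(:dispatch_deadline, :seconds).to_i,
    retries: raw[:response_count]
  }
end

# Hypothetical task data with no schedule time or dispatch deadline set.
raw = {
  name: 'projects/my-project/locations/us-east1/queues/my-app-default/tasks/42',
  http_request: { url: 'https://example.com/run' },
  response_count: 2
}

unscheduled = summarize(raw)
scheduled = summarize(raw.merge(schedule_time: { seconds: 1_700_000_000 }))

puts unscheduled[:schedule_time] # nil.to_i is 0
puts scheduled[:schedule_time]
```

Callers that need to tell "not scheduled" apart from the epoch would have to check the underlying task, since both appear as 0 here.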