Class: Cloudtasker::Backend::MemoryTask
- Inherits:
-
Object
- Object
- Cloudtasker::Backend::MemoryTask
- Defined in:
- lib/cloudtasker/backend/memory_task.rb
Overview
Manage local tasks pushed to memory. Used for testing.
Instance Attribute Summary collapse
-
#http_request ⇒ Object
readonly
Returns the value of attribute http_request.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#job_retries ⇒ Object
Returns the value of attribute job_retries.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#schedule_time ⇒ Object
readonly
Returns the value of attribute schedule_time.
Class Method Summary collapse
-
.all(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>
Return all enqueued tasks.
-
.clear(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>
Clear the queue.
-
.create(payload) ⇒ Object
Push a job to the queue.
-
.delete(id) ⇒ Object
Delete a task by id.
-
.drain(worker_class_name = nil) ⇒ Array<any>
Run all Tasks in the queue.
-
.find(id) ⇒ Cloudtasker::Backend::MemoryTask?
Get a task by id.
-
.inline_mode? ⇒ Boolean
Return true if we are in test inline execution mode.
-
.queue ⇒ Array<Hash>
Return the task queue.
-
.raise_errors? ⇒ Boolean
Return true if errors must be raised immediately.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
Equality operator.
-
#execute ⇒ Any
Execute the task.
-
#initialize(id:, http_request:, schedule_time: nil, queue: nil, job_retries: 0, **_xargs) ⇒ MemoryTask
constructor
Build a new instance of the class.
-
#payload ⇒ Hash
Return task payload.
-
#to_h ⇒ Hash
Return a hash description of the task.
-
#worker_class_name ⇒ String
Return the worker class from the task payload.
Constructor Details
#initialize(id:, http_request:, schedule_time: nil, queue: nil, job_retries: 0, **_xargs) ⇒ MemoryTask
Build a new instance of the class.
129 130 131 132 133 134 135 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 129 def initialize(id:, http_request:, schedule_time: nil, queue: nil, job_retries: 0, **_xargs) @id = id @http_request = http_request @schedule_time = Time.at(schedule_time || 0) @queue = queue @job_retries = job_retries || 0 end |
Instance Attribute Details
#http_request ⇒ Object (readonly)
Returns the value of attribute http_request.
9 10 11 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 9 def http_request @http_request end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
9 10 11 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 9 def id @id end |
#job_retries ⇒ Object
Returns the value of attribute job_retries.
8 9 10 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 8 def job_retries @job_retries end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
9 10 11 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 9 def queue @queue end |
#schedule_time ⇒ Object (readonly)
Returns the value of attribute schedule_time.
9 10 11 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 9 def schedule_time @schedule_time end |
Class Method Details
.all(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>
Return all enqueued tasks. A worker class name can be specified to filter the returned results.
58 59 60 61 62 63 64 65 66 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 58 def self.all(worker_class_name = nil) # Always return a copy of the queue so that tasks can safely be removed # (via #execute) without tampering with the enumerator if worker_class_name queue.select { |e| e.worker_class_name == worker_class_name } else queue.dup end end |
.clear(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>
Clear the queue.
114 115 116 117 118 119 120 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 114 def self.clear(worker_class_name = nil) if worker_class_name queue.reject! { |e| e.worker_class_name == worker_class_name } else queue.clear end end |
.create(payload) ⇒ Object
Push a job to the queue.
73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 73 def self.create(payload) id = payload[:id] || SecureRandom.uuid payload = payload.merge(schedule_time: payload[:schedule_time].to_i) # Save task task = new(**payload, id: id) queue << task # Execute task immediately if in testing and inline mode enabled task.execute if inline_mode? task end |
.delete(id) ⇒ Object
Delete a task by id.
103 104 105 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 103 def self.delete(id) queue.reject! { |e| e.id == id } end |
.drain(worker_class_name = nil) ⇒ Array<any>
Run all Tasks in the queue. Optionally filter which tasks to run based on the worker class name.
46 47 48 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 46 def self.drain(worker_class_name = nil) all(worker_class_name).map(&:execute) end |
.find(id) ⇒ Cloudtasker::Backend::MemoryTask?
Get a task by id.
94 95 96 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 94 def self.find(id) queue.find { |e| e.id == id } end |
.inline_mode? ⇒ Boolean
Return true if we are in test inline execution mode.
16 17 18 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 16 def self.inline_mode? defined?(Cloudtasker::Testing) && Cloudtasker::Testing.inline? end |
.queue ⇒ Array<Hash>
Return the task queue. A worker class name
34 35 36 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 34 def self.queue @queue ||= [] end |
.raise_errors? ⇒ Boolean
Return true if errors must be raised immediately
25 26 27 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 25 def self.raise_errors? defined?(Cloudtasker::Testing) && Cloudtasker::Testing.raise_errors? end |
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
197 198 199 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 197 def ==(other) other.is_a?(self.class) && other.id == id end |
#execute ⇒ Any
Execute the task.
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 174 def execute # Execute worker worker_payload = payload.merge(job_retries: job_retries, task_id: id) resp = WorkerHandler.with_worker_handling(worker_payload, &:execute) # Delete task self.class.delete(id) resp rescue DeadWorkerError => e self.class.delete(id) raise(e) if self.class.raise_errors? rescue StandardError => e self.job_retries += 1 raise(e) if self.class.raise_errors? end |
#payload ⇒ Hash
Return task payload.
142 143 144 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 142 def payload @payload ||= JSON.parse(http_request[:body], symbolize_names: true) end |
#to_h ⇒ Hash
Return a hash description of the task.
160 161 162 163 164 165 166 167 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 160 def to_h { id: id, http_request: http_request, schedule_time: schedule_time.to_i, queue: queue } end |
#worker_class_name ⇒ String
Return the worker class from the task payload.
151 152 153 |
# File 'lib/cloudtasker/backend/memory_task.rb', line 151 def worker_class_name payload[:worker] end |