Class: Cloudtasker::Backend::MemoryTask

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/backend/memory_task.rb

Overview

Manage local tasks pushed to memory. Used for testing.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, http_request:, schedule_time: nil, queue: nil, job_retries: 0, **_xargs) ⇒ MemoryTask

Build a new instance of the class.

Parameters:

  • id (String)

    The ID of the task.

  • http_request (Hash)

    The HTTP request content.

  • schedule_time (Integer) (defaults to: nil)

    When to run the task (Unix timestamp)



129
130
131
132
133
134
135
# File 'lib/cloudtasker/backend/memory_task.rb', line 129

def initialize(id:, http_request:, schedule_time: nil, queue: nil, job_retries: 0, **_xargs)
  @id = id
  @http_request = http_request
  @schedule_time = Time.at(schedule_time || 0)
  @queue = queue
  @job_retries = job_retries || 0
end

Instance Attribute Details

#http_requestObject (readonly)

Returns the value of attribute http_request.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def http_request
  @http_request
end

#idObject (readonly)

Returns the value of attribute id.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def id
  @id
end

#job_retriesObject

Returns the value of attribute job_retries.



8
9
10
# File 'lib/cloudtasker/backend/memory_task.rb', line 8

def job_retries
  @job_retries
end

#queueObject (readonly)

Returns the value of attribute queue.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def queue
  @queue
end

#schedule_timeObject (readonly)

Returns the value of attribute schedule_time.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def schedule_time
  @schedule_time
end

Class Method Details

.all(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>

Return all enqueued tasks. A worker class name can be specified to filter the returned results.

Parameters:

  • worker_class_name (String) (defaults to: nil)

    Filter tasks on worker class name.

Returns:



58
59
60
61
62
63
64
65
66
# File 'lib/cloudtasker/backend/memory_task.rb', line 58

def self.all(worker_class_name = nil)
  # Always return a copy of the queue so that tasks can safely be removed
  # (via #execute) without tampering with the enumerator
  if worker_class_name
    queue.select { |e| e.worker_class_name == worker_class_name }
  else
    queue.dup
  end
end

.clear(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>

Clear the queue.

Parameters:

  • worker_class_name (String) (defaults to: nil)

    Filter jobs on worker class name.

Returns:



114
115
116
117
118
119
120
# File 'lib/cloudtasker/backend/memory_task.rb', line 114

def self.clear(worker_class_name = nil)
  if worker_class_name
    queue.reject! { |e| e.worker_class_name == worker_class_name }
  else
    queue.clear
  end
end

.create(payload) ⇒ Object

Push a job to the queue.

Parameters:

  • payload (Hash)

    The Cloud Task payload.



73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/cloudtasker/backend/memory_task.rb', line 73

def self.create(payload)
  id = payload[:id] || SecureRandom.uuid
  payload = payload.merge(schedule_time: payload[:schedule_time].to_i)

  # Save task
  task = new(**payload, id: id)
  queue << task

  # Execute task immediately if in testing and inline mode enabled
  task.execute if inline_mode?

  task
end

.delete(id) ⇒ Object

Delete a task by id.

Parameters:

  • id (String)

    The task id.



103
104
105
# File 'lib/cloudtasker/backend/memory_task.rb', line 103

def self.delete(id)
  queue.reject! { |e| e.id == id }
end

.drain(worker_class_name = nil) ⇒ Array<any>

Run all Tasks in the queue. Optionally filter which tasks to run based on the worker class name.

Parameters:

  • worker_class_name (String) (defaults to: nil)

    Run tasks for a specific worker class name.

Returns:

  • (Array<any>)

    The return values of the workers perform method.



46
47
48
# File 'lib/cloudtasker/backend/memory_task.rb', line 46

def self.drain(worker_class_name = nil)
  all(worker_class_name).map(&:execute)
end

.find(id) ⇒ Cloudtasker::Backend::MemoryTask?

Get a task by id.

Parameters:

  • id (String)

    The id of the task.

Returns:



94
95
96
# File 'lib/cloudtasker/backend/memory_task.rb', line 94

def self.find(id)
  queue.find { |e| e.id == id }
end

.inline_mode?Boolean

Return true if we are in test inline execution mode.

Returns:

  • (Boolean)

    True if inline mode enabled.



16
17
18
# File 'lib/cloudtasker/backend/memory_task.rb', line 16

def self.inline_mode?
  defined?(Cloudtasker::Testing) && Cloudtasker::Testing.inline?
end

.queueArray<Hash>

Return the task queue. A worker class name

Returns:

  • (Array<Hash>)

    <description>



34
35
36
# File 'lib/cloudtasker/backend/memory_task.rb', line 34

def self.queue
  @queue ||= []
end

.raise_errors?Boolean

Return true if errors must be raised immediately

Returns:

  • (Boolean)

    True if raise error mode is enabled.



25
26
27
# File 'lib/cloudtasker/backend/memory_task.rb', line 25

def self.raise_errors?
  defined?(Cloudtasker::Testing) && Cloudtasker::Testing.raise_errors?
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



197
198
199
# File 'lib/cloudtasker/backend/memory_task.rb', line 197

def ==(other)
  other.is_a?(self.class) && other.id == id
end

#executeAny

Execute the task.

Returns:

  • (Any)

    The return value of the worker perform method.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/cloudtasker/backend/memory_task.rb', line 174

def execute
  # Execute worker
  worker_payload = payload.merge(job_retries: job_retries, task_id: id)
  resp = WorkerHandler.with_worker_handling(worker_payload, &:execute)

  # Delete task
  self.class.delete(id)
  resp
rescue DeadWorkerError => e
  self.class.delete(id)
  raise(e) if self.class.raise_errors?
rescue StandardError => e
  self.job_retries += 1
  raise(e) if self.class.raise_errors?
end

#payloadHash

Return task payload.

Returns:

  • (Hash)

    The task payload.



142
143
144
# File 'lib/cloudtasker/backend/memory_task.rb', line 142

def payload
  @payload ||= JSON.parse(http_request[:body], symbolize_names: true)
end

#to_hHash

Return a hash description of the task.

Returns:

  • (Hash)

    A hash description of the task.



160
161
162
163
164
165
166
167
# File 'lib/cloudtasker/backend/memory_task.rb', line 160

def to_h
  {
    id: id,
    http_request: http_request,
    schedule_time: schedule_time.to_i,
    queue: queue
  }
end

#worker_class_nameString

Return the worker class from the task payload.

Returns:

  • (String)

    The task worker class name.



151
152
153
# File 'lib/cloudtasker/backend/memory_task.rb', line 151

def worker_class_name
  payload[:worker]
end