Class: Cloudtasker::Backend::RedisTask

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/backend/redis_task.rb

Overview

Manage local tasks pushed to Redis

Constant Summary collapse

RETRY_INTERVAL =

seconds

20

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, http_request:, schedule_time: nil, retries: 0, queue: nil, dispatch_deadline: nil) ⇒ RedisTask

Build a new instance of the class.

Parameters:

  • id (String)

    The ID of the task.

  • http_request (Hash)

    The HTTP request content.

  • schedule_time (Integer) (defaults to: nil)

    When to run the task (Unix timestamp)

  • retries (Integer) (defaults to: 0)

    The number of times the job failed.

  • dispatch_deadline (Integer) (defaults to: nil)

    The dispatch_deadline in seconds.



128
129
130
131
132
133
134
135
# File 'lib/cloudtasker/backend/redis_task.rb', line 128

def initialize(id:, http_request:, schedule_time: nil, retries: 0, queue: nil, dispatch_deadline: nil)
  @id = id
  @http_request = http_request
  @schedule_time = Time.at(schedule_time || 0)
  @retries = retries || 0
  @queue = queue || Config::DEFAULT_JOB_QUEUE
  @dispatch_deadline = dispatch_deadline || Config::DEFAULT_DISPATCH_DEADLINE
end

Instance Attribute Details

#dispatch_deadlineObject (readonly)

Returns the value of attribute dispatch_deadline.



10
11
12
# File 'lib/cloudtasker/backend/redis_task.rb', line 10

def dispatch_deadline
  @dispatch_deadline
end

#http_requestObject (readonly)

Returns the value of attribute http_request.



10
11
12
# File 'lib/cloudtasker/backend/redis_task.rb', line 10

def http_request
  @http_request
end

#idObject (readonly)

Returns the value of attribute id.



10
11
12
# File 'lib/cloudtasker/backend/redis_task.rb', line 10

def id
  @id
end

#queueObject (readonly)

Returns the value of attribute queue.



10
11
12
# File 'lib/cloudtasker/backend/redis_task.rb', line 10

def queue
  @queue
end

#retriesObject (readonly)

Returns the value of attribute retries.



10
11
12
# File 'lib/cloudtasker/backend/redis_task.rb', line 10

def retries
  @retries
end

#schedule_timeObject (readonly)

Returns the value of attribute schedule_time.



10
11
12
# File 'lib/cloudtasker/backend/redis_task.rb', line 10

def schedule_time
  @schedule_time
end

Class Method Details

.allArray<Cloudtasker::Backend::RedisTask>

Return all tasks stored in Redis.

Returns:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/cloudtasker/backend/redis_task.rb', line 39

def self.all
  if redis.exists?(key)
    # Use Schedule Set if available
    redis.smembers(key).map { |id| find(id) }.compact
  else
    # Fallback to redis key matching and migrate tasks
    # to use Task Set instead.
    redis.search(key('*')).map do |gid|
      task_id = gid.sub(key(''), '')
      redis.sadd(key, [task_id])
      find(task_id)
    end
  end
end

.create(payload) ⇒ Object

Push a job to the queue.

Parameters:

  • payload (Hash)

    The Cloud Task payload.



85
86
87
88
89
90
91
92
93
# File 'lib/cloudtasker/backend/redis_task.rb', line 85

def self.create(payload)
  id = SecureRandom.uuid
  payload = payload.merge(schedule_time: payload[:schedule_time].to_i)

  # Save job
  redis.write(key(id), payload)
  redis.sadd(key, [id])
  new(**payload.merge(id: id))
end

.delete(id) ⇒ Object

Delete a task by id.

Parameters:

  • id (String)

    The task id.



114
115
116
117
# File 'lib/cloudtasker/backend/redis_task.rb', line 114

def self.delete(id)
  redis.srem(key, [id])
  redis.del(key(id))
end

.find(id) ⇒ Cloudtasker::Backend::RedisTask?

Get a task by id.

Parameters:

  • id (String)

    The id of the task.

Returns:



102
103
104
105
106
107
# File 'lib/cloudtasker/backend/redis_task.rb', line 102

def self.find(id)
  gid = key(id)
  return nil unless (payload = redis.fetch(gid))

  new(**payload.merge(id: id))
end

.key(val = nil) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol, nil) (defaults to: nil)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



30
31
32
# File 'lib/cloudtasker/backend/redis_task.rb', line 30

def self.key(val = nil)
  [to_s.underscore, val].compact.map(&:to_s).join('/')
end

.pop(queue = nil) ⇒ Cloudtasker::Backend::RedisTask

Retrieve and remove a task from the queue.

Parameters:

  • queue (String) (defaults to: nil)

    The queue to retrieve items from.

Returns:



74
75
76
77
78
# File 'lib/cloudtasker/backend/redis_task.rb', line 74

def self.pop(queue = nil)
  redis.with_lock('cloudtasker/server') do
    ready_to_process(queue).first&.tap(&:destroy)
  end
end

.ready_to_process(queue = nil) ⇒ Array<Cloudtasker::Backend::RedisTask>

Return all tasks ready to process.

Parameters:

  • queue (String) (defaults to: nil)

    The queue to retrieve items from.

Returns:



61
62
63
64
65
# File 'lib/cloudtasker/backend/redis_task.rb', line 61

def self.ready_to_process(queue = nil)
  list = all.select { |e| e.schedule_time <= Time.now }
  list = list.select { |e| e.queue == queue } if queue
  list
end

.redisCloudtasker::RedisClient

Return the Cloudtasker redis client

Returns:



19
20
21
# File 'lib/cloudtasker/backend/redis_task.rb', line 19

def self.redis
  @redis ||= RedisClient.new
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



237
238
239
# File 'lib/cloudtasker/backend/redis_task.rb', line 237

def ==(other)
  other.is_a?(self.class) && other.id == id
end

#deliverObject

Deliver the task to the processing endpoint.



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/cloudtasker/backend/redis_task.rb', line 202

def deliver
  Cloudtasker.logger.info(format_log_message('Processing task...'))

  # Send request
  resp = http_client.request(request_content)

  # Delete task if successful
  if resp.code.to_s =~ /20\d/
    destroy
    Cloudtasker.logger.info(format_log_message('Task handled successfully'))
  else
    retry_later(RETRY_INTERVAL)
    Cloudtasker.logger.info(format_log_message("Task failure - Retry in #{RETRY_INTERVAL} seconds..."))
  end

  resp
rescue Errno::ECONNREFUSED
  retry_later(RETRY_INTERVAL)
  Cloudtasker.logger.info(format_log_message("Processor not ready - Retry in #{RETRY_INTERVAL} seconds..."))
rescue Net::ReadTimeout
  retry_later(RETRY_INTERVAL)
  Cloudtasker.logger.info(
    format_log_message(
      "Task deadline exceeded (#{dispatch_deadline}s) - Retry in #{RETRY_INTERVAL} seconds..."
    )
  )
end

#destroyObject

Remove the task from the queue.



195
196
197
# File 'lib/cloudtasker/backend/redis_task.rb', line 195

def destroy
  self.class.delete(id)
end

#gid<Type>

Return the namespaced task id

Returns:

  • (<Type>)

    The namespaced task id



167
168
169
# File 'lib/cloudtasker/backend/redis_task.rb', line 167

def gid
  self.class.key(id)
end

#redisClass

Return the redis client.

Returns:

  • (Class)

    The RedisClient.



142
143
144
# File 'lib/cloudtasker/backend/redis_task.rb', line 142

def redis
  self.class.redis
end

#retry_later(interval, opts = {}) ⇒ Object

Retry the task later.

Parameters:

  • interval (Integer)

    The delay in seconds before retrying the task

  • opts (Hash) (defaults to: {})

    Additional options

Options Hash (opts):

  • :is_error (Boolean)

    Increase number of retries. Default to true.



178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/cloudtasker/backend/redis_task.rb', line 178

def retry_later(interval, opts = {})
  is_error = opts.to_h.fetch(:is_error, true)

  redis.write(
    gid,
    retries: is_error ? retries + 1 : retries,
    http_request: http_request,
    schedule_time: (Time.now + interval).to_i,
    queue: queue,
    dispatch_deadline: dispatch_deadline
  )
  redis.sadd(self.class.key, [id])
end

#to_hHash

Return a hash description of the task.

Returns:

  • (Hash)

    A hash description of the task.



151
152
153
154
155
156
157
158
159
160
# File 'lib/cloudtasker/backend/redis_task.rb', line 151

def to_h
  {
    id: id,
    http_request: http_request,
    schedule_time: schedule_time.to_i,
    retries: retries,
    queue: queue,
    dispatch_deadline: dispatch_deadline
  }
end