Module: Cloudtasker::Storable::Worker::ClassMethods

Defined in:
lib/cloudtasker/storable/worker.rb

Overview

Module class methods

Instance Method Summary collapse

Instance Method Details

#pull_all_from_store(namespace, page_size: 1000) ⇒ Object

Pull the jobs from the namespaced store and enqueue them.

Parameters:

  • namespace (String)

    The store namespace.

  • page_size (Integer) (defaults to: 1000)

    The number of items to pull on each page. Defaults to 1000.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/cloudtasker/storable/worker.rb', line 56

def pull_all_from_store(namespace, page_size: 1000)
  items = nil

  while items.nil? || items.present?
    # Pull items
    items = redis.lpop(store_cache_key(namespace), page_size).to_a

    # For each item, execute block or enqueue it
    items.each do |args_json|
      worker_args = JSON.parse(args_json)

      if block_given?
        yield(worker_args)
      else
        perform_async(*worker_args)
      end
    end
  end
end

#push_many_to_store(namespace, args_list) ⇒ String

Push many workers to a namespaced store at once.

Parameters:

  • namespace (String)

    The store namespace

  • args_list (Array<Array<any>>)

    A list of arguments for each worker

Returns:

  • (String)

    The number of elements added to the store



46
47
48
# File 'lib/cloudtasker/storable/worker.rb', line 46

def push_many_to_store(namespace, args_list)
  redis.rpush(store_cache_key(namespace), args_list.map(&:to_json))
end

#push_to_store(namespace, *args) ⇒ String

Push the worker to a namespaced store.

Parameters:

  • namespace (String)

    The store namespace

  • *args (Array<any>)

    List of worker arguments

Returns:

  • (String)

    The number of elements added to the store



34
35
36
# File 'lib/cloudtasker/storable/worker.rb', line 34

def push_to_store(namespace, *args)
  redis.rpush(store_cache_key(namespace), [args.to_json])
end

#store_cache_key(namespace) ⇒ String

Return the namespaced store key used to store jobs that have been parked and should be manually popped later.

Parameters:

  • namespace (String)

    The user-provided store namespace

Returns:

  • (String)

    The full store cache key



22
23
24
# File 'lib/cloudtasker/storable/worker.rb', line 22

def store_cache_key(namespace)
  cache_key([Config::WORKER_STORE_PREFIX, namespace])
end