Class: Shoryuken::DefaultWorkerRegistry

Inherits:
WorkerRegistry show all
Defined in:
lib/shoryuken/default_worker_registry.rb

Overview

Default implementation of the worker registry. Stores and retrieves worker classes mapped to queue names.

Instance Method Summary collapse

Constructor Details

#initializeDefaultWorkerRegistry

Initializes a new DefaultWorkerRegistry with an empty workers hash



8
9
10
# File 'lib/shoryuken/default_worker_registry.rb', line 8

def initialize
  @workers = Shoryuken::Helpers::AtomicHash.new
end

Instance Method Details

#batch_receive_messages?(queue) ⇒ Boolean

Checks if a queue is configured for batch message receiving

Parameters:

  • queue (String)

    the queue name

Returns:

  • (Boolean)

    true if the queue’s worker has batch mode enabled



16
17
18
# File 'lib/shoryuken/default_worker_registry.rb', line 16

def batch_receive_messages?(queue)
  !!(@workers[queue] && @workers[queue].get_shoryuken_options['batch'])
end

#clearvoid

This method returns an undefined value.

Clears all registered workers



23
24
25
# File 'lib/shoryuken/default_worker_registry.rb', line 23

def clear
  @workers.clear
end

#fetch_worker(queue, message) ⇒ Object?

Fetches a worker instance for processing a message

Parameters:

Returns:

  • (Object, nil)

    a new worker instance or nil if not found



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/shoryuken/default_worker_registry.rb', line 32

def fetch_worker(queue, message)
  worker_class = !message.is_a?(Array) &&
                 message.message_attributes &&
                 message.message_attributes['shoryuken_class'] &&
                 message.message_attributes['shoryuken_class'][:string_value]

  worker_class = begin
    Shoryuken::Helpers::StringUtils.constantize(worker_class)
  rescue
    @workers[queue]
  end

  worker_class.new if worker_class
end

#queuesArray<String>

Returns all registered queue names

Returns:

  • (Array<String>)

    the queue names with registered workers



50
51
52
# File 'lib/shoryuken/default_worker_registry.rb', line 50

def queues
  @workers.keys
end

#register_worker(queue, clazz) ⇒ Class

Registers a worker class for a queue

Parameters:

  • queue (String)

    the queue name

  • clazz (Class)

    the worker class to register

Returns:

  • (Class)

    the registered worker class

Raises:



60
61
62
63
64
65
66
67
68
# File 'lib/shoryuken/default_worker_registry.rb', line 60

def register_worker(queue, clazz)
  if (worker_class = @workers[queue]) && (worker_class.get_shoryuken_options['batch'] == true || clazz.get_shoryuken_options['batch'] == true)
    raise Errors::InvalidWorkerRegistrationError, "Could not register #{clazz} for #{queue}, "\
      "because #{worker_class} is already registered for this queue, "\
      "and Shoryuken doesn't support a batchable worker for a queue with multiple workers"
  end

  @workers[queue] = clazz
end

#workers(queue) ⇒ Array<Class>

Returns all worker classes for a queue

Parameters:

  • queue (String)

    the queue name

Returns:

  • (Array<Class>)

    the registered worker classes



74
75
76
# File 'lib/shoryuken/default_worker_registry.rb', line 74

def workers(queue)
  [@workers.fetch(queue, [])].flatten
end