Class: Lepus::Consumers::WorkerFactory

Inherits:
Object
Defined in:
lib/lepus/consumers/worker_factory.rb

Overview

This is a configuration object for defining process-level settings. It holds settings such as connection pool size, timeouts, and alive thresholds and, more importantly, the list of consumers that should be run in this process.

Note that this class only holds configuration data related to the process; it does not handle process management or consumer execution. Each consumer has its own configuration for AMQP settings, queue names, and so on.

Constant Summary

DEFAULT_NAME = "default"
DEFAULT_POOL_SIZE = 1
DEFAULT_POOL_TIMEOUT = 5.0

Constructor Details

#initialize(name) ⇒ WorkerFactory

You probably want to use .[] or .default to get an instance instead of calling new directly.



# File 'lib/lepus/consumers/worker_factory.rb', line 64

def initialize(name)
  @name = name.to_s
  @pool_size = DEFAULT_POOL_SIZE
  @pool_timeout = DEFAULT_POOL_TIMEOUT
  @consumers = []
  @callbacks = {before_fork: [], after_fork: []}
end

Instance Attribute Details

#consumers ⇒ Array<Lepus::Consumer> (readonly)

Returns the list of consumer classes to be run in this process.

Returns:

  • (Array<Lepus::Consumer>)

    the list of consumer classes to be run in this process.



# File 'lib/lepus/consumers/worker_factory.rb', line 55

def consumers
  @consumers
end

#name ⇒ String (readonly)

Returns the unique name for this process configuration. Default is “default”.

Returns:

  • (String)

    the unique name for this process configuration. Default is “default”.



# File 'lib/lepus/consumers/worker_factory.rb', line 52

def name
  @name
end

#pool_size ⇒ Integer

Returns the size of the connection pool for this process. Default is 1.

Returns:

  • (Integer)

    the size of the connection pool for this process. Default is 1.



# File 'lib/lepus/consumers/worker_factory.rb', line 58

def pool_size
  @pool_size
end

#pool_timeout ⇒ Integer

Returns the timeout in seconds to wait for a connection from the pool. Default is 5 seconds.

Returns:

  • (Integer)

    the timeout in seconds to wait for a connection from the pool. Default is 5 seconds.



# File 'lib/lepus/consumers/worker_factory.rb', line 61

def pool_timeout
  @pool_timeout
end

Class Method Details

.[](name) ⇒ Object



# File 'lib/lepus/consumers/worker_factory.rb', line 18

def [](name)
  @instances ||= Concurrent::Map.new
  @instances[name.to_s] ||= new(name)
end

.default ⇒ Object



# File 'lib/lepus/consumers/worker_factory.rb', line 23

def default
  self[DEFAULT_NAME]
end

.exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/lepus/consumers/worker_factory.rb', line 27

def exists?(name)
  return false unless @instances

  @instances.key?(name.to_s)
end
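
The three registry methods above (.[], .default and .exists?) can be tried out with a small stand-in. This is a sketch, not the library class: SketchFactory is a hypothetical name, and a plain Hash replaces Concurrent::Map so the example runs without the concurrent-ruby gem.

```ruby
# Hypothetical stand-in that copies only the registry logic quoted above.
class SketchFactory
  DEFAULT_NAME = "default"

  class << self
    def [](name)
      @instances ||= {} # assumption: Hash instead of Concurrent::Map
      @instances[name.to_s] ||= new(name)
    end

    def default
      self[DEFAULT_NAME]
    end

    def exists?(name)
      return false unless @instances

      @instances.key?(name.to_s)
    end
  end

  attr_reader :name

  def initialize(name)
    @name = name.to_s
  end
end

SketchFactory.exists?(:high_priority)    # => false, nothing registered yet
by_symbol = SketchFactory[:high_priority] # creates and memoizes the instance
by_string = SketchFactory["high_priority"] # same key after to_s
by_symbol.equal?(by_string)               # => true
SketchFactory.default.name                # => "default"
```

Because keys are normalized with to_s, symbols and strings address the same instance. Note that .[] registers a configuration as a side effect, so .exists? is the way to probe for a name without creating it.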

.immutate_with(name, consumers: []) ⇒ Lepus::Consumers::WorkerFactory

Create an immutable copy of the process configuration with the specified consumers.

Parameters:

  • name (String, Symbol)

    the name of the process configuration to use.

  • consumers (Array<Lepus::Consumer>) (defaults to: [])

    the list of consumer classes to be run in this process.

Returns:

  • (Lepus::Consumers::WorkerFactory)


# File 'lib/lepus/consumers/worker_factory.rb', line 37

def immutate_with(name, consumers: [])
  definer = self[name].dup
  definer.freeze_with(consumers)
  definer
end
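
To illustrate what "immutable copy" means here, the following sketch reproduces the dup-then-freeze flow with stand-ins. SketchFactory, Consumer and OrdersConsumer are hypothetical names (Consumer plays the role of Lepus::Consumer), the registry uses a plain Hash, and freeze_with is reduced to the parts needed for the demonstration.

```ruby
# Hypothetical stand-ins; only the methods relevant to immutate_with are kept.
class Consumer; end
class OrdersConsumer < Consumer; end

class SketchFactory
  attr_reader :name, :consumers
  attr_accessor :pool_size

  def self.[](name)
    @instances ||= {}
    @instances[name.to_s] ||= new(name)
  end

  def self.immutate_with(name, consumers: [])
    definer = self[name].dup # shallow copy of the registered instance
    definer.freeze_with(consumers)
    definer
  end

  def initialize(name)
    @name = name.to_s
    @pool_size = 1
    @consumers = []
  end

  def freeze_with(consumers)
    @consumers = Array(consumers).uniq.freeze
    freeze
  end
end

SketchFactory[:orders].pool_size = 4
snapshot = SketchFactory.immutate_with(:orders, consumers: [OrdersConsumer])

snapshot.frozen?               # => true
snapshot.pool_size             # => 4, copied from the registered instance
snapshot.consumers             # => [OrdersConsumer]
SketchFactory[:orders].frozen? # => false, the registry entry stays mutable
```

The registered configuration keeps accepting changes, while the returned copy rejects further writes (a setter call on it raises FrozenError).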

Instance Method Details

#after_fork(&block) ⇒ Object



# File 'lib/lepus/consumers/worker_factory.rb', line 109

def after_fork(&block)
  callbacks[:after_fork] << block if block
end

#assign(options = {}) ⇒ void

This method returns an undefined value.

Assign multiple attributes at once from a hash of options.

Raises:

  • (ArgumentError)

    if an unknown attribute is provided.



# File 'lib/lepus/consumers/worker_factory.rb', line 75

def assign(options = {})
  options.each do |key, value|
    raise ArgumentError, "Unknown attribute #{key}" unless respond_to?(:"#{key}=")

    public_send(:"#{key}=", value)
  end
end
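
A short sketch of how #assign behaves, using a hypothetical SketchFactory that copies the method body above. It assumes, based on the attribute list on this page, that pool_size and pool_timeout have writers while name is read-only.

```ruby
# Hypothetical stand-in with the same assign logic as the listing above.
class SketchFactory
  attr_reader :name
  attr_accessor :pool_size, :pool_timeout

  def initialize(name)
    @name = name.to_s
    @pool_size = 1
    @pool_timeout = 5.0
  end

  def assign(options = {})
    options.each do |key, value|
      raise ArgumentError, "Unknown attribute #{key}" unless respond_to?(:"#{key}=")

      public_send(:"#{key}=", value)
    end
  end
end

config = SketchFactory.new(:mailers)
config.assign(pool_size: 3, pool_timeout: 10)
config.pool_size    # => 3
config.pool_timeout # => 10

begin
  config.assign(name: "other") # name has no writer, so it is rejected
rescue ArgumentError => e
  e.message # => "Unknown attribute name"
end
```

Options are applied in hash order, so keys listed before an unknown one have already been assigned by the time ArgumentError is raised.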

#before_fork(&block) ⇒ Object



# File 'lib/lepus/consumers/worker_factory.rb', line 105

def before_fork(&block)
  callbacks[:before_fork] << block if block
end

#freeze_with(consumers) ⇒ void

This method returns an undefined value.

Freeze this configuration instance and set the consumers that will run in this process.

Parameters:

  • consumers (Array<Lepus::Consumer>)

    the list of consumer classes to be run in this process.



# File 'lib/lepus/consumers/worker_factory.rb', line 86

def freeze_with(consumers)
  @consumers = Array(consumers).map do |consumer|
    unless consumer <= Lepus::Consumer
      raise ArgumentError, "#{consumer} is not a subclass of Lepus::Consumer"
    end

    consumer
  end.uniq.freeze
  @callbacks = @callbacks.transform_values(&:freeze)

  freeze
end
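
The consumer validation in #freeze_with relies on Module#<=, which returns true for the class itself or a subclass, and nil or false otherwise. The sketch below isolates that check in a hypothetical helper named validated; Consumer stands in for Lepus::Consumer.

```ruby
# Hypothetical stand-ins for the consumer hierarchy.
class Consumer; end
class OrdersConsumer < Consumer; end
class NotAConsumer; end

# Same validation pipeline as freeze_with, extracted into a helper.
def validated(consumers)
  Array(consumers).map do |consumer|
    unless consumer <= Consumer
      raise ArgumentError, "#{consumer} is not a subclass of Consumer"
    end

    consumer
  end.uniq.freeze
end

validated([OrdersConsumer, OrdersConsumer]) # => [OrdersConsumer], duplicates collapse
validated(OrdersConsumer)                   # => [OrdersConsumer], Array() wraps a single class
validated(Consumer)                         # => [Consumer], a class is <= itself

begin
  validated([NotAConsumer]) # NotAConsumer <= Consumer is nil
rescue ArgumentError => e
  e.message # => "NotAConsumer is not a subclass of Consumer"
end
```

The returned array is frozen, so the consumer list cannot be extended after the configuration has been frozen.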

#instantiate_process ⇒ Lepus::Consumers::Worker

Instantiate a new Lepus::Consumers::Worker based on this configuration.

Returns:

  • (Lepus::Consumers::Worker)


# File 'lib/lepus/consumers/worker_factory.rb', line 101

def instantiate_process
  Lepus::Consumers::Worker.new(self)
end

#run_process_callbacks(type) ⇒ Object



# File 'lib/lepus/consumers/worker_factory.rb', line 113

def run_process_callbacks(type)
  return unless callbacks[type]

  callbacks[type].each { |callback| callback.call }
end