Class: Lepus::Consumers::WorkerFactory
- Inherits:
-
Object
- Object
- Lepus::Consumers::WorkerFactory
- Defined in:
- lib/lepus/consumers/worker_factory.rb
Overview
This is a configuration object for defining process-level settings It holds settings such as connection pool size, timeouts, and alive thresholds and more importantly, the list of consumers that should be run in this process.
Note that this class only holds configuration data related the process and does not handle the actual process management or consumer execution. Consumer has its own configuration for AMPQ settings, queue names, etc.
Constant Summary collapse
- DEFAULT_NAME =
"default"- DEFAULT_POOL_SIZE =
1- DEFAULT_POOL_TIMEOUT =
5.0
Instance Attribute Summary collapse
-
#consumers ⇒ Array<Lepus::Consumer>
readonly
The list of consumer classes to be run in this process.
-
#name ⇒ String
readonly
The unique name for this process configuration.
-
#pool_size ⇒ Integer
The size of the connection pool for this process.
-
#pool_timeout ⇒ Integer
The timeout in seconds to wait for a connection from the pool.
Class Method Summary collapse
- .[](name) ⇒ Object
- .default ⇒ Object
- .exists?(name) ⇒ Boolean
-
.immutate_with(name, consumers: []) ⇒ Lepus::Consumers::WorkerFactory
Create an immutable copy of the process configuration with the specified consumers.
Instance Method Summary collapse
- #after_fork(&block) ⇒ Object
-
#assign(options = {}) ⇒ void
Assign multiple attributes at once from a hash of options.
- #before_fork(&block) ⇒ Object
-
#freeze_with(consumers) ⇒ void
Freeze this configuration instance and set the consumers that will run in this process.
-
#initialize(name) ⇒ WorkerFactory
constructor
You probably want to use .[] or .default to get an instance instead of calling new directly.
-
#instantiate_process ⇒ Lepus::Consumers::Worker
Instantiate a new Lepus::Consumers::Worker based on this configuration.
- #run_process_callbacks(type) ⇒ Object
Constructor Details
#initialize(name) ⇒ WorkerFactory
You probably want to use .[] or .default to get an instance instead of calling new directly.
64 65 66 67 68 69 70 |
# File 'lib/lepus/consumers/worker_factory.rb', line 64 def initialize(name) @name = name.to_s @pool_size = DEFAULT_POOL_SIZE @pool_timeout = DEFAULT_POOL_TIMEOUT @consumers = [] @callbacks = {before_fork: [], after_fork: []} end |
Instance Attribute Details
#consumers ⇒ Array<Lepus::Consumer> (readonly)
Returns the list of consumer classes to be run in this process.
55 56 57 |
# File 'lib/lepus/consumers/worker_factory.rb', line 55 def consumers @consumers end |
#name ⇒ String (readonly)
Returns the unique name for this process configuration. Default is “default”.
52 53 54 |
# File 'lib/lepus/consumers/worker_factory.rb', line 52 def name @name end |
#pool_size ⇒ Integer
Returns the size of the connection pool for this process. Default is 1.
58 59 60 |
# File 'lib/lepus/consumers/worker_factory.rb', line 58 def pool_size @pool_size end |
#pool_timeout ⇒ Integer
Returns the timeout in seconds to wait for a connection from the pool. Default is 5 seconds.
61 62 63 |
# File 'lib/lepus/consumers/worker_factory.rb', line 61 def pool_timeout @pool_timeout end |
Class Method Details
.[](name) ⇒ Object
18 19 20 21 |
# File 'lib/lepus/consumers/worker_factory.rb', line 18 def [](name) @instances ||= Concurrent::Map.new @instances[name.to_s] ||= new(name) end |
.default ⇒ Object
23 24 25 |
# File 'lib/lepus/consumers/worker_factory.rb', line 23 def default self[DEFAULT_NAME] end |
.exists?(name) ⇒ Boolean
27 28 29 30 31 |
# File 'lib/lepus/consumers/worker_factory.rb', line 27 def exists?(name) return false unless @instances @instances.key?(name.to_s) end |
.immutate_with(name, consumers: []) ⇒ Lepus::Consumers::WorkerFactory
Create an immutable copy of the process configuration with the specified consumers.
37 38 39 40 41 |
# File 'lib/lepus/consumers/worker_factory.rb', line 37 def immutate_with(name, consumers: []) definer = self[name].dup definer.freeze_with(consumers) definer end |
Instance Method Details
#after_fork(&block) ⇒ Object
109 110 111 |
# File 'lib/lepus/consumers/worker_factory.rb', line 109 def after_fork(&block) callbacks[:after_fork] << block if block end |
#assign(options = {}) ⇒ void
This method returns an undefined value.
Assign multiple attributes at once from a hash of options.
75 76 77 78 79 80 81 |
# File 'lib/lepus/consumers/worker_factory.rb', line 75 def assign( = {}) .each do |key, value| raise ArgumentError, "Unknown attribute #{key}" unless respond_to?(:"#{key}=") public_send(:"#{key}=", value) end end |
#before_fork(&block) ⇒ Object
105 106 107 |
# File 'lib/lepus/consumers/worker_factory.rb', line 105 def before_fork(&block) callbacks[:before_fork] << block if block end |
#freeze_with(consumers) ⇒ void
This method returns an undefined value.
Freeze this configuration instance and set the consumers that will run in this process.
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/lepus/consumers/worker_factory.rb', line 86 def freeze_with(consumers) @consumers = Array(consumers).map do |consumer| unless consumer <= Lepus::Consumer raise ArgumentError, "#{consumer} is not a subclass of Lepus::Consumer" end consumer end.uniq.freeze @callbacks = @callbacks.transform_values(&:freeze) freeze end |
#instantiate_process ⇒ Lepus::Consumers::Worker
Instantiate a new Lepus::Consumers::Worker based on this configuration.
101 102 103 |
# File 'lib/lepus/consumers/worker_factory.rb', line 101 def instantiate_process Lepus::Consumers::Worker.new(self) end |
#run_process_callbacks(type) ⇒ Object
113 114 115 116 117 |
# File 'lib/lepus/consumers/worker_factory.rb', line 113 def run_process_callbacks(type) return unless callbacks[type] callbacks[type].each { |callback| callback.call } end |