Class: Shoryuken::Options
- Inherits:
-
Object
- Object
- Shoryuken::Options
- Defined in:
- lib/shoryuken/options.rb
Overview
Stores and manages all Shoryuken configuration options. This class is used internally to hold settings for workers, queues, middleware, and other runtime configurations.
Constant Summary collapse
- DEFAULTS =
Default configuration values for Shoryuken
{ thread_priority: -1, concurrency: 25, queues: [], aws: {}, delay: 0.0, timeout: 8, lifecycle_events: { startup: [], dispatch: [], utilization_update: [], quiet: [], shutdown: [], stopped: [] } }.freeze
Instance Attribute Summary collapse
- #active_job_queue_name_prefixing ⇒ Boolean, ...
- #cache_visibility_timeout ⇒ Boolean, ...
-
#default_worker_options ⇒ Hash{String => Object}
Returns the default worker options hash.
- #enable_reloading ⇒ Boolean, ...
- #exception_handlers ⇒ Boolean, ...
- #groups ⇒ Boolean, ...
- #launcher_executor ⇒ Boolean, ...
-
#logger ⇒ Logger
Returns the logger instance.
- #reloader ⇒ Boolean, ...
-
#sqs_client ⇒ Aws::SQS::Client
Returns the SQS client, initializing a default one if needed.
-
#sqs_client_receive_message_opts ⇒ Hash
Options passed to SQS receive_message calls.
- #start_callback ⇒ Boolean, ...
- #stop_callback ⇒ Boolean, ...
-
#thread_priority ⇒ Integer
Returns the thread priority setting.
- #worker_executor ⇒ Boolean, ...
- #worker_registry ⇒ Boolean, ...
Instance Method Summary collapse
-
#active_job? ⇒ Boolean
Checks if ActiveJob is available.
-
#active_job_queue_name_prefixing? ⇒ Boolean
Checks if ActiveJob queue name prefixing is enabled.
-
#add_group(group, concurrency = nil, delay: nil, polling_strategy: nil) ⇒ Hash
Adds a processing group with the specified concurrency and delay.
-
#add_queue(queue, weight, group) ⇒ void
Adds a queue to a processing group with the specified weight.
-
#cache_visibility_timeout? ⇒ Boolean
Checks if visibility timeout caching is enabled.
-
#client_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain
Returns the client middleware chain.
-
#configure_client {|Shoryuken::Options| ... } ⇒ void
Yields self unless running as a server for client-specific configuration.
-
#configure_server {|Shoryuken::Options| ... } ⇒ void
Yields self if running as a server for server-specific configuration.
-
#delay(group) ⇒ Float
Returns the polling delay for a group.
-
#initialize ⇒ Options
constructor
Initializes a new Options instance with default values.
-
#on(event, &block) { ... } ⇒ void
Registers a block to run at a point in the Shoryuken lifecycle.
-
#on_start(&block) { ... } ⇒ void
Registers a callback to run when the server starts.
-
#on_stop(&block) { ... } ⇒ void
Registers a callback to run when the server stops.
-
#options ⇒ Hash
Returns the global options hash.
-
#polling_strategy(group) ⇒ Class
Returns the polling strategy class for a group.
-
#register_worker(*args) ⇒ void
Registers a worker class with the worker registry.
-
#server? ⇒ Boolean
Checks if running as a server (CLI mode).
-
#server_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain
Returns the server middleware chain.
-
#ungrouped_queues ⇒ Array<String>
Returns all queues from all groups.
Constructor Details
#initialize ⇒ Options
Initializes a new Options instance with default values
51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/shoryuken/options.rb', line 51 def initialize self.groups = {} self.worker_registry = DefaultWorkerRegistry.new self.exception_handlers = [DefaultExceptionHandler] self.active_job_queue_name_prefixing = false self.worker_executor = Worker::DefaultExecutor self.cache_visibility_timeout = false self.reloader = proc { |&block| block.call } self.enable_reloading ||= false # this is needed for keeping backward compatibility @sqs_client_receive_message_opts ||= {} end |
Instance Attribute Details
#active_job_queue_name_prefixing ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def active_job_queue_name_prefixing @active_job_queue_name_prefixing end |
#cache_visibility_timeout ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def cache_visibility_timeout @cache_visibility_timeout end |
#default_worker_options ⇒ Hash{String => Object}
Returns the default worker options hash
235 236 237 238 239 240 241 242 243 244 |
# File 'lib/shoryuken/options.rb', line 235 def default_worker_options @default_worker_options ||= { 'queue' => 'default', 'delete' => false, 'auto_delete' => false, 'auto_visibility_timeout' => false, 'retry_intervals' => nil, 'batch' => false } end |
#enable_reloading ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def enable_reloading @enable_reloading end |
#exception_handlers ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def exception_handlers @exception_handlers end |
#groups ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def groups @groups end |
#launcher_executor ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def launcher_executor @launcher_executor end |
#logger ⇒ Logger
Returns the logger instance
171 172 173 |
# File 'lib/shoryuken/options.rb', line 171 def logger @logger ||= Shoryuken::Logging.logger end |
#reloader ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def reloader @reloader end |
#sqs_client ⇒ Aws::SQS::Client
Returns the SQS client, initializing a default one if needed. Uses AWS configuration from options if available.
149 150 151 |
# File 'lib/shoryuken/options.rb', line 149 def sqs_client @sqs_client ||= Aws::SQS::Client.new(options[:aws]) end |
#sqs_client_receive_message_opts ⇒ Hash
Returns options passed to SQS receive_message calls.
48 49 50 |
# File 'lib/shoryuken/options.rb', line 48 def sqs_client_receive_message_opts @sqs_client_receive_message_opts end |
#start_callback ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def start_callback @start_callback end |
#stop_callback ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def stop_callback @stop_callback end |
#thread_priority ⇒ Integer
Returns the thread priority setting
178 179 180 |
# File 'lib/shoryuken/options.rb', line 178 def thread_priority @thread_priority ||= options[:thread_priority] end |
#worker_executor ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def worker_executor @worker_executor end |
#worker_registry ⇒ Boolean, ...
37 38 39 |
# File 'lib/shoryuken/options.rb', line 37 def worker_registry @worker_registry end |
Instance Method Details
#active_job? ⇒ Boolean
Checks if ActiveJob is available
67 68 69 |
# File 'lib/shoryuken/options.rb', line 67 def active_job? defined?(::ActiveJob) end |
#active_job_queue_name_prefixing? ⇒ Boolean
Checks if ActiveJob queue name prefixing is enabled
301 302 303 |
# File 'lib/shoryuken/options.rb', line 301 def active_job_queue_name_prefixing? @active_job_queue_name_prefixing end |
#add_group(group, concurrency = nil, delay: nil, polling_strategy: nil) ⇒ Hash
Adds a processing group with the specified concurrency and delay
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/shoryuken/options.rb', line 79 def add_group(group, concurrency = nil, delay: nil, polling_strategy: nil) unless polling_strategy.nil? || polling_strategy.is_a?(Class) || polling_strategy.is_a?(String) raise Errors::InvalidPollingStrategyError, "#{polling_strategy} is not a valid polling_strategy" end concurrency ||= options[:concurrency] delay ||= options[:delay] groups[group] ||= { concurrency: concurrency, delay: delay, polling_strategy: polling_strategy, queues: [] } end |
#add_queue(queue, weight, group) ⇒ void
This method returns an undefined value.
Adds a queue to a processing group with the specified weight
101 102 103 104 105 |
# File 'lib/shoryuken/options.rb', line 101 def add_queue(queue, weight, group) weight.times do groups[group][:queues] << queue end end |
#cache_visibility_timeout? ⇒ Boolean
Checks if visibility timeout caching is enabled
294 295 296 |
# File 'lib/shoryuken/options.rb', line 294 def cache_visibility_timeout? @cache_visibility_timeout end |
#client_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain
Returns the client middleware chain
226 227 228 229 230 |
# File 'lib/shoryuken/options.rb', line 226 def client_middleware @_client_chain ||= default_client_middleware yield @_client_chain if block_given? @_client_chain end |
#configure_client {|Shoryuken::Options| ... } ⇒ void
This method returns an undefined value.
Yields self unless running as a server for client-specific configuration
218 219 220 |
# File 'lib/shoryuken/options.rb', line 218 def configure_client yield self unless server? end |
#configure_server {|Shoryuken::Options| ... } ⇒ void
This method returns an undefined value.
Yields self if running as a server for server-specific configuration
200 201 202 |
# File 'lib/shoryuken/options.rb', line 200 def configure_server yield self if server? end |
#delay(group) ⇒ Float
Returns the polling delay for a group
141 142 143 |
# File 'lib/shoryuken/options.rb', line 141 def delay(group) groups[group].to_h.fetch(:delay, options[:delay]).to_f end |
#on(event, &block) { ... } ⇒ void
This method returns an undefined value.
Registers a block to run at a point in the Shoryuken lifecycle.
277 278 279 280 281 282 |
# File 'lib/shoryuken/options.rb', line 277 def on(event, &block) raise Errors::InvalidEventError, "Symbols only please: #{event}" unless event.is_a?(Symbol) raise Errors::InvalidEventError, "Invalid event name: #{event}" unless options[:lifecycle_events].key?(event) options[:lifecycle_events][event] << block end |
#on_start(&block) { ... } ⇒ void
This method returns an undefined value.
Registers a callback to run when the server starts
251 252 253 |
# File 'lib/shoryuken/options.rb', line 251 def on_start(&block) self.start_callback = block end |
#on_stop(&block) { ... } ⇒ void
This method returns an undefined value.
Registers a callback to run when the server stops
260 261 262 |
# File 'lib/shoryuken/options.rb', line 260 def on_stop(&block) self.stop_callback = block end |
#options ⇒ Hash
Returns the global options hash
164 165 166 |
# File 'lib/shoryuken/options.rb', line 164 def options @options ||= DEFAULTS.dup end |
#polling_strategy(group) ⇒ Class
Returns the polling strategy class for a group
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/shoryuken/options.rb', line 118 def polling_strategy(group) strategy = groups[group].to_h[:polling_strategy] || (group == 'default' ? options : options[:groups].to_h[group]).to_h[:polling_strategy] case strategy when 'WeightedRoundRobin', nil # Default case Polling::WeightedRoundRobin when 'StrictPriority' Polling::StrictPriority when String begin Object.const_get(strategy) rescue NameError raise Errors::InvalidPollingStrategyError, "#{strategy} is not a valid polling_strategy" end when Class strategy end end |
#register_worker(*args) ⇒ void
This method returns an undefined value.
Registers a worker class with the worker registry
192 193 194 |
# File 'lib/shoryuken/options.rb', line 192 def register_worker(*args) worker_registry.register_worker(*args) end |
#server? ⇒ Boolean
Checks if running as a server (CLI mode)
287 288 289 |
# File 'lib/shoryuken/options.rb', line 287 def server? defined?(Shoryuken::CLI) end |
#server_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain
Returns the server middleware chain
208 209 210 211 212 |
# File 'lib/shoryuken/options.rb', line 208 def server_middleware @_server_chain ||= default_server_middleware yield @_server_chain if block_given? @_server_chain end |
#ungrouped_queues ⇒ Array<String>
Returns all queues from all groups
110 111 112 |
# File 'lib/shoryuken/options.rb', line 110 def ungrouped_queues groups.values.flat_map { |options| options[:queues] } end |