Class: Shoryuken::Options

Inherits:
Object
Defined in:
lib/shoryuken/options.rb

Overview

Stores and manages all Shoryuken configuration options. This class is used internally to hold settings for workers, queues, middleware, and other runtime configurations.

Constant Summary

DEFAULTS

Default configuration values for Shoryuken.

DEFAULTS = {
  thread_priority: -1,
  concurrency: 25,
  queues: [],
  aws: {},
  delay: 0.0,
  timeout: 8,
  lifecycle_events: {
    startup: [],
    dispatch: [],
    utilization_update: [],
    quiet: [],
    shutdown: [],
    stopped: []
  }
}.freeze

Constructor Details

#initialize ⇒ Options

Initializes a new Options instance with default values



# File 'lib/shoryuken/options.rb', line 51

def initialize
  self.groups = {}
  self.worker_registry = DefaultWorkerRegistry.new
  self.exception_handlers = [DefaultExceptionHandler]
  self.active_job_queue_name_prefixing = false
  self.worker_executor = Worker::DefaultExecutor
  self.cache_visibility_timeout = false
  self.reloader = proc { |&block| block.call }
  self.enable_reloading ||= false
  # this is needed for keeping backward compatibility
  @sqs_client_receive_message_opts ||= {}
end
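
The default reloader assigned in the constructor is a pass-through: it receives a block and calls it immediately. The following is a minimal standalone sketch of that behavior. It does not load the gem, and the names `DEFAULT_RELOADER_SKETCH` and `run_with_reloader_sketch` are hypothetical; only the proc body is taken from the listing above.

```ruby
# Standalone sketch; does not require the shoryuken gem.
# The proc body is copied from #initialize; the constant name is hypothetical.
DEFAULT_RELOADER_SKETCH = proc { |&block| block.call }

# Hypothetical helper: wraps a unit of work in the given reloader.
# With the pass-through reloader, the block runs once and its value is returned.
def run_with_reloader_sketch(reloader)
  reloader.call { :processed }
end

p run_with_reloader_sketch(DEFAULT_RELOADER_SKETCH)
```

A custom reloader would presumably keep the same shape (accept a block, call it) and add its own setup or teardown around the call.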

Instance Attribute Details

#active_job_queue_name_prefixing ⇒ Boolean

Returns:

  • (Boolean)

    whether to enable ActiveJob queue name prefixing

# File 'lib/shoryuken/options.rb', line 37

def active_job_queue_name_prefixing
  @active_job_queue_name_prefixing
end

#cache_visibility_timeout ⇒ Boolean

Returns:

  • (Boolean)

    whether to cache SQS visibility timeout

# File 'lib/shoryuken/options.rb', line 37

def cache_visibility_timeout
  @cache_visibility_timeout
end

#default_worker_options ⇒ Hash{String => Object}

Returns the default worker options hash

Returns:

  • (Hash{String => Object})

    the default worker options



# File 'lib/shoryuken/options.rb', line 235

def default_worker_options
  @default_worker_options ||= {
    'queue' => 'default',
    'delete' => false,
    'auto_delete' => false,
    'auto_visibility_timeout' => false,
    'retry_intervals' => nil,
    'batch' => false
  }
end

#enable_reloading ⇒ Boolean

Returns:

  • (Boolean)

    whether code reloading is enabled

# File 'lib/shoryuken/options.rb', line 37

def enable_reloading
  @enable_reloading
end

#exception_handlers ⇒ Array<#call>

Returns:

  • (Array<#call>)

    handlers for processing exceptions

# File 'lib/shoryuken/options.rb', line 37

def exception_handlers
  @exception_handlers
end

#groups ⇒ Hash{String => Hash}

Returns:

  • (Hash{String => Hash})

    the configured processing groups

# File 'lib/shoryuken/options.rb', line 37

def groups
  @groups
end

#launcher_executor ⇒ Object

Returns:

  • (Object)

    the executor used to launch workers

# File 'lib/shoryuken/options.rb', line 37

def launcher_executor
  @launcher_executor
end

#logger ⇒ Logger

Returns the logger instance

Returns:

  • (Logger)

    the logger



# File 'lib/shoryuken/options.rb', line 171

def logger
  @logger ||= Shoryuken::Logging.logger
end

#reloader ⇒ Proc

Returns:

  • (Proc)

    the code reloader proc for development environments

# File 'lib/shoryuken/options.rb', line 37

def reloader
  @reloader
end

#sqs_client ⇒ Aws::SQS::Client

Returns the SQS client, initializing a default one if needed. Uses AWS configuration from options if available.

Returns:

  • (Aws::SQS::Client)

    the SQS client



# File 'lib/shoryuken/options.rb', line 149

def sqs_client
  @sqs_client ||= Aws::SQS::Client.new(options[:aws])
end

#sqs_client_receive_message_opts ⇒ Hash

Returns options passed to SQS receive_message calls.

Returns:

  • (Hash)

    options passed to SQS receive_message calls



# File 'lib/shoryuken/options.rb', line 48

def sqs_client_receive_message_opts
  @sqs_client_receive_message_opts
end

#start_callback ⇒ Proc, nil

Returns:

  • (Proc, nil)

    callback to execute when server starts

# File 'lib/shoryuken/options.rb', line 37

def start_callback
  @start_callback
end

#stop_callback ⇒ Proc, nil

Returns:

  • (Proc, nil)

    callback to execute when server stops

# File 'lib/shoryuken/options.rb', line 37

def stop_callback
  @stop_callback
end

#thread_priority ⇒ Integer

Returns the thread priority setting

Returns:

  • (Integer)

    the thread priority



# File 'lib/shoryuken/options.rb', line 178

def thread_priority
  @thread_priority ||= options[:thread_priority]
end

#worker_executor ⇒ Class

Returns:

  • (Class)

    the executor class for running workers

# File 'lib/shoryuken/options.rb', line 37

def worker_executor
  @worker_executor
end

#worker_registry ⇒ Shoryuken::WorkerRegistry

Returns:

  • (Shoryuken::WorkerRegistry)

    the registry for worker classes

# File 'lib/shoryuken/options.rb', line 37

def worker_registry
  @worker_registry
end

Instance Method Details

#active_job? ⇒ Boolean

Checks if ActiveJob is available

Returns:

  • (Boolean)

    a truthy value if ::ActiveJob is defined, nil otherwise



# File 'lib/shoryuken/options.rb', line 67

def active_job?
  defined?(::ActiveJob)
end

#active_job_queue_name_prefixing? ⇒ Boolean

Checks if ActiveJob queue name prefixing is enabled

Returns:

  • (Boolean)

    true if prefixing is enabled



# File 'lib/shoryuken/options.rb', line 301

def active_job_queue_name_prefixing?
  @active_job_queue_name_prefixing
end

#add_group(group, concurrency = nil, delay: nil, polling_strategy: nil) ⇒ Hash

Adds a processing group with the specified concurrency and delay

Parameters:

  • group (String)

    the name of the group

  • concurrency (Integer, nil) (defaults to: nil)

    the number of concurrent workers for the group

  • delay (Float, nil) (defaults to: nil)

    the delay between polling cycles

  • polling_strategy (Class, String, nil) (defaults to: nil)

    the polling strategy class for the group

Returns:

  • (Hash)

    the group configuration

Raises:

  • (Errors::InvalidPollingStrategyError)

    if polling_strategy is not nil, a Class, or a String

# File 'lib/shoryuken/options.rb', line 79

def add_group(group, concurrency = nil, delay: nil, polling_strategy: nil)
  unless polling_strategy.nil? || polling_strategy.is_a?(Class) || polling_strategy.is_a?(String)
    raise Errors::InvalidPollingStrategyError, "#{polling_strategy} is not a valid polling_strategy"
  end

  concurrency ||= options[:concurrency]
  delay ||= options[:delay]

  groups[group] ||= {
    concurrency: concurrency,
    delay: delay,
    polling_strategy: polling_strategy,
    queues: []
  }
end
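
Two details of the listing above are easy to miss: the `polling_strategy` type check runs before anything is stored, and `||=` means the first configuration registered under a name wins. The sketch below reproduces that logic in a standalone class so it can be run without the gem. `GroupsSketch`, `GroupStrategySketchError`, and the defaults hash passed to the constructor are hypothetical stand-ins.

```ruby
# Standalone sketch of the add_group logic; does not require the shoryuken gem.
# GroupStrategySketchError stands in for Errors::InvalidPollingStrategyError.
class GroupStrategySketchError < StandardError; end

class GroupsSketch
  attr_reader :groups

  # defaults stands in for the global options hash (concurrency: 25, delay: 0.0).
  def initialize(defaults = { concurrency: 25, delay: 0.0 })
    @defaults = defaults
    @groups = {}
  end

  def add_group(group, concurrency = nil, delay: nil, polling_strategy: nil)
    # Only nil, a Class, or a String is accepted as a polling strategy.
    unless polling_strategy.nil? || polling_strategy.is_a?(Class) || polling_strategy.is_a?(String)
      raise GroupStrategySketchError, "#{polling_strategy} is not a valid polling_strategy"
    end

    # Missing values fall back to the global defaults.
    concurrency ||= @defaults[:concurrency]
    delay ||= @defaults[:delay]

    # ||= keeps the first configuration registered under a given name.
    groups[group] ||= {
      concurrency: concurrency,
      delay: delay,
      polling_strategy: polling_strategy,
      queues: []
    }
  end
end

sketch = GroupsSketch.new
sketch.add_group('default')     # stored with the default concurrency
sketch.add_group('default', 5)  # ignored: 'default' already exists
p sketch.groups['default'][:concurrency]
```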

#add_queue(queue, weight, group) ⇒ void

This method returns an undefined value.

Adds a queue to a processing group with the specified weight

Parameters:

  • queue (String)

    the name of the queue

  • weight (Integer)

    the weight (priority) of the queue

  • group (String)

    the name of the group to add the queue to



# File 'lib/shoryuken/options.rb', line 101

def add_queue(queue, weight, group)
  weight.times do
    groups[group][:queues] << queue
  end
end
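
As the listing shows, a weight is represented by repetition: the queue name is appended to the group's queue list once per unit of weight. A small standalone sketch of that expansion follows (the helper name `add_queue_sketch` and the sample queue names are hypothetical):

```ruby
# Standalone sketch; does not require the shoryuken gem.
# Appends the queue name `weight` times, mirroring the listing above.
def add_queue_sketch(groups, queue, weight, group)
  weight.times do
    groups[group][:queues] << queue
  end
end

groups = { 'default' => { queues: [] } }
add_queue_sketch(groups, 'high', 3, 'default')
add_queue_sketch(groups, 'low', 1, 'default')

p groups['default'][:queues] # prints ["high", "high", "high", "low"]
```

Because `groups[group]` is read directly, the group is expected to exist before a queue is added to it, for example via `add_group`.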

#cache_visibility_timeout? ⇒ Boolean

Checks if visibility timeout caching is enabled

Returns:

  • (Boolean)

    true if caching is enabled



# File 'lib/shoryuken/options.rb', line 294

def cache_visibility_timeout?
  @cache_visibility_timeout
end

#client_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain

Returns the client middleware chain

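Yields:

  • (Shoryuken::Middleware::Chain)

    the client middleware chain, when a block is given

Returns:

  • (Shoryuken::Middleware::Chain)

    the client middleware chain
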
# File 'lib/shoryuken/options.rb', line 226

def client_middleware
  @_client_chain ||= default_client_middleware
  yield @_client_chain if block_given?
  @_client_chain
end

#configure_client {|Shoryuken::Options| ... } ⇒ void

This method returns an undefined value.

Yields self unless running as a server for client-specific configuration

Yields:

  • (Shoryuken::Options)

    the options instance (self), only when not running as a server

# File 'lib/shoryuken/options.rb', line 218

def configure_client
  yield self unless server?
end

#configure_server {|Shoryuken::Options| ... } ⇒ void

This method returns an undefined value.

Yields self if running as a server for server-specific configuration

Yields:

  • (Shoryuken::Options)

    the options instance (self), only when running as a server

# File 'lib/shoryuken/options.rb', line 200

def configure_server
  yield self if server?
end

#delay(group) ⇒ Float

Returns the polling delay for a group

Parameters:

  • group (String)

    the name of the group

Returns:

  • (Float)

    the delay in seconds



# File 'lib/shoryuken/options.rb', line 141

def delay(group)
  groups[group].to_h.fetch(:delay, options[:delay]).to_f
end
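
The one-liner above relies on `nil.to_h` returning an empty hash, so an unknown group falls through to the global delay, and `to_f` normalizes the result. A standalone sketch of that fallback chain (the helper name `delay_sketch` and the sample data are hypothetical):

```ruby
# Standalone sketch; does not require the shoryuken gem.
# groups and options are passed in explicitly instead of being read from self.
def delay_sketch(groups, options, group)
  groups[group].to_h.fetch(:delay, options[:delay]).to_f
end

groups = { 'slow' => { delay: 5 } }
options = { delay: 0.0 }

p delay_sketch(groups, options, 'slow')    # group-level delay, as a Float: 5.0
p delay_sketch(groups, options, 'missing') # unknown group, global delay: 0.0
```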

#on(event, &block) { ... } ⇒ void

This method returns an undefined value.

Registers a block to run at a point in the Shoryuken lifecycle.

Examples:

Shoryuken.configure_server do |config|
  config.on(:shutdown) do
    puts "Goodbye cruel world!"
  end
end

Parameters:

  • event (Symbol)

    the lifecycle event (:startup, :dispatch, :utilization_update, :quiet, :shutdown, or :stopped)

  • block (Proc)

    the block to execute for the event

Yields:

  • the block to execute for the event

Raises:

  • (Errors::InvalidEventError)

    if event is not a Symbol or not a valid event name

# File 'lib/shoryuken/options.rb', line 277

def on(event, &block)
  raise Errors::InvalidEventError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
  raise Errors::InvalidEventError, "Invalid event name: #{event}" unless options[:lifecycle_events].key?(event)

  options[:lifecycle_events][event] << block
end
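
The validation above can be tried in isolation with the standalone sketch below. `LifecycleSketch` and `LifecycleEventSketchError` are hypothetical stand-ins, and the event list is copied from `DEFAULTS`. How the stored blocks are eventually invoked is not part of this listing, so the final `each(&:call)` is only an assumption for illustration.

```ruby
# Standalone sketch; does not require the shoryuken gem.
# LifecycleEventSketchError stands in for Errors::InvalidEventError.
class LifecycleEventSketchError < StandardError; end

class LifecycleSketch
  EVENTS = %i[startup dispatch utilization_update quiet shutdown stopped].freeze

  attr_reader :lifecycle_events

  def initialize
    # One empty handler list per known event, like DEFAULTS[:lifecycle_events].
    @lifecycle_events = EVENTS.map { |event| [event, []] }.to_h
  end

  def on(event, &block)
    raise LifecycleEventSketchError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
    raise LifecycleEventSketchError, "Invalid event name: #{event}" unless lifecycle_events.key?(event)

    lifecycle_events[event] << block
  end
end

hooks = LifecycleSketch.new
hooks.on(:shutdown) { puts 'Goodbye cruel world!' }

# Assumption: firing an event means calling each registered block in order.
hooks.lifecycle_events[:shutdown].each(&:call)
```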

#on_start(&block) { ... } ⇒ void

This method returns an undefined value.

Registers a callback to run when the server starts

Parameters:

  • block (Proc)

    the block to execute on start

Yields:

  • the block to execute on start



# File 'lib/shoryuken/options.rb', line 251

def on_start(&block)
  self.start_callback = block
end

#on_stop(&block) { ... } ⇒ void

This method returns an undefined value.

Registers a callback to run when the server stops

Parameters:

  • block (Proc)

    the block to execute on stop

Yields:

  • the block to execute on stop



# File 'lib/shoryuken/options.rb', line 260

def on_stop(&block)
  self.stop_callback = block
end

#options ⇒ Hash

Returns the global options hash

Returns:

  • (Hash)

    the options hash



# File 'lib/shoryuken/options.rb', line 164

def options
  @options ||= DEFAULTS.dup
end
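
`DEFAULTS` is frozen, and `dup` returns an unfrozen but shallow copy. Top-level keys can therefore be reassigned on the options hash without touching `DEFAULTS`, while nested objects are shared between the two. A standalone sketch of plain Ruby `Hash#dup` semantics, using a hypothetical, trimmed-down `SKETCH_DEFAULTS`:

```ruby
# Standalone sketch; does not require the shoryuken gem.
# SKETCH_DEFAULTS is a hypothetical, shortened stand-in for DEFAULTS.
SKETCH_DEFAULTS = {
  concurrency: 25,
  lifecycle_events: { startup: [] }
}.freeze

# Mirrors `DEFAULTS.dup` from the listing above.
def sketch_options
  SKETCH_DEFAULTS.dup
end

options = sketch_options
options[:concurrency] = 10 # allowed: the copy is not frozen

p options.frozen?               # false
p SKETCH_DEFAULTS[:concurrency] # 25, the original is unchanged
# The nested hash is the same object in both, because dup is shallow.
p options[:lifecycle_events].equal?(SKETCH_DEFAULTS[:lifecycle_events]) # true
```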

#polling_strategy(group) ⇒ Class

Returns the polling strategy class for a group

Parameters:

  • group (String)

    the name of the group

Returns:

  • (Class)

    the polling strategy class to use



# File 'lib/shoryuken/options.rb', line 118

def polling_strategy(group)
  strategy = groups[group].to_h[:polling_strategy] ||
             (group == 'default' ? options : options[:groups].to_h[group]).to_h[:polling_strategy]
  case strategy
  when 'WeightedRoundRobin', nil # Default case
    Polling::WeightedRoundRobin
  when 'StrictPriority'
    Polling::StrictPriority
  when String
    begin
      Object.const_get(strategy)
    rescue NameError
      raise Errors::InvalidPollingStrategyError, "#{strategy} is not a valid polling_strategy"
    end
  when Class
    strategy
  end
end
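
The `case` expression above maps a configured value to a strategy class. The standalone sketch below reproduces only that mapping, leaving out the group and options lookup. The `PollingSketch` module, `CustomStrategySketch`, and `UnknownStrategySketchError` are hypothetical stand-ins for `Polling`, a user-defined strategy, and `Errors::InvalidPollingStrategyError`.

```ruby
# Standalone sketch; does not require the shoryuken gem.
module PollingSketch
  class WeightedRoundRobin; end
  class StrictPriority; end
end

class CustomStrategySketch; end
class UnknownStrategySketchError < StandardError; end

def resolve_strategy_sketch(strategy)
  case strategy
  when 'WeightedRoundRobin', nil # default case
    PollingSketch::WeightedRoundRobin
  when 'StrictPriority'
    PollingSketch::StrictPriority
  when String
    # Any other string is treated as a constant name to look up.
    begin
      Object.const_get(strategy)
    rescue NameError
      raise UnknownStrategySketchError, "#{strategy} is not a valid polling_strategy"
    end
  when Class
    strategy
  end
end

p resolve_strategy_sketch(nil)
p resolve_strategy_sketch('StrictPriority')
p resolve_strategy_sketch('CustomStrategySketch')
```

A value that matches none of the branches, such as a Symbol, makes the `case` expression return `nil`.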

#register_worker(*args) ⇒ void

This method returns an undefined value.

Registers a worker class with the worker registry

Parameters:

  • args (Array)

    arguments to pass to the registry



# File 'lib/shoryuken/options.rb', line 192

def register_worker(*args)
  worker_registry.register_worker(*args)
end

#server? ⇒ Boolean

Checks if running as a server (CLI mode)

Returns:

  • (Boolean)

    a truthy value if Shoryuken::CLI is defined, nil otherwise



# File 'lib/shoryuken/options.rb', line 287

def server?
  defined?(Shoryuken::CLI)
end
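
Since the check is a `defined?` expression, its result depends on whether the constant has been loaded at the moment of the call, and the value is either `nil` or a truthy description string rather than a strict `true`/`false`. A standalone sketch using a hypothetical `ServerSketch` namespace in place of `Shoryuken`:

```ruby
# Standalone sketch; does not require the shoryuken gem.
module ServerSketch; end

# Mirrors `defined?(Shoryuken::CLI)` with a stand-in namespace.
def server_sketch?
  defined?(ServerSketch::CLI)
end

# Gate a block on the check, in the style of configure_server.
def configure_server_sketch
  yield :config if server_sketch?
end

BEFORE_CLI_SKETCH = server_sketch? # the constant does not exist yet

module ServerSketch
  class CLI; end # defining the constant flips the check
end

AFTER_CLI_SKETCH = server_sketch?

p BEFORE_CLI_SKETCH # nil
p AFTER_CLI_SKETCH ? :server_mode : :client_mode
```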

#server_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain

Returns the server middleware chain

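Yields:

  • (Shoryuken::Middleware::Chain)

    the server middleware chain, when a block is given

Returns:

  • (Shoryuken::Middleware::Chain)

    the server middleware chain
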
# File 'lib/shoryuken/options.rb', line 208

def server_middleware
  @_server_chain ||= default_server_middleware
  yield @_server_chain if block_given?
  @_server_chain
end
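
The method memoizes the chain on first use and yields the same object on every call, so configuration blocks accumulate. The standalone sketch below shows that pattern with a plain Array standing in for the chain. `ChainHolderSketch` is hypothetical, and the real default chain and its API are not shown in this listing.

```ruby
# Standalone sketch; does not require the shoryuken gem.
class ChainHolderSketch
  def server_middleware
    @_server_chain ||= default_server_middleware # built once, then reused
    yield @_server_chain if block_given?
    @_server_chain
  end

  private

  # Stand-in: an empty Array instead of a real middleware chain.
  def default_server_middleware
    []
  end
end

holder = ChainHolderSketch.new
holder.server_middleware { |chain| chain << :timing }
holder.server_middleware { |chain| chain << :logging }

p holder.server_middleware # prints [:timing, :logging]
```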

#ungrouped_queues ⇒ Array<String>

Returns all queues from all groups

Returns:

  • (Array<String>)

    flat array of all queue names



# File 'lib/shoryuken/options.rb', line 110

def ungrouped_queues
  groups.values.flat_map { |options| options[:queues] }
end