Module: SidekiqBus

Defined in:
lib/sidekiq-bus.rb,
lib/sidekiq_bus/server.rb,
lib/sidekiq_bus/version.rb,
lib/sidekiq_bus/middleware/retry.rb

Defined Under Namespace

Modules: Middleware, Server

Constant Summary

ConfigurationError =
Class.new(StandardError)
REDIS_HANDLER_ERROR_MESSAGE =
'Please set SidekiqBus.redis_handler to a Callable that accepts a block and yields a '\
'Redis instance. See the SidekiqBus README for more details.'
VERSION =
'3.0.0'

Class Method Summary

Class Method Details

.generate_weighted_queues(overrides: {}, default: 1) ⇒ Object

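Analyzes the current queues and generates an array that can be used as the Sidekiq queues configuration. The result follows the way the Sidekiq CLI builds weighted queues: a queue with weight N appears N times in the array. Weights below 1 are raised to 1, and the array is ordered from greatest to least weight, then by queue name.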

Parameters:

  • overrides (Hash<String, Integer>) (defaults to: {})

    A mapping of queue names to weights. Every queue named here is included in the result, even if nothing else refers to it

  • default (Integer) (defaults to: 1)

    The weight applied to any queue that has no entry in overrides



# File 'lib/sidekiq-bus.rb', line 22

def self.generate_weighted_queues(overrides: {}, default: 1)
  # Gathers all queues and overrides as strictly strings
  queues = Set.new(QueueBus::TaskManager.new(false).queue_names.map(&:to_s))
  overrides = overrides.each_with_object({}) { |(q, w), h| h[q.to_s] = w }
  overrides.default = default

  # Always include the incoming bus queue so this worker also helps drive the bus.
  queues << 'bus_incoming'

  # Make sure every queue from the overrides is included
  queues += overrides.keys

  entry = Struct.new(:queue, :weight)

  # Map all queue names to their weights and returns them as entries
  entries = queues.map { |q| entry.new(q, [1, overrides[q]].max) }

  # Sorts by weight to provide a visual indication of queue order in sidekiq
  # UI. Otherwise they can appear in various orders. They will be sorted
  # from greatest to least weight. The negative sign on the weight is key to
  # making this work.
  entries = entries.sort_by { |e| [-e.weight, e.queue] }

  # Creates an array of N length with the same queue name (N=weight) then
  # flattened into a single array
  entries.flat_map { |e| Array.new(e.weight, e.queue) }
end
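
The weighting steps above can be tried in isolation. The following is a minimal sketch, not the gem's code: `SUBSCRIBED_QUEUES` and `weighted_queues_sketch` are hypothetical names, and the hard-coded queue list stands in for `QueueBus::TaskManager.new(false).queue_names`, so the sketch runs without the gem installed.

```ruby
require 'set'

# Hypothetical stand-in for QueueBus::TaskManager.new(false).queue_names;
# these queue names are made up for illustration.
SUBSCRIBED_QUEUES = %w[emails reports].freeze

# Mirrors the steps of generate_weighted_queues on the stand-in list.
def weighted_queues_sketch(overrides: {}, default: 1)
  queues = Set.new(SUBSCRIBED_QUEUES)

  # Normalize override keys to strings and fall back to the default weight.
  weights = overrides.each_with_object({}) { |(q, w), h| h[q.to_s] = w }
  weights.default = default

  queues << 'bus_incoming'
  queues += weights.keys

  queues
    .map { |q| [q, [1, weights[q]].max] }  # weights never drop below 1
    .sort_by { |q, w| [-w, q] }            # heaviest first, then by name
    .flat_map { |q, w| Array.new(w, q) }   # repeat each name `weight` times
end

p weighted_queues_sketch(overrides: { emails: 3, critical: 2 })
# => ["emails", "emails", "emails", "critical", "critical", "bus_incoming", "reports"]
```

Note that `critical` appears in the output although it is not in the stand-in list, because every override key is added to the set of queues.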

.redis(&block) ⇒ Object

Raises:

  • (ConfigurationError)

    If no redis handler has been set



# File 'lib/sidekiq-bus.rb', line 57

def self.redis(&block)
  raise ConfigurationError, REDIS_HANDLER_ERROR_MESSAGE unless @redis_handler
  @redis_handler.call(&block)
end

.redis_handler=(handler) ⇒ Object



# File 'lib/sidekiq-bus.rb', line 50

def self.redis_handler=(handler)
  unless handler.respond_to?(:call)
    raise ConfigurationError, REDIS_HANDLER_ERROR_MESSAGE
  end
  @redis_handler = handler
end
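
The handler contract used by `.redis` and `.redis_handler=` (a callable that accepts a block and yields a Redis instance) can be illustrated with a small sketch. Everything named here is an assumption made for illustration: `SidekiqBusSketch` copies the two methods shown above so the example runs without the gem, and `FakeRedis` is a hypothetical in-memory object standing in for a real Redis client.

```ruby
# Stand-in that copies the two methods shown above; in an application you
# would call SidekiqBus directly instead.
module SidekiqBusSketch
  ConfigurationError = Class.new(StandardError)

  def self.redis_handler=(handler)
    raise ConfigurationError, 'handler must respond to #call' unless handler.respond_to?(:call)
    @redis_handler = handler
  end

  def self.redis(&block)
    raise ConfigurationError, 'no redis handler set' unless @redis_handler
    @redis_handler.call(&block)
  end
end

# Hypothetical in-memory fake; a real handler would yield a Redis client,
# for example ->(&block) { Sidekiq.redis(&block) } (an assumption about the
# host application, not something this page specifies).
class FakeRedis
  def initialize
    @store = {}
  end

  def set(key, value)
    @store[key] = value
    'OK'
  end

  def get(key)
    @store[key]
  end
end

FAKE_CONNECTION = FakeRedis.new

# The handler is any callable that takes a block and yields a connection.
SidekiqBusSketch.redis_handler = ->(&block) { block.call(FAKE_CONNECTION) }

SidekiqBusSketch.redis { |conn| conn.set('greeting', 'hello') }
value = SidekiqBusSketch.redis { |conn| conn.get('greeting') }
puts value
```

Assigning something that does not respond to `call`, such as a plain string, raises `ConfigurationError` at assignment time, so a misconfiguration shows up before the first Redis call.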

.validate_redis_handler ⇒ Object



# File 'lib/sidekiq-bus.rb', line 62

def self.validate_redis_handler
end