Class: Lepus::Consumers::Config

Inherits:
Object
  • Object
show all
Defined in:
lib/lepus/consumers/config.rb

Overview

Parse the list of options for the consumer.

Constant Summary collapse

DEFAULT_EXCHANGE_OPTIONS =
{
  name: nil,
  type: :topic, # The type of the exchange (:direct, :fanout, :topic or :headers).
  durable: true
}.freeze
DEFAULT_CHANNEL_OPTIONS =
{
  pool_size: 1,
  abort_on_exception: false,
  shutdown_timeout: 60
}.freeze
DEFAULT_QUEUE_OPTIONS =
{
  name: nil,
  durable: true
}.freeze
DEFAULT_PREFETCH_COUNT =
1
DEFAULT_WORKER_OPTIONS =
{
  name: "default",
  threads: 1
}.freeze
DEFAULT_RETRY_QUEUE_OPTIONS =
{
  name: nil,
  durable: true,
  delay: 5000,
  arguments: {}
}
DEFAULT_ERROR_QUEUE_OPTIONS =
DEFAULT_QUEUE_OPTIONS

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Config

Returns a new instance of Config.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/lepus/consumers/config.rb', line 42

def initialize(options = {})
  opts = Lepus::Primitive::Hash.new(options).deep_symbolize_keys

  @worker_opts = DEFAULT_WORKER_OPTIONS.merge(
    declaration_config(opts.delete(:worker))
  )
  @exchange_opts = DEFAULT_EXCHANGE_OPTIONS.merge(
    declaration_config(opts.delete(:exchange))
  )
  @queue_opts = DEFAULT_QUEUE_OPTIONS.merge(
    declaration_config(opts.delete(:queue))
  )
  if (value = opts.delete(:retry_queue))
    @retry_queue_opts = DEFAULT_RETRY_QUEUE_OPTIONS.merge(
      declaration_config(value)
    )
  end
  if (value = opts.delete(:error_queue))
    @error_queue_opts = DEFAULT_ERROR_QUEUE_OPTIONS.merge(
      declaration_config(value)
    )
  end
  @channel_opts = DEFAULT_CHANNEL_OPTIONS.merge(opts.delete(:channel) || {})
  @bind_opts = opts.delete(:bind) || {}
  if (routing_key = opts.delete(:routing_key))
    @bind_opts[:routing_key] ||= routing_key
  end
  @prefetch_count = opts.key?(:prefetch) ? opts.delete(:prefetch) : DEFAULT_PREFETCH_COUNT
  @options = opts
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



40
41
42
# File 'lib/lepus/consumers/config.rb', line 40

def options
  @options
end

#prefetch_countObject (readonly)

Returns the value of attribute prefetch_count.



40
41
42
# File 'lib/lepus/consumers/config.rb', line 40

def prefetch_count
  @prefetch_count
end

Instance Method Details

#binds_argsObject



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/lepus/consumers/config.rb', line 123

def binds_args
  arguments = @bind_opts.fetch(:arguments, {}).transform_keys(&:to_s)
  opts = {}
  opts[:arguments] = arguments unless arguments.empty?
  if (routing_keys = @bind_opts[:routing_key]).is_a?(Array)
    routing_keys.map { |key| opts.merge(routing_key: key) }
  elsif (routing_key = @bind_opts[:routing_key])
    [opts.merge(routing_key: routing_key)]
  elsif @exchange_opts[:type] == :topic
    [opts.merge(routing_key: "#")]
  else
    [opts]
  end
end

#channel_argsObject



73
74
75
76
77
78
79
80
81
82
# File 'lib/lepus/consumers/config.rb', line 73

def channel_args
  [
    nil,
    *@channel_opts.values_at(
      :pool_size,
      :abort_on_exception,
      :shutdown_timeout
    )
  ]
end

#consumer_queue_argsObject



92
93
94
95
96
97
98
99
100
101
# File 'lib/lepus/consumers/config.rb', line 92

def consumer_queue_args
  opts = @queue_opts.reject { |k, v| k == :name }
  return [queue_name, opts] unless retry_queue_args

  opts[:arguments] ||= {}
  opts[:arguments]["x-dead-letter-exchange"] = ""
  opts[:arguments]["x-dead-letter-routing-key"] = retry_queue_name

  [queue_name, opts]
end

#error_queue_argsObject



117
118
119
120
121
# File 'lib/lepus/consumers/config.rb', line 117

def error_queue_args
  return unless @error_queue_opts

  [error_queue_name, @error_queue_opts.reject { |k, v| k == :name }]
end

#error_queue_nameObject



160
161
162
163
164
# File 'lib/lepus/consumers/config.rb', line 160

def error_queue_name
  name = @error_queue_opts[:name]
  name ||= "#{queue_name}.error"
  name
end

#exchange_nameObject



84
85
86
# File 'lib/lepus/consumers/config.rb', line 84

def exchange_name
  @exchange_opts[:name] || raise(InvalidConsumerConfigError, "Exchange name is required")
end

#exchange_optionsObject



88
89
90
# File 'lib/lepus/consumers/config.rb', line 88

def exchange_options
  @exchange_opts.reject { |k, v| k == :name }
end

#queue_nameObject



150
151
152
# File 'lib/lepus/consumers/config.rb', line 150

def queue_name
  @queue_opts[:name] || raise(InvalidConsumerConfigError, "Queue name is required")
end

#retry_queue_argsObject



103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/lepus/consumers/config.rb', line 103

def retry_queue_args
  return unless @retry_queue_opts

  delay = @retry_queue_opts[:delay]
  args = (@retry_queue_opts[:arguments] || {}).merge(
    "x-dead-letter-exchange" => "",
    "x-dead-letter-routing-key" => queue_name,
    "x-message-ttl" => delay
  )
  extra_keys = %i[name delay]
  opts = @retry_queue_opts.reject { |k, v| extra_keys.include?(k) }
  [retry_queue_name, opts.merge(arguments: args)]
end

#retry_queue_nameObject



154
155
156
157
158
# File 'lib/lepus/consumers/config.rb', line 154

def retry_queue_name
  name = @retry_queue_opts[:name]
  name ||= "#{queue_name}.retry"
  name
end

#worker_nameObject



138
139
140
# File 'lib/lepus/consumers/config.rb', line 138

def worker_name
  @worker_opts.fetch(:name, DEFAULT_WORKER_OPTIONS[:name])
end

#worker_threadsObject



142
143
144
145
146
147
148
# File 'lib/lepus/consumers/config.rb', line 142

def worker_threads
  threads = @worker_opts.fetch(:threads, DEFAULT_WORKER_OPTIONS[:threads])
  if threads.to_i < 1
    raise InvalidConsumerConfigError, "Worker threads must be at least 1"
  end
  threads
end