Class: Lepus::Consumers::Config
- Inherits:
-
Object
- Object
- Lepus::Consumers::Config
- Defined in:
- lib/lepus/consumers/config.rb
Overview
Parse the list of options for the consumer.
Constant Summary collapse
- DEFAULT_EXCHANGE_OPTIONS =
{ name: nil, type: :topic, # The type of the exchange (:direct, :fanout, :topic or :headers). durable: true }.freeze
- DEFAULT_CHANNEL_OPTIONS =
{ pool_size: 1, abort_on_exception: false, shutdown_timeout: 60 }.freeze
- DEFAULT_QUEUE_OPTIONS =
{ name: nil, durable: true }.freeze
- DEFAULT_PREFETCH_COUNT =
1- DEFAULT_WORKER_OPTIONS =
{ name: "default", threads: 1 }.freeze
- DEFAULT_RETRY_QUEUE_OPTIONS =
{ name: nil, durable: true, delay: 5000, arguments: {} }
- DEFAULT_ERROR_QUEUE_OPTIONS =
DEFAULT_QUEUE_OPTIONS
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#prefetch_count ⇒ Object
readonly
Returns the value of attribute prefetch_count.
Instance Method Summary collapse
- #binds_args ⇒ Object
- #channel_args ⇒ Object
- #consumer_queue_args ⇒ Object
- #error_queue_args ⇒ Object
- #error_queue_name ⇒ Object
- #exchange_name ⇒ Object
- #exchange_options ⇒ Object
-
#initialize(options = {}) ⇒ Config
constructor
A new instance of Config.
- #queue_name ⇒ Object
- #retry_queue_args ⇒ Object
- #retry_queue_name ⇒ Object
- #worker_name ⇒ Object
- #worker_threads ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Config
Returns a new instance of Config.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/lepus/consumers/config.rb', line 42 def initialize( = {}) opts = Lepus::Primitive::Hash.new().deep_symbolize_keys @worker_opts = DEFAULT_WORKER_OPTIONS.merge( declaration_config(opts.delete(:worker)) ) @exchange_opts = DEFAULT_EXCHANGE_OPTIONS.merge( declaration_config(opts.delete(:exchange)) ) @queue_opts = DEFAULT_QUEUE_OPTIONS.merge( declaration_config(opts.delete(:queue)) ) if (value = opts.delete(:retry_queue)) @retry_queue_opts = DEFAULT_RETRY_QUEUE_OPTIONS.merge( declaration_config(value) ) end if (value = opts.delete(:error_queue)) @error_queue_opts = DEFAULT_ERROR_QUEUE_OPTIONS.merge( declaration_config(value) ) end @channel_opts = DEFAULT_CHANNEL_OPTIONS.merge(opts.delete(:channel) || {}) @bind_opts = opts.delete(:bind) || {} if (routing_key = opts.delete(:routing_key)) @bind_opts[:routing_key] ||= routing_key end @prefetch_count = opts.key?(:prefetch) ? opts.delete(:prefetch) : DEFAULT_PREFETCH_COUNT @options = opts end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
40 41 42 |
# File 'lib/lepus/consumers/config.rb', line 40 def @options end |
#prefetch_count ⇒ Object (readonly)
Returns the value of attribute prefetch_count.
40 41 42 |
# File 'lib/lepus/consumers/config.rb', line 40 def prefetch_count @prefetch_count end |
Instance Method Details
#binds_args ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/lepus/consumers/config.rb', line 123 def binds_args arguments = @bind_opts.fetch(:arguments, {}).transform_keys(&:to_s) opts = {} opts[:arguments] = arguments unless arguments.empty? if (routing_keys = @bind_opts[:routing_key]).is_a?(Array) routing_keys.map { |key| opts.merge(routing_key: key) } elsif (routing_key = @bind_opts[:routing_key]) [opts.merge(routing_key: routing_key)] elsif @exchange_opts[:type] == :topic [opts.merge(routing_key: "#")] else [opts] end end |
#channel_args ⇒ Object
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/lepus/consumers/config.rb', line 73 def channel_args [ nil, *@channel_opts.values_at( :pool_size, :abort_on_exception, :shutdown_timeout ) ] end |
#consumer_queue_args ⇒ Object
92 93 94 95 96 97 98 99 100 101 |
# File 'lib/lepus/consumers/config.rb', line 92 def consumer_queue_args opts = @queue_opts.reject { |k, v| k == :name } return [queue_name, opts] unless retry_queue_args opts[:arguments] ||= {} opts[:arguments]["x-dead-letter-exchange"] = "" opts[:arguments]["x-dead-letter-routing-key"] = retry_queue_name [queue_name, opts] end |
#error_queue_args ⇒ Object
117 118 119 120 121 |
# File 'lib/lepus/consumers/config.rb', line 117 def error_queue_args return unless @error_queue_opts [error_queue_name, @error_queue_opts.reject { |k, v| k == :name }] end |
#error_queue_name ⇒ Object
160 161 162 163 164 |
# File 'lib/lepus/consumers/config.rb', line 160 def error_queue_name name = @error_queue_opts[:name] name ||= "#{queue_name}.error" name end |
#exchange_name ⇒ Object
84 85 86 |
# File 'lib/lepus/consumers/config.rb', line 84 def exchange_name @exchange_opts[:name] || raise(InvalidConsumerConfigError, "Exchange name is required") end |
#exchange_options ⇒ Object
88 89 90 |
# File 'lib/lepus/consumers/config.rb', line 88 def @exchange_opts.reject { |k, v| k == :name } end |
#queue_name ⇒ Object
150 151 152 |
# File 'lib/lepus/consumers/config.rb', line 150 def queue_name @queue_opts[:name] || raise(InvalidConsumerConfigError, "Queue name is required") end |
#retry_queue_args ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/lepus/consumers/config.rb', line 103 def retry_queue_args return unless @retry_queue_opts delay = @retry_queue_opts[:delay] args = (@retry_queue_opts[:arguments] || {}).merge( "x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => queue_name, "x-message-ttl" => delay ) extra_keys = %i[name delay] opts = @retry_queue_opts.reject { |k, v| extra_keys.include?(k) } [retry_queue_name, opts.merge(arguments: args)] end |
#retry_queue_name ⇒ Object
154 155 156 157 158 |
# File 'lib/lepus/consumers/config.rb', line 154 def retry_queue_name name = @retry_queue_opts[:name] name ||= "#{queue_name}.retry" name end |
#worker_name ⇒ Object
138 139 140 |
# File 'lib/lepus/consumers/config.rb', line 138 def worker_name @worker_opts.fetch(:name, DEFAULT_WORKER_OPTIONS[:name]) end |
#worker_threads ⇒ Object
142 143 144 145 146 147 148 |
# File 'lib/lepus/consumers/config.rb', line 142 def worker_threads threads = @worker_opts.fetch(:threads, DEFAULT_WORKER_OPTIONS[:threads]) if threads.to_i < 1 raise InvalidConsumerConfigError, "Worker threads must be at least 1" end threads end |