Class: Legion::Transport::Queue

Inherits:
Queue
  • Object
show all
Includes:
Common
Defined in:
lib/legion/transport/queue.rb

Instance Method Summary collapse

Methods included from Common

#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(queue = queue_name, options = {}) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/legion/transport/queue.rb', line 8

def initialize(queue = queue_name, options = {})
  retries ||= 0
  @options = options
  merged = options_builder(default_options, queue_options, options)
  ensure_dlx(merged)
  super(channel, queue, merged)
rescue Legion::Transport::CONNECTOR::PreconditionFailed
  retries.zero? ? retries = 1 : raise
  recreate_queue(queue)
  @channel = Legion::Transport::Connection.channel
  retry
end

Instance Method Details

#acknowledge(delivery_tag) ⇒ Object



71
72
73
# File 'lib/legion/transport/queue.rb', line 71

def acknowledge(delivery_tag)
  channel.acknowledge(delivery_tag)
end

#default_optionsObject



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/transport/queue.rb', line 27

def default_options
  hash = Concurrent::Hash.new
  hash[:manual_ack] = true
  hash[:durable] = true
  hash[:exclusive] = false
  hash[:block] = false
  hash[:auto_delete] = false
  hash[:arguments] = {
    'x-queue-type':           'quorum',
    'x-dead-letter-exchange': "#{self.class.ancestors.first.to_s.split('::')[2].downcase}.dlx"
  }
  hash
end

#delete(options = {}) ⇒ Object



64
65
66
67
68
69
# File 'lib/legion/transport/queue.rb', line 64

def delete(options = {})
  super
  true
rescue Legion::Transport::CONNECTOR::PreconditionFailed
  false
end

#ensure_dlx(merged_options) ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/legion/transport/queue.rb', line 45

def ensure_dlx(merged_options)
  dlx_name = merged_options.dig(:arguments, :'x-dead-letter-exchange')
  return if dlx_name.nil? || dlx_name.empty?

  channel.exchange_declare(dlx_name, 'fanout', durable: true, auto_delete: false)
rescue StandardError => e
  Legion::Transport.logger.warn "Failed to declare DLX #{dlx_name}: #{e.message}"
end

#queue_nameObject



54
55
56
57
58
59
60
61
62
# File 'lib/legion/transport/queue.rb', line 54

def queue_name
  ancestor = self.class.ancestors.first.to_s.split('::')
  name = if ancestor[5].scan(/[A-Z]/).length > 1
           ancestor[5].gsub!(/(.)([A-Z])/, '\1_\2').downcase!
         else
           ancestor[5].downcase!
         end
  "#{ancestor[2].downcase}.#{name}"
end

#queue_optionsObject



41
42
43
# File 'lib/legion/transport/queue.rb', line 41

def queue_options
  Concurrent::Hash.new
end

#recreate_queue(queue) ⇒ Object



21
22
23
24
25
# File 'lib/legion/transport/queue.rb', line 21

def recreate_queue(queue)
  Legion::Transport.logger.warn "Queue:#{queue} exists with wrong parameters, deleting and creating"
  queue = ::Bunny::Queue.new(Legion::Transport::Connection.channel, queue, no_declare: true, passive: true)
  queue.delete
end

#reject(delivery_tag, requeue: false) ⇒ Object



75
76
77
# File 'lib/legion/transport/queue.rb', line 75

def reject(delivery_tag, requeue: false)
  channel.reject(delivery_tag, requeue)
end