Class: Legion::Transport::Queue
- Inherits:
-
Queue
- Object
- Queue
- Legion::Transport::Queue
show all
- Includes:
- Common
- Defined in:
- lib/legion/transport/queue.rb
Instance Method Summary
collapse
Methods included from Common
#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder
Constructor Details
#initialize(queue = queue_name, options = {}) ⇒ Queue
Returns a new instance of Queue.
8
9
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/legion/transport/queue.rb', line 8
# Declares the queue, deleting and re-declaring it once if the broker
# rejects the declaration with a precondition failure.
#
# The declaration options come from passing +default_options+,
# +queue_options+ and the caller's +options+ to +options_builder+.
# +ensure_dlx+ is invoked with the merged result before the queue is
# declared through +super+.
#
# @param queue [String] queue name; defaults to the value of +queue_name+
# @param options [Hash] caller-supplied options, also stored in +@options+
# @raise [Legion::Transport::CONNECTOR::PreconditionFailed] when the
#   declaration is rejected again after the single recovery attempt
def initialize(queue = queue_name, options = {})
  # `||=` rather than `=`: `retry` re-runs this body from the top and the
  # counter must keep its value across that restart.
  attempts ||= 0
  @options = options
  declare_opts = options_builder(default_options, queue_options, options)
  ensure_dlx(declare_opts)
  super(channel, queue, declare_opts)
rescue Legion::Transport::CONNECTOR::PreconditionFailed
  # Only one recovery attempt; a second failure propagates to the caller.
  raise unless attempts.zero?

  attempts = 1
  recreate_queue(queue)
  # Swap in a channel from the connection before declaring again.
  @channel = Legion::Transport::Connection.channel
  retry
end
|
Instance Method Details
#acknowledge(delivery_tag) ⇒ Object
72
73
74
|
# File 'lib/legion/transport/queue.rb', line 72
# Acknowledges a delivery by forwarding its tag to this queue's channel.
#
# @param delivery_tag [Object] tag handed unchanged to +channel.acknowledge+
# @return [Object] whatever +channel.acknowledge+ returns
def acknowledge(delivery_tag)
  ch = channel
  ch.acknowledge(delivery_tag)
end
|
#default_options ⇒ Object
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/legion/transport/queue.rb', line 27
# Builds the baseline declaration options for a queue.
#
# The dead-letter exchange name is the third +::+-separated segment of the
# class name (as reported by +self.class.ancestors.first+), lower-cased,
# followed by +.dlx+.
#
# @return [Concurrent::Hash] a new hash on every call
def default_options
  defaults = Concurrent::Hash.new
  dlx_prefix = self.class.ancestors.first.to_s.split('::')[2].downcase
  defaults.merge!(
    manual_ack: true,
    durable: true,
    exclusive: false,
    block: false,
    auto_delete: false,
    arguments: {
      'x-queue-type': 'quorum',
      'x-dead-letter-exchange': "#{dlx_prefix}.dlx"
    }
  )
end
|
#delete(options = {}) ⇒ Object
64
65
66
67
68
69
70
|
# File 'lib/legion/transport/queue.rb', line 64
# Deletes the queue via the parent implementation and reports the outcome
# as a boolean instead of raising on a precondition failure.
#
# @param options [Hash] passed through to the parent +delete+
# @return [Boolean] +true+ when the parent call completed, +false+ when it
#   raised +Legion::Transport::CONNECTOR::PreconditionFailed+
def delete(options = {})
  super(options)
  true
rescue Legion::Transport::CONNECTOR::PreconditionFailed => error
  # Logging is optional: only emit the warning when Legion::Logging is loaded.
  if defined?(Legion::Logging)
    Legion::Logging.warn("Queue#delete precondition failed: #{error.message}")
  end
  false
end
|
#ensure_dlx(merged_options) ⇒ Object
45
46
47
48
49
50
51
52
|
# File 'lib/legion/transport/queue.rb', line 45
# Declares the dead-letter exchange named in the merged queue options.
#
# The name is read from +merged_options[:arguments][:'x-dead-letter-exchange']+.
# When it is nil or empty nothing is declared. Any StandardError raised while
# reading the name or declaring the exchange is logged as a warning and
# swallowed, so this method never raises StandardError to its caller.
#
# @param merged_options [Hash] declaration options for the queue
# @return [Object, nil] the result of +channel.exchange_declare+, the result
#   of the logger's +warn+ on failure, or nil when no name is configured
def ensure_dlx(merged_options)
  dlx_name = merged_options.dig(:arguments, :'x-dead-letter-exchange')
  unless dlx_name.nil? || dlx_name.empty?
    exchange_opts = { durable: true, auto_delete: false }
    channel.exchange_declare(dlx_name, 'fanout', **exchange_opts)
  end
rescue StandardError => error
  Legion::Transport.logger.warn "Failed to declare DLX #{dlx_name}: #{error.message}"
end
|
#queue_name ⇒ Object
54
55
56
57
58
59
60
61
62
|
# File 'lib/legion/transport/queue.rb', line 54
# Derives the default queue name from the class's namespace.
#
# The class name (as reported by +self.class.ancestors.first+) is split on
# +::+. Segment 2, lower-cased, becomes the prefix. Segment 5 becomes the
# suffix: when it contains more than one ASCII capital, an underscore is
# inserted between each matched character/capital pair before lower-casing
# (e.g. +TaskRunner+ -> +task_runner+).
#
# NOTE(review): presumably classes are nested like
# Legion::Extensions::<Ext>::Transport::Queues::<Name>; confirm against callers.
#
# Non-mutating +gsub+/+downcase+ are used on purpose: the bang variants
# return nil when they change nothing, which would make the suffix nil and
# yield a name such as "ext." with nothing after the dot.
#
# @return [String] "<segment 2>.<segment 5>", both lower-cased
# @raise [NoMethodError] when the class name has fewer than six segments
def queue_name
  segments = self.class.ancestors.first.to_s.split('::')
  leaf = segments[5]
  leaf = leaf.gsub(/(.)([A-Z])/, '\1_\2') if leaf.count('A-Z') > 1
  "#{segments[2].downcase}.#{leaf.downcase}"
end
|
#queue_options ⇒ Object
41
42
43
|
# File 'lib/legion/transport/queue.rb', line 41
# Per-queue declaration options, passed to +options_builder+ between
# +default_options+ and the caller's options when the queue is initialized.
# This implementation contributes nothing.
#
# @return [Concurrent::Hash] a new, empty hash on every call
def queue_options
  Concurrent::Hash.new
end
|
#recreate_queue(queue) ⇒ Object
21
22
23
24
25
|
# File 'lib/legion/transport/queue.rb', line 21
# Deletes an existing queue whose parameters conflict with the ones being
# declared, so the caller can declare it again.
#
# Logs a warning, then builds a +::Bunny::Queue+ for the name on a channel
# obtained from +Legion::Transport::Connection+ (with +no_declare+ and
# +passive+ both set) and calls +delete+ on it.
#
# @param queue [String] name of the queue to delete
# @return [Object] whatever the queue's +delete+ returns
def recreate_queue(queue)
  Legion::Transport.logger.warn "Queue:#{queue} exists with wrong parameters, deleting and creating"
  stale = ::Bunny::Queue.new(
    Legion::Transport::Connection.channel,
    queue,
    no_declare: true,
    passive: true
  )
  stale.delete
end
|
#reject(delivery_tag, requeue: false) ⇒ Object
76
77
78
|
# File 'lib/legion/transport/queue.rb', line 76
# Rejects a delivery by forwarding its tag and the requeue flag to this
# queue's channel.
#
# @param delivery_tag [Object] tag handed unchanged to +channel.reject+
# @param requeue [Boolean] passed as the second positional argument to
#   +channel.reject+; defaults to +false+
# @return [Object] whatever +channel.reject+ returns
def reject(delivery_tag, requeue: false)
  ch = channel
  ch.reject(delivery_tag, requeue)
end
|