Class: Legion::Transport::Queue
- Inherits:
-
Queue
- Object
- Queue
- Legion::Transport::Queue
show all
- Includes:
- Common
- Defined in:
- lib/legion/transport/queue.rb
Constant Summary
Constants included
from Common
Common::NAMESPACE_BOUNDARIES
Instance Method Summary
collapse
Methods included from Common
#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder
Constructor Details
#initialize(queue = queue_name, options = {}) ⇒ Queue
Returns a new instance of Queue.
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/legion/transport/queue.rb', line 8
def initialize(queue = queue_name, options = {})
retries ||= 0
@queue_name_arg = queue
@options = options
merged = options_builder(default_options, queue_options, options)
ensure_dlx(merged)
super(channel, queue, merged)
rescue Legion::Transport::CONNECTOR::PreconditionFailed => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.queue.initialize', queue: queue)
identity_resolved = defined?(Legion::Identity::Process) && Legion::Identity::Process.resolved?
raise if credential_scoping_enabled? && (bootstrap_phase? || (!topology_mode? && identity_resolved && !own_queue?))
retries.zero? ? retries = 1 : raise
recreate_queue(queue)
safely_close_channel(@channel)
@channel = Legion::Transport::Connection.channel
retry
end
|
Instance Method Details
#acknowledge(delivery_tag) ⇒ Object
112
113
114
|
# File 'lib/legion/transport/queue.rb', line 112
def acknowledge(delivery_tag)
channel.acknowledge(delivery_tag)
end
|
#default_options ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/legion/transport/queue.rb', line 36
def default_options
hash = Concurrent::Hash.new
hash[:manual_ack] = true
hash[:durable] = true
hash[:exclusive] = false
hash[:block] = false
hash[:auto_delete] = false
is_passive = passive?
hash[:passive] = is_passive
if is_passive
hash[:arguments] = {}
else
args = { 'x-queue-type': 'quorum' }
args[:'x-dead-letter-exchange'] = dlx_exchange_name if dlx_enabled
hash[:arguments] = args
end
hash
end
|
#delete(options = {}) ⇒ Object
104
105
106
107
108
109
110
|
# File 'lib/legion/transport/queue.rb', line 104
def delete(options = {})
super
true
rescue Legion::Transport::CONNECTOR::PreconditionFailed => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.queue.delete')
false
end
|
#dlx_enabled ⇒ Object
78
79
80
|
# File 'lib/legion/transport/queue.rb', line 78
def dlx_enabled
true
end
|
#dlx_exchange_name ⇒ Object
82
83
84
|
# File 'lib/legion/transport/queue.rb', line 82
def dlx_exchange_name
"#{derive_segments.join('.')}.dlx"
end
|
#ensure_dlx(merged_options) ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/legion/transport/queue.rb', line 86
def ensure_dlx(merged_options)
return if credential_scoping_enabled? && (bootstrap_phase? || !topology_mode?)
dlx_name = merged_options.dig(:arguments, :'x-dead-letter-exchange')
return if dlx_name.nil? || dlx_name.empty?
channel.exchange_declare(dlx_name, 'fanout', durable: true, auto_delete: false)
channel.queue_declare("#{dlx_name}.queue", durable: true, auto_delete: false,
arguments: { 'x-queue-type': 'classic' })
channel.queue_bind("#{dlx_name}.queue", dlx_name, routing_key: '#')
rescue StandardError => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.queue.ensure_dlx', dlx: dlx_name)
end
|
#nack_or_dlq(delivery_tag, retry_count: 0, threshold: 2) ⇒ Object
120
121
122
123
124
125
126
|
# File 'lib/legion/transport/queue.rb', line 120
def nack_or_dlq(delivery_tag, retry_count: 0, threshold: 2)
if retry_count < threshold
reject(delivery_tag, requeue: true)
else
reject(delivery_tag, requeue: false)
end
end
|
#own_queue? ⇒ Boolean
65
66
67
68
69
70
71
72
|
# File 'lib/legion/transport/queue.rb', line 65
def own_queue?
return false unless defined?(Legion::Identity::Process) && Legion::Identity::Process.resolved?
prefix = Legion::Identity::Process.queue_prefix
return false if prefix.nil? || prefix.empty?
@queue_name_arg.to_s.start_with?(prefix)
end
|
#passive? ⇒ Boolean
55
56
57
58
59
60
61
62
63
|
# File 'lib/legion/transport/queue.rb', line 55
def passive?
return false unless credential_scoping_enabled?
return false unless defined?(Legion::Identity::Process)
return true if bootstrap_phase?
return false if topology_mode?
return false if own_queue?
true
end
|
#queue_name ⇒ Object
100
101
102
|
# File 'lib/legion/transport/queue.rb', line 100
def queue_name
"#{derive_segments.join('.')}.#{derive_leaf}"
end
|
#queue_options ⇒ Object
74
75
76
|
# File 'lib/legion/transport/queue.rb', line 74
def queue_options
Concurrent::Hash.new
end
|
#recreate_queue(queue) ⇒ Object
27
28
29
30
31
32
33
34
|
# File 'lib/legion/transport/queue.rb', line 27
def recreate_queue(queue)
log.warn "Queue:#{queue} exists with wrong parameters, deleting and creating"
tmp_channel = Legion::Transport::Connection.channel
tmp_queue = ::Bunny::Queue.new(tmp_channel, queue, no_declare: true, passive: true)
tmp_queue.delete
ensure
safely_close_channel(tmp_channel)
end
|
#reject(delivery_tag, requeue: false) ⇒ Object
116
117
118
|
# File 'lib/legion/transport/queue.rb', line 116
def reject(delivery_tag, requeue: false)
channel.reject(delivery_tag, requeue)
end
|