Class: Legion::Transport::Queue

Inherits:
Queue
  • Object
show all
Includes:
Common
Defined in:
lib/legion/transport/queue.rb

Constant Summary

Constants included from Common

Common::NAMESPACE_BOUNDARIES

Instance Method Summary collapse

Methods included from Common

#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(queue = queue_name, options = {}) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/legion/transport/queue.rb', line 8

def initialize(queue = queue_name, options = {})
  retries ||= 0
  @queue_name_arg = queue
  @options = options
  merged = options_builder(default_options, queue_options, options)
  ensure_dlx(merged)
  super(channel, queue, merged)
rescue Legion::Transport::CONNECTOR::PreconditionFailed => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.queue.initialize', queue: queue)
  identity_resolved = defined?(Legion::Identity::Process) && Legion::Identity::Process.resolved?
  raise if credential_scoping_enabled? && (bootstrap_phase? || (!topology_mode? && identity_resolved && !own_queue?))

  retries.zero? ? retries = 1 : raise
  recreate_queue(queue)
  safely_close_channel(@channel)
  @channel = Legion::Transport::Connection.channel
  retry
end

Instance Method Details

#acknowledge(delivery_tag) ⇒ Object



112
113
114
# File 'lib/legion/transport/queue.rb', line 112

def acknowledge(delivery_tag)
  channel.acknowledge(delivery_tag)
end

#default_optionsObject



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/legion/transport/queue.rb', line 36

def default_options
  hash = Concurrent::Hash.new
  hash[:manual_ack] = true
  hash[:durable] = true
  hash[:exclusive] = false
  hash[:block] = false
  hash[:auto_delete] = false
  is_passive = passive?
  hash[:passive] = is_passive
  if is_passive
    hash[:arguments] = {}
  else
    args = { 'x-queue-type': 'quorum' }
    args[:'x-dead-letter-exchange'] = dlx_exchange_name if dlx_enabled
    hash[:arguments] = args
  end
  hash
end

#delete(options = {}) ⇒ Object



104
105
106
107
108
109
110
# File 'lib/legion/transport/queue.rb', line 104

def delete(options = {})
  super
  true
rescue Legion::Transport::CONNECTOR::PreconditionFailed => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.queue.delete')
  false
end

#dlx_enabledObject



78
79
80
# File 'lib/legion/transport/queue.rb', line 78

def dlx_enabled
  true
end

#dlx_exchange_nameObject



82
83
84
# File 'lib/legion/transport/queue.rb', line 82

def dlx_exchange_name
  "#{derive_segments.join('.')}.dlx"
end

#ensure_dlx(merged_options) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/legion/transport/queue.rb', line 86

def ensure_dlx(merged_options)
  return if credential_scoping_enabled? && (bootstrap_phase? || !topology_mode?)

  dlx_name = merged_options.dig(:arguments, :'x-dead-letter-exchange')
  return if dlx_name.nil? || dlx_name.empty?

  channel.exchange_declare(dlx_name, 'fanout', durable: true, auto_delete: false)
  channel.queue_declare("#{dlx_name}.queue", durable: true, auto_delete: false,
                                              arguments: { 'x-queue-type': 'classic' })
  channel.queue_bind("#{dlx_name}.queue", dlx_name, routing_key: '#')
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.queue.ensure_dlx', dlx: dlx_name)
end

#nack_or_dlq(delivery_tag, retry_count: 0, threshold: 2) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/legion/transport/queue.rb', line 120

def nack_or_dlq(delivery_tag, retry_count: 0, threshold: 2)
  if retry_count < threshold
    reject(delivery_tag, requeue: true)
  else
    reject(delivery_tag, requeue: false)
  end
end

#own_queue?Boolean

Returns:

  • (Boolean)


65
66
67
68
69
70
71
72
# File 'lib/legion/transport/queue.rb', line 65

def own_queue?
  return false unless defined?(Legion::Identity::Process) && Legion::Identity::Process.resolved?

  prefix = Legion::Identity::Process.queue_prefix
  return false if prefix.nil? || prefix.empty?

  @queue_name_arg.to_s.start_with?(prefix)
end

#passive?Boolean

Returns:

  • (Boolean)


55
56
57
58
59
60
61
62
63
# File 'lib/legion/transport/queue.rb', line 55

def passive?
  return false unless credential_scoping_enabled?
  return false unless defined?(Legion::Identity::Process)
  return true  if bootstrap_phase?
  return false if topology_mode?
  return false if own_queue?

  true
end

#queue_nameObject



100
101
102
# File 'lib/legion/transport/queue.rb', line 100

def queue_name
  "#{derive_segments.join('.')}.#{derive_leaf}"
end

#queue_optionsObject



74
75
76
# File 'lib/legion/transport/queue.rb', line 74

def queue_options
  Concurrent::Hash.new
end

#recreate_queue(queue) ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/legion/transport/queue.rb', line 27

def recreate_queue(queue)
  log.warn "Queue:#{queue} exists with wrong parameters, deleting and creating"
  tmp_channel = Legion::Transport::Connection.channel
  tmp_queue = ::Bunny::Queue.new(tmp_channel, queue, no_declare: true, passive: true)
  tmp_queue.delete
ensure
  safely_close_channel(tmp_channel)
end

#reject(delivery_tag, requeue: false) ⇒ Object



116
117
118
# File 'lib/legion/transport/queue.rb', line 116

def reject(delivery_tag, requeue: false)
  channel.reject(delivery_tag, requeue)
end