Class: Legion::Transport::Exchange

Inherits:
Exchange
  • Object
show all
Includes:
Common
Defined in:
lib/legion/transport/exchange.rb

Constant Summary

Constants included from Common

Common::NAMESPACE_BOUNDARIES

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Common

#channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(exchange = exchange_name, options = {}) ⇒ Exchange

Returns a new instance of Exchange.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/legion/transport/exchange.rb', line 35

# Declares the exchange by delegating to the parent constructor with the
# resolved channel, type, name and merged options. On a precondition failure
# or an already-closed channel it deletes the exchange and retries once.
#
# @param exchange [Object] exchange name; defaults to the result of #exchange_name
# @param options [Hash] declaration options. A :channel entry is removed from
#   this hash and treated as the caller-supplied channel; :type overrides
#   #default_type.
# @raise [Legion::Transport::CONNECTOR::PreconditionFailed,
#   Legion::Transport::CONNECTOR::ChannelAlreadyClosed] re-raised when a retry
#   was already attempted, or when the credential-scoping guard below applies.
def initialize(exchange = exchange_name, options = {})
  # NOTE(review): @options is the same object as the caller's hash, so the
  # delete below also removes :channel from the caller's argument — confirm
  # that no caller reuses the hash afterwards.
  @options = options
  @explicit_channel = @options.delete(:channel)
  @type = options[:type] || default_type
  super(channel, @type, exchange, options_builder(default_options, exchange_options, @options))
  # Caching is skipped when the class does not respond to cache_instance.
  self.class.cache_instance(self) if self.class.respond_to?(:cache_instance)
rescue Legion::Transport::CONNECTOR::PreconditionFailed, Legion::Transport::CONNECTOR::ChannelAlreadyClosed => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.exchange.initialize', exchange: exchange)
  # @retries is only assigned further down, so a non-nil value means this is
  # the second failure: give up and propagate the error.
  raise unless @retries.nil?
  # With credential scoping enabled, never delete-and-recreate during the
  # bootstrap phase, nor outside topology mode once the process identity is resolved.
  raise if credential_scoping_enabled? && (bootstrap_phase? || (!topology_mode? && Legion::Identity::Process.resolved?))

  @retries = 1
  # Only close the channel if it was not explicitly provided by the caller.
  safely_close_channel(@channel) if @explicit_channel.nil? || @channel != @explicit_channel
  delete_exchange(exchange)
  # Re-runs the method body. By now :channel has already been removed from
  # options, and delete_exchange has pointed @channel at a fresh connection channel.
  retry
end

Class Method Details

.cache_instance(inst) ⇒ Object



26
27
28
# File 'lib/legion/transport/exchange.rb', line 26

# Remembers +inst+ in the shared instance cache, keyed by +name+.
#
# @param inst [Object] the instance to store
# @return [Object] the instance that was stored
def cache_instance(inst)
  registry = instance_cache.value
  registry[name] = inst
end

.cached_instance ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/legion/transport/exchange.rb', line 17

# Looks up the cached instance stored under +name+ and returns it only while
# its channel reports itself open. Any other entry (missing, without a
# channel, or with a closed channel) is removed from the cache.
#
# @return [Object, nil] the cached instance, or nil when nothing usable is cached
def cached_instance
  store = instance_cache.value
  candidate = store[name]
  if candidate&.channel&.open?
    candidate
  else
    # Drop the stale (or absent) entry so the next lookup starts clean.
    store.delete(name)
    nil
  end
end

.clear_cache ⇒ Object



30
31
32
# File 'lib/legion/transport/exchange.rb', line 30

# Empties the shared instance cache.
#
# @return [Object] whatever +clear+ returns on the underlying cache value
def clear_cache
  registry = instance_cache.value
  registry.clear
end

.instance_cache ⇒ Object



13
14
15
# File 'lib/legion/transport/exchange.rb', line 13

# Returns the cache container held in the @instance_cache instance variable of
# Legion::Transport::Exchange. The constant is named explicitly, so the same
# container is read regardless of the receiver.
#
# @return [Object] the value of Legion::Transport::Exchange's @instance_cache
def instance_cache
  owner = Legion::Transport::Exchange
  owner.instance_variable_get(:@instance_cache)
end

Instance Method Details

#channel ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/legion/transport/exchange.rb', line 97

# Returns the memoized channel for this exchange. The first call prefers the
# caller-supplied channel and otherwise asks Legion::Transport::Connection for
# one. If that raises a channel-level exception, the offending channel is
# closed and a replacement is requested from the connection.
#
# @return [Object] the channel in use
# @raise [Legion::Transport::CONNECTOR::ChannelLevelException] when the
#   replacement channel is not open either
def channel
  @channel ||= @explicit_channel || Legion::Transport::Connection.channel
rescue Legion::Transport::CONNECTOR::ChannelLevelException => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.exchange.channel')
  # The exception may carry the channel that failed; that is the only handle
  # available when the error fired before @channel was assigned.
  broken = e.respond_to?(:channel) ? e.channel : @channel
  safely_close_channel(broken)
  @channel = Legion::Transport::Connection.channel
  return @channel if @channel.open?

  raise e
end

#default_options ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/legion/transport/exchange.rb', line 59

# Builds the baseline declaration options for the exchange: durable, not
# auto-deleted, no extra arguments, and passive according to #passive?.
#
# @return [Concurrent::Hash] a new hash on every call
def default_options
  Concurrent::Hash.new.tap do |defaults|
    defaults[:durable]     = true
    defaults[:auto_delete] = false
    defaults[:arguments]   = {}
    defaults[:passive]     = passive?
  end
end

#default_type ⇒ Object



93
94
95
# File 'lib/legion/transport/exchange.rb', line 93

# Exchange type used by #initialize when the options carry no :type entry.
#
# @return [String] always 'topic'
def default_type
  'topic'
end

#delete(options = {}) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/legion/transport/exchange.rb', line 85

# Deletes the exchange through the parent implementation and reports the
# outcome as a boolean instead of letting a precondition failure propagate.
#
# @param options [Hash] options forwarded unchanged to the parent #delete
# @return [Boolean] true when the parent call completed, false when it raised
#   Legion::Transport::CONNECTOR::PreconditionFailed
def delete(options = {})
  super(options)
rescue Legion::Transport::CONNECTOR::PreconditionFailed => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.exchange.delete')
  false
else
  true
end

#delete_exchange(exchange) ⇒ Object



53
54
55
56
57
# File 'lib/legion/transport/exchange.rb', line 53

# Logs a warning, switches this object to a channel freshly obtained from
# Legion::Transport::Connection, and deletes the named exchange through it.
#
# @param exchange [Object] name of the exchange to delete
# @return [Object] the result of the channel's exchange_delete call
def delete_exchange(exchange)
  log.warn("Exchange:#{exchange} exists with wrong parameters, deleting and creating")
  fresh_channel = Legion::Transport::Connection.channel
  @channel = fresh_channel
  fresh_channel.exchange_delete(exchange)
end

#exchange_name ⇒ Object



77
78
79
# File 'lib/legion/transport/exchange.rb', line 77

# Derives the exchange name by joining the segments from derive_segments
# with dots.
#
# @return [String] the dot-separated exchange name
def exchange_name
  segments = derive_segments
  segments.join('.')
end

#exchange_options ⇒ Object



81
82
83
# File 'lib/legion/transport/exchange.rb', line 81

# Exchange-specific options that #initialize passes to options_builder
# between #default_options and the caller-supplied options. Empty here.
# NOTE(review): presumably an override point for subclasses — confirm.
#
# @return [Concurrent::Hash] a new, empty hash on every call
def exchange_options
  Concurrent::Hash.new
end

#passive? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
71
72
73
74
75
# File 'lib/legion/transport/exchange.rb', line 68

# Decides whether the exchange should be declared passively.
#
# Passive declaration applies only when credential scoping is enabled and
# Legion::Identity::Process is defined. In that case it is always on during
# the bootstrap phase, and otherwise on only outside topology mode.
#
# @return [Boolean]
def passive?
  if !credential_scoping_enabled? || !defined?(Legion::Identity::Process)
    false
  elsif bootstrap_phase?
    true
  else
    !topology_mode?
  end
end