Class: Legion::Transport::Exchange
- Inherits:
-
Exchange
- Object
- Exchange
- Legion::Transport::Exchange
show all
- Includes:
- Common
- Defined in:
- lib/legion/transport/exchange.rb
Constant Summary
Constants included
from Common
Common::NAMESPACE_BOUNDARIES
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Common
#channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder
Constructor Details
#initialize(exchange = exchange_name, options = {}) ⇒ Exchange
Returns a new instance of Exchange.
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/legion/transport/exchange.rb', line 35
# Declares the exchange through the parent constructor, deleting and
# re-declaring it once if the first declaration is rejected.
#
# @param exchange [String] exchange name; defaults to #exchange_name
# @param options [Hash] declaration options. A :channel entry, when present,
#   is used as the channel instead of one obtained from
#   Legion::Transport::Connection, and is not forwarded to the parent
#   constructor. A :type entry overrides #default_type.
# @raise [Legion::Transport::CONNECTOR::PreconditionFailed,
#   Legion::Transport::CONNECTOR::ChannelAlreadyClosed] when the failure
#   happens on the second attempt, or when credential scoping is enabled and
#   the process is either in its bootstrap phase or has a resolved identity
#   outside topology mode
def initialize(exchange = exchange_name, options = {})
  # Strip :channel from a private copy so the caller's hash is left intact
  # (and a frozen hash is accepted). Copy on the first attempt only: `retry`
  # re-runs this body and must keep seeing the already-stripped hash, which
  # is what the retry path has always observed.
  options = options.dup if @retries.nil?
  @options = options
  @explicit_channel = @options.delete(:channel)
  @type = options[:type] || default_type
  super(channel, @type, exchange, options_builder(default_options, exchange_options, @options))
  self.class.cache_instance(self) if self.class.respond_to?(:cache_instance)
rescue Legion::Transport::CONNECTOR::PreconditionFailed, Legion::Transport::CONNECTOR::ChannelAlreadyClosed => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.exchange.initialize', exchange: exchange)
  # Only one recovery attempt is made; a second failure propagates.
  raise unless @retries.nil?
  # Under credential scoping the exchange is never deleted in these states.
  raise if credential_scoping_enabled? && (bootstrap_phase? || (!topology_mode? && Legion::Identity::Process.resolved?))

  @retries = 1
  # Never close a channel that was handed in by the caller.
  safely_close_channel(@channel) if @explicit_channel.nil? || @channel != @explicit_channel
  delete_exchange(exchange)
  retry
end
|
Class Method Details
.cache_instance(inst) ⇒ Object
26
27
28
|
# File 'lib/legion/transport/exchange.rb', line 26
# Stores +inst+ in the instance cache under the receiver's +name+.
#
# @param inst [Object] the instance to remember
# @return [Object] +inst+
def cache_instance(inst)
  store = instance_cache.value
  store[name] = inst
end
|
.cached_instance ⇒ Object
17
18
19
20
21
22
23
24
|
# File 'lib/legion/transport/exchange.rb', line 17
# Looks up the instance cached under the receiver's +name+.
#
# An entry is only returned while its channel reports itself open; any other
# entry (no channel, closed channel) is evicted from the cache.
#
# @return [Object, nil] the cached instance, or nil when there is no usable one
def cached_instance
  store = instance_cache.value
  candidate = store[name]
  if candidate&.channel&.open?
    candidate
  else
    store.delete(name)
    nil
  end
end
|
.clear_cache ⇒ Object
30
31
32
|
# File 'lib/legion/transport/exchange.rb', line 30
# Removes every entry from the instance cache.
#
# @return [Object] whatever +clear+ returns on the cache's current value
def clear_cache
  store = instance_cache.value
  store.clear
end
|
.instance_cache ⇒ Object
13
14
15
|
# File 'lib/legion/transport/exchange.rb', line 13
# Returns the cache holder kept in the @instance_cache instance variable of
# Legion::Transport::Exchange itself, regardless of which class receives the
# call.
#
# @return [Object] the object stored in @instance_cache
def instance_cache
  owner = Legion::Transport::Exchange
  owner.instance_variable_get(:@instance_cache)
end
|
Instance Method Details
#channel ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/legion/transport/exchange.rb', line 97
# Returns the channel this exchange works on, memoized in @channel.
#
# The explicitly supplied channel wins; otherwise one is requested from
# Legion::Transport::Connection. If that raises a channel-level exception,
# the offending channel is closed and a replacement is requested once.
#
# @return [Object] the channel in use
# @raise [Legion::Transport::CONNECTOR::ChannelLevelException] the original
#   error, when the replacement channel is not open either
def channel
  @channel ||= @explicit_channel || Legion::Transport::Connection.channel
rescue Legion::Transport::CONNECTOR::ChannelLevelException => error
  handle_exception(error, level: :warn, handled: true, operation: 'transport.exchange.channel')
  # Prefer the channel named by the exception; fall back to our own.
  failed = if error.respond_to?(:channel)
             error.channel
           else
             @channel
           end
  safely_close_channel(failed)
  replacement = Legion::Transport::Connection.channel
  @channel = replacement
  return replacement if replacement.open?

  raise error
end
|
#default_options ⇒ Object
59
60
61
62
63
64
65
66
|
# File 'lib/legion/transport/exchange.rb', line 59
# Builds the baseline declaration options for the exchange.
#
# @return [Concurrent::Hash] a new hash with :durable, :auto_delete,
#   :arguments and :passive set; :passive comes from #passive?
def default_options
  Concurrent::Hash.new.tap do |defaults|
    defaults[:durable] = true
    defaults[:auto_delete] = false
    defaults[:arguments] = {}
    defaults[:passive] = passive?
  end
end
|
#default_type ⇒ Object
93
94
95
|
# File 'lib/legion/transport/exchange.rb', line 93
# Exchange type used when the caller's options carry no :type entry.
#
# @return [String] always 'topic'
def default_type
  'topic'
end
|
#delete(options = {}) ⇒ Object
85
86
87
88
89
90
91
|
# File 'lib/legion/transport/exchange.rb', line 85
# Deletes the exchange via the parent implementation, turning a
# precondition failure into a boolean result instead of an exception.
#
# @param options [Hash] forwarded unchanged to the parent implementation
# @return [Boolean] true when the parent call completed, false when it raised
#   Legion::Transport::CONNECTOR::PreconditionFailed
def delete(options = {})
  begin
    super
  rescue Legion::Transport::CONNECTOR::PreconditionFailed => error
    handle_exception(error, level: :warn, handled: true, operation: 'transport.exchange.delete')
    return false
  end
  true
end
|
#delete_exchange(exchange) ⇒ Object
53
54
55
56
57
|
# File 'lib/legion/transport/exchange.rb', line 53
# Deletes the named exchange on a channel newly requested from
# Legion::Transport::Connection, which also becomes @channel.
#
# @param exchange [String] name of the exchange to delete
# @return [Object] the result of the channel's +exchange_delete+ call
def delete_exchange(exchange)
  warning = "Exchange:#{exchange} exists with wrong parameters, deleting and creating"
  log.warn warning
  replacement = Legion::Transport::Connection.channel
  @channel = replacement
  replacement.exchange_delete(exchange)
end
|
#exchange_name ⇒ Object
77
78
79
|
# File 'lib/legion/transport/exchange.rb', line 77
# Default exchange name: the values from +derive_segments+ joined with dots.
#
# @return [String] dot-separated name
def exchange_name
  segments = derive_segments
  segments.join('.')
end
|
#exchange_options ⇒ Object
81
82
83
|
# File 'lib/legion/transport/exchange.rb', line 81
# Per-exchange declaration options; this implementation contributes none.
#
# @return [Concurrent::Hash] a new, empty hash on every call
def exchange_options
  Concurrent::Hash.new
end
|
#passive? ⇒ Boolean
68
69
70
71
72
73
74
75
|
# File 'lib/legion/transport/exchange.rb', line 68
# Supplies the value for the :passive declaration option.
#
# Always false when credential scoping is disabled or
# Legion::Identity::Process is not defined. Otherwise true during the
# bootstrap phase, and after that true only outside topology mode.
#
# @return [Boolean]
def passive?
  if !credential_scoping_enabled? || !defined?(Legion::Identity::Process)
    false
  elsif bootstrap_phase?
    true
  else
    !topology_mode?
  end
end
|