Module: Legion::Extensions::Transport
Constant Summary
Helpers::Base::NAMESPACE_BOUNDARIES
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#additional_e_to_q ⇒ Object
-
#auto_create_dlx_exchange ⇒ Object
-
#auto_create_dlx_queue ⇒ Object
-
#auto_create_exchange(exchange, default_exchange: false) ⇒ Object
-
#auto_create_queue(queue) ⇒ Object
-
#auto_generate_message(runner_name, method_name, runner_module, messages_mod, ext_amqp) ⇒ Object
-
#auto_generate_messages ⇒ Object
-
#auto_generate_runner_messages(runner_info, messages_mod, ext_amqp) ⇒ Object
-
#bind(from, to, routing_key: nil, **_options) ⇒ Object
-
#bind_e_to_q(to:, from: default_exchange, routing_key: nil) ⇒ Object
-
#build ⇒ Object
-
#build_e_to_e ⇒ Object
-
#build_e_to_q(array) ⇒ Object
-
#e_to_e ⇒ Object
-
#e_to_q ⇒ Object
-
#generate_base_modules ⇒ Object
-
#require_transport_items ⇒ Object
#handle_runner_exception
#actor_class, #actor_const, #actor_name, #amqp_prefix, #calling_class, #calling_class_array, #from_json, #full_path, #lex_class, #lex_const, #lex_name, #lex_slug, #log_tag, #normalize, #runner_class, #runner_const, #runner_name, #segments, #settings_path, #table_prefix, #to_dotted_hash
Instance Attribute Details
#consumers ⇒ Object
Returns the value of attribute consumers.
9
10
11
|
# File 'lib/legion/extensions/transport.rb', line 9
def consumers
@consumers
end
|
#exchanges ⇒ Object
Returns the value of attribute exchanges.
9
10
11
|
# File 'lib/legion/extensions/transport.rb', line 9
def exchanges
@exchanges
end
|
#messages ⇒ Object
Returns the value of attribute messages.
9
10
11
|
# File 'lib/legion/extensions/transport.rb', line 9
def messages
@messages
end
|
#queues ⇒ Object
Returns the value of attribute queues.
9
10
11
|
# File 'lib/legion/extensions/transport.rb', line 9
def queues
@queues
end
|
Instance Method Details
#additional_e_to_q ⇒ Object
207
208
209
|
# File 'lib/legion/extensions/transport.rb', line 207
def additional_e_to_q
[]
end
|
#auto_create_dlx_exchange ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/legion/extensions/transport.rb', line 73
def auto_create_dlx_exchange
dlx = if transport_class::Exchanges.const_defined?('Dlx', false)
transport_class::Exchanges::Dlx
else
transport_class::Exchanges.const_set('Dlx', Class.new(default_exchange) do
def exchange_name
"#{super}.dlx"
end
def default_type
'fanout'
end
end)
end
dlx.new
end
|
#auto_create_dlx_queue ⇒ Object
91
92
93
94
95
96
97
|
# File 'lib/legion/extensions/transport.rb', line 91
def auto_create_dlx_queue
return if transport_class::Queues.const_defined?('Dlx', false)
special_name = default_exchange.new.exchange_name
dlx_queue = Legion::Transport::Queue.new "#{special_name}.dlx", auto_delete: false
dlx_queue.bind("#{special_name}.dlx", { routing_key: '#' })
end
|
#auto_create_exchange(exchange, default_exchange: false) ⇒ Object
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/legion/extensions/transport.rb', line 51
def auto_create_exchange(exchange, default_exchange: false)
if Object.const_defined? exchange
log.warn "#{exchange} is already defined"
return
end
return build_default_exchange if default_exchange
ext_amqp = amqp_prefix
transport_class::Exchanges.const_set(exchange.split('::').pop, Class.new(Legion::Transport::Exchange) do
define_method(:exchange_name) { "#{ext_amqp}.#{self.class.to_s.split('::').last.downcase}" }
end)
end
|
#auto_create_queue(queue) ⇒ Object
64
65
66
67
68
69
70
71
|
# File 'lib/legion/extensions/transport.rb', line 64
def auto_create_queue(queue)
if Kernel.const_defined?(queue)
log.warn "#{queue} is already defined"
return
end
transport_class::Queues.const_set(queue.split('::').last, Class.new(Legion::Transport::Queue))
end
|
#auto_generate_message(runner_name, method_name, runner_module, messages_mod, ext_amqp) ⇒ Object
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/legion/extensions/transport.rb', line 119
def auto_generate_message(runner_name, method_name, runner_module, messages_mod, ext_amqp)
defn = runner_module.definition_for(method_name)
return if defn.nil? || defn[:inputs].nil? || defn[:inputs].empty?
class_name = "#{runner_name.to_s.split('_').collect(&:capitalize).join}#{method_name.to_s.split('_').collect(&:capitalize).join}"
return if messages_mod.const_defined?(class_name, false)
routing_key = "#{ext_amqp}.runners.#{runner_name}.#{method_name}"
msg_class = Class.new(Legion::Transport::Message) do
define_method(:exchange_name) { ext_amqp }
define_method(:routing_key) { routing_key }
end
messages_mod.const_set(class_name, msg_class)
end
|
#auto_generate_messages ⇒ Object
99
100
101
102
103
104
105
106
107
|
# File 'lib/legion/extensions/transport.rb', line 99
def auto_generate_messages
return unless defined?(@runners) && @runners.is_a?(Hash)
messages_mod = transport_class::Messages
ext_amqp = amqp_prefix
@runners.each_value { |info| auto_generate_runner_messages(info, messages_mod, ext_amqp) }
rescue StandardError => e
log.error("[Transport] auto-generate messages failed: #{e.message}") if respond_to?(:log)
end
|
#auto_generate_runner_messages(runner_info, messages_mod, ext_amqp) ⇒ Object
109
110
111
112
113
114
115
116
117
|
# File 'lib/legion/extensions/transport.rb', line 109
def auto_generate_runner_messages(runner_info, messages_mod, ext_amqp)
runner_name = runner_info[:runner_name]
runner_module = runner_info[:runner_module]
return if runner_module.nil?
return unless runner_module.respond_to?(:definition_for)
methods = runner_module.respond_to?(:instance_methods) ? runner_module.instance_methods(false) : []
methods.each { |method_name| auto_generate_message(runner_name, method_name, runner_module, messages_mod, ext_amqp) }
end
|
#bind(from, to, routing_key: nil, **_options) ⇒ Object
187
188
189
190
191
192
193
|
# File 'lib/legion/extensions/transport.rb', line 187
def bind(from, to, routing_key: nil, **_options)
from = from.is_a?(String) ? Kernel.const_get(from).new : from.new
to = to.is_a?(String) ? Kernel.const_get(to).new : to.new
to.bind(from, routing_key: routing_key)
rescue StandardError => e
handle_exception(e, level: :fatal, from: from, to: to, routing_key: routing_key)
end
|
#bind_e_to_q(to:, from: default_exchange, routing_key: nil) ⇒ Object
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# File 'lib/legion/extensions/transport.rb', line 149
def bind_e_to_q(to:, from: default_exchange, routing_key: nil, **)
log.debug "[transport] building auto binding exchange: #{from}, routing_key: #{routing_key}, to: #{to}"
if from.is_a? String
from = "#{transport_class}::Exchanges::#{from.tr('.', '_').split('_').collect(&:capitalize).join}" unless from.include?('::')
auto_create_exchange(from) unless Object.const_defined? from
end
if to.is_a? String
to = "#{transport_class}::Queues::#{to.tr('.', '_').split('_').collect(&:capitalize).join}" unless to.include?('::')
auto_create_queue(to) unless Object.const_defined?(to)
end
routing_key = to.to_s.split('::').last.downcase if routing_key.nil?
bind(from, to, routing_key: routing_key)
rescue StandardError => e
handle_exception(e, handled: false, level: :warn, from: from, to: to, routing_key: routing_key)
raise e
end
|
#build ⇒ Object
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# File 'lib/legion/extensions/transport.rb', line 11
def build
log.debug "[Transport] build start: #{lex_name}"
@queues = []
@exchanges = []
@messages = []
@consumers = []
generate_base_modules
require_transport_items
build_e_to_e
build_e_to_q(e_to_q)
build_e_to_q(additional_e_to_q)
auto_create_dlx_exchange
auto_create_dlx_queue
auto_generate_messages
log.info "[Transport] built exchanges=#{@exchanges.count} queues=#{@queues.count} for #{lex_name}"
rescue StandardError => e
log.error "[Transport] build failed for #{lex_name}"
handle_exception(e, lex: lex_name)
end
|
#build_e_to_e ⇒ Object
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
# File 'lib/legion/extensions/transport.rb', line 168
def build_e_to_e
e_to_e.each do |binding|
if binding[:from].is_a? String
binding[:from] = "#{transport_class}::Exchanges::#{binding[:from].capitalize}" unless binding[:from].include?('::')
auto_create_exchange(binding[:from]) unless Object.const_defined? binding[:from]
end
if binding[:to].is_a? String
binding[:to] = "#{transport_class}::Exchanges::#{binding[:to].capitalize}" unless binding[:to].include?('::')
auto_create_exchange(binding[:to]) unless Object.const_defined? binding[:to]
end
bind(binding[:from], binding[:to], binding)
rescue StandardError => e
handle_exception(e, handled: false, level: :warn)
raise e
end
end
|
#build_e_to_q(array) ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/legion/extensions/transport.rb', line 134
def build_e_to_q(array)
array.each do |binding|
binding[:routing_key] = nil unless binding.key? :routing_key
binding[:to] = nil unless binding.key?(:to)
binding[:from] = default_exchange if !binding.key?(:from) || binding[:from].nil?
bind_e_to_q(**binding)
rescue StandardError => e
log.warn '[transport] failed to build exchange-to-queue binding ' \
"from=#{binding[:from].inspect} to=#{binding[:to].inspect} " \
"routing_key=#{binding[:routing_key].inspect} binding=#{binding.inspect}"
handle_exception(e, handled: false, level: :warn)
raise e
end
end
|
#e_to_e ⇒ Object
203
204
205
|
# File 'lib/legion/extensions/transport.rb', line 203
def e_to_e
[]
end
|
#e_to_q ⇒ Object
195
196
197
198
199
200
201
|
# File 'lib/legion/extensions/transport.rb', line 195
def e_to_q
return [] if @exchanges.count != 1
@queues.map do |queue|
{ from: @exchanges.first, to: queue, routing_key: "#{amqp_prefix}.runners.#{queue}.#" }
end
end
|
#generate_base_modules ⇒ Object
32
33
34
35
36
37
38
39
|
# File 'lib/legion/extensions/transport.rb', line 32
def generate_base_modules
lex_class.const_set('Transport', Module.new) unless lex_class.const_defined?('Transport', false)
%w[Queues Exchanges Messages Consumers].each do |thing|
next if transport_class.const_defined?(thing, false)
transport_class.const_set(thing, Module.new)
end
end
|
#require_transport_items ⇒ Object
41
42
43
44
45
46
47
48
49
|
# File 'lib/legion/extensions/transport.rb', line 41
def require_transport_items
{ exchanges: @exchanges, queues: @queues, consumers: @consumers, messages: @messages }.each do |item, obj|
Dir[File.expand_path("#{transport_path}/#{item}/*.rb")].each do |file|
require file
file_name = file.to_s.split('/').last.split('.').first
obj.push(file_name) unless obj.include?(file_name)
end
end
end
|