Module: Legion::Extensions::Transport

Includes:
Helpers::Logger, Helpers::Transport
Defined in:
lib/legion/extensions/transport.rb

Constant Summary

Constants included from Helpers::Base

Helpers::Base::NAMESPACE_BOUNDARIES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers::Logger

#handle_runner_exception

Methods included from Helpers::Base

#actor_class, #actor_const, #actor_name, #amqp_prefix, #calling_class, #calling_class_array, #from_json, #full_path, #lex_class, #lex_const, #lex_name, #lex_slug, #log_tag, #normalize, #runner_class, #runner_const, #runner_name, #segments, #settings_path, #table_prefix, #to_dotted_hash

Instance Attribute Details

#consumersObject

Returns the value of attribute consumers.



9
10
11
# File 'lib/legion/extensions/transport.rb', line 9

def consumers
  @consumers
end

#exchangesObject

Returns the value of attribute exchanges.



9
10
11
# File 'lib/legion/extensions/transport.rb', line 9

def exchanges
  @exchanges
end

#messagesObject

Returns the value of attribute messages.



9
10
11
# File 'lib/legion/extensions/transport.rb', line 9

def messages
  @messages
end

#queuesObject

Returns the value of attribute queues.



9
10
11
# File 'lib/legion/extensions/transport.rb', line 9

def queues
  @queues
end

Instance Method Details

#additional_e_to_qObject



207
208
209
# File 'lib/legion/extensions/transport.rb', line 207

def additional_e_to_q
  []
end

#auto_create_dlx_exchangeObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/legion/extensions/transport.rb', line 73

def auto_create_dlx_exchange
  dlx = if transport_class::Exchanges.const_defined?('Dlx', false)
          transport_class::Exchanges::Dlx
        else
          transport_class::Exchanges.const_set('Dlx', Class.new(default_exchange) do
            def exchange_name
              "#{super}.dlx"
            end

            def default_type
              'fanout'
            end
          end)
        end

  dlx.new
end

#auto_create_dlx_queueObject



91
92
93
94
95
96
97
# File 'lib/legion/extensions/transport.rb', line 91

def auto_create_dlx_queue
  return if transport_class::Queues.const_defined?('Dlx', false)

  special_name = default_exchange.new.exchange_name
  dlx_queue = Legion::Transport::Queue.new "#{special_name}.dlx", auto_delete: false
  dlx_queue.bind("#{special_name}.dlx", { routing_key: '#' })
end

#auto_create_exchange(exchange, default_exchange: false) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/legion/extensions/transport.rb', line 51

def auto_create_exchange(exchange, default_exchange: false)
  if Object.const_defined? exchange
    log.warn "#{exchange} is already defined"
    return
  end
  return build_default_exchange if default_exchange

  ext_amqp = amqp_prefix
  transport_class::Exchanges.const_set(exchange.split('::').pop, Class.new(Legion::Transport::Exchange) do
    define_method(:exchange_name) { "#{ext_amqp}.#{self.class.to_s.split('::').last.downcase}" }
  end)
end

#auto_create_queue(queue) ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/legion/extensions/transport.rb', line 64

def auto_create_queue(queue)
  if Kernel.const_defined?(queue)
    log.warn "#{queue} is already defined"
    return
  end

  transport_class::Queues.const_set(queue.split('::').last, Class.new(Legion::Transport::Queue))
end

#auto_generate_message(runner_name, method_name, runner_module, messages_mod, ext_amqp) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/legion/extensions/transport.rb', line 119

def auto_generate_message(runner_name, method_name, runner_module, messages_mod, ext_amqp)
  defn = runner_module.definition_for(method_name)
  return if defn.nil? || defn[:inputs].nil? || defn[:inputs].empty?

  class_name = "#{runner_name.to_s.split('_').collect(&:capitalize).join}#{method_name.to_s.split('_').collect(&:capitalize).join}"
  return if messages_mod.const_defined?(class_name, false)

  routing_key = "#{ext_amqp}.runners.#{runner_name}.#{method_name}"
  msg_class = Class.new(Legion::Transport::Message) do
    define_method(:exchange_name) { ext_amqp }
    define_method(:routing_key) { routing_key }
  end
  messages_mod.const_set(class_name, msg_class)
end

#auto_generate_messagesObject



99
100
101
102
103
104
105
106
107
# File 'lib/legion/extensions/transport.rb', line 99

def auto_generate_messages
  return unless defined?(@runners) && @runners.is_a?(Hash)

  messages_mod = transport_class::Messages
  ext_amqp = amqp_prefix
  @runners.each_value { |info| auto_generate_runner_messages(info, messages_mod, ext_amqp) }
rescue StandardError => e
  log.error("[Transport] auto-generate messages failed: #{e.message}") if respond_to?(:log)
end

#auto_generate_runner_messages(runner_info, messages_mod, ext_amqp) ⇒ Object



109
110
111
112
113
114
115
116
117
# File 'lib/legion/extensions/transport.rb', line 109

def auto_generate_runner_messages(runner_info, messages_mod, ext_amqp)
  runner_name = runner_info[:runner_name]
  runner_module = runner_info[:runner_module]
  return if runner_module.nil?
  return unless runner_module.respond_to?(:definition_for)

  methods = runner_module.respond_to?(:instance_methods) ? runner_module.instance_methods(false) : []
  methods.each { |method_name| auto_generate_message(runner_name, method_name, runner_module, messages_mod, ext_amqp) }
end

#bind(from, to, routing_key: nil, **_options) ⇒ Object



187
188
189
190
191
192
193
# File 'lib/legion/extensions/transport.rb', line 187

def bind(from, to, routing_key: nil, **_options)
  from = from.is_a?(String) ? Kernel.const_get(from).new : from.new
  to = to.is_a?(String) ? Kernel.const_get(to).new : to.new
  to.bind(from, routing_key: routing_key)
rescue StandardError => e
  handle_exception(e, level: :fatal, from: from, to: to, routing_key: routing_key)
end

#bind_e_to_q(to:, from: default_exchange, routing_key: nil) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/legion/extensions/transport.rb', line 149

def bind_e_to_q(to:, from: default_exchange, routing_key: nil, **)
  log.debug "[transport] building auto binding exchange: #{from}, routing_key: #{routing_key}, to: #{to}"
  if from.is_a? String
    from = "#{transport_class}::Exchanges::#{from.tr('.', '_').split('_').collect(&:capitalize).join}" unless from.include?('::')
    auto_create_exchange(from) unless Object.const_defined? from
  end

  if to.is_a? String
    to = "#{transport_class}::Queues::#{to.tr('.', '_').split('_').collect(&:capitalize).join}" unless to.include?('::')
    auto_create_queue(to) unless Object.const_defined?(to)
  end

  routing_key = to.to_s.split('::').last.downcase if routing_key.nil?
  bind(from, to, routing_key: routing_key)
rescue StandardError => e
  handle_exception(e, handled: false, level: :warn, from: from, to: to, routing_key: routing_key)
  raise e
end

#buildObject



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/legion/extensions/transport.rb', line 11

def build
  log.debug "[Transport] build start: #{lex_name}"
  @queues = []
  @exchanges = []
  @messages = []
  @consumers = []
  generate_base_modules
  require_transport_items

  build_e_to_e
  build_e_to_q(e_to_q)
  build_e_to_q(additional_e_to_q)
  auto_create_dlx_exchange
  auto_create_dlx_queue
  auto_generate_messages
  log.info "[Transport] built exchanges=#{@exchanges.count} queues=#{@queues.count} for #{lex_name}"
rescue StandardError => e
  log.error "[Transport] build failed for #{lex_name}"
  handle_exception(e, lex: lex_name)
end

#build_e_to_eObject



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/legion/extensions/transport.rb', line 168

def build_e_to_e
  e_to_e.each do |binding|
    if binding[:from].is_a? String
      binding[:from] = "#{transport_class}::Exchanges::#{binding[:from].capitalize}" unless binding[:from].include?('::')
      auto_create_exchange(binding[:from]) unless Object.const_defined? binding[:from]
    end

    if binding[:to].is_a? String
      binding[:to] = "#{transport_class}::Exchanges::#{binding[:to].capitalize}" unless binding[:to].include?('::')
      auto_create_exchange(binding[:to]) unless Object.const_defined? binding[:to]
    end

    bind(binding[:from], binding[:to], binding)
  rescue StandardError => e
    handle_exception(e, handled: false, level: :warn)
    raise e
  end
end

#build_e_to_q(array) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/legion/extensions/transport.rb', line 134

def build_e_to_q(array)
  array.each do |binding|
    binding[:routing_key] = nil unless binding.key? :routing_key
    binding[:to] = nil unless binding.key?(:to)
    binding[:from] = default_exchange if !binding.key?(:from) || binding[:from].nil?
    bind_e_to_q(**binding)
  rescue StandardError => e
    log.warn '[transport] failed to build exchange-to-queue binding ' \
             "from=#{binding[:from].inspect} to=#{binding[:to].inspect} " \
             "routing_key=#{binding[:routing_key].inspect} binding=#{binding.inspect}"
    handle_exception(e, handled: false, level: :warn)
    raise e
  end
end

#e_to_eObject



203
204
205
# File 'lib/legion/extensions/transport.rb', line 203

def e_to_e
  []
end

#e_to_qObject



195
196
197
198
199
200
201
# File 'lib/legion/extensions/transport.rb', line 195

def e_to_q
  return [] if @exchanges.count != 1

  @queues.map do |queue|
    { from: @exchanges.first, to: queue, routing_key: "#{amqp_prefix}.runners.#{queue}.#" }
  end
end

#generate_base_modulesObject



32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/transport.rb', line 32

def generate_base_modules
  lex_class.const_set('Transport', Module.new) unless lex_class.const_defined?('Transport', false)
  %w[Queues Exchanges Messages Consumers].each do |thing|
    next if transport_class.const_defined?(thing, false)

    transport_class.const_set(thing, Module.new)
  end
end

#require_transport_itemsObject



41
42
43
44
45
46
47
48
49
# File 'lib/legion/extensions/transport.rb', line 41

def require_transport_items
  { exchanges: @exchanges, queues: @queues, consumers: @consumers, messages: @messages }.each do |item, obj|
    Dir[File.expand_path("#{transport_path}/#{item}/*.rb")].each do |file|
      require file
      file_name = file.to_s.split('/').last.split('.').first
      obj.push(file_name) unless obj.include?(file_name)
    end
  end
end