Module: Legion::Extensions::Absorbers::Transport
- Defined in:
- lib/legion/extensions/absorbers/transport.rb
Class Method Summary collapse
- .absorber_name_from_class(klass) ⇒ Object
- .build_message(lex_name:, absorber_name:, record:) ⇒ Object
- .lex_name_from_absorber_class(klass) ⇒ Object
- .publish_absorb_request(absorber_class:, record:) ⇒ Object
- .transport_connected? ⇒ Boolean
Class Method Details
.absorber_name_from_class(klass) ⇒ Object
57 58 59 60 |
# File 'lib/legion/extensions/absorbers/transport.rb', line 57 def absorber_name_from_class(klass) klass.name.to_s.split('::').last .gsub(/([A-Z])/, '_\1').sub(/^_/, '').downcase end |
.build_message(lex_name:, absorber_name:, record:) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/legion/extensions/absorbers/transport.rb', line 27 def (lex_name:, absorber_name:, record:) input = record[:input].to_s { exchange: "lex.#{lex_name}", routing_key: "lex.#{lex_name}.absorbers.#{absorber_name}.absorb", payload: { type: 'absorb.request', version: '1.0', id: SecureRandom.uuid, absorb_id: record[:absorb_id], timestamp: Time.now.utc.iso8601, url: input.start_with?('http') ? input : nil, file_path: input.start_with?('http') ? nil : input, context: record[:context], metadata: record[:metadata] || {} } } end |
.lex_name_from_absorber_class(klass) ⇒ Object
46 47 48 49 50 51 52 53 54 55 |
# File 'lib/legion/extensions/absorbers/transport.rb', line 46 def lex_name_from_absorber_class(klass) name = klass.name.to_s # Legion::Extensions::MicrosoftTeams::Absorbers::Meeting -> microsoft_teams # Lex::Example::Absorbers::Content -> example m = name.match(/Legion::Extensions::(\w+)::Absorbers::/) || name.match(/Lex::(\w+)::Absorbers::/) return 'unknown' unless m m[1].gsub(/([A-Z])/, '_\1').sub(/^_/, '').downcase end |
.publish_absorb_request(absorber_class:, record:) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/legion/extensions/absorbers/transport.rb', line 11 def publish_absorb_request(absorber_class:, record:) lex = lex_name_from_absorber_class(absorber_class) name = absorber_name_from_class(absorber_class) msg = (lex_name: lex, absorber_name: name, record: record) return msg unless transport_connected? exchange = Legion::Transport::Exchange.new(msg[:exchange], type: :topic, durable: true) exchange.publish( Legion::JSON.dump(msg[:payload]), routing_key: msg[:routing_key], content_type: 'application/json', message_id: record[:absorb_id] ) msg end |