Module: Legion::Transport::Helper
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/transport/helper.rb
Instance Method Summary collapse
- #build_default_exchange ⇒ Object
- #default_exchange ⇒ Object
- #exchanges ⇒ Object
- #messages ⇒ Object
- #queues ⇒ Object
-
#transport_channel ⇒ Object
— Resource Info —.
- #transport_channel_open? ⇒ Boolean
- #transport_class ⇒ Object
-
#transport_connected? ⇒ Boolean
— Status —.
-
#transport_default_ttl ⇒ Object
— TTL Resolution — Override in your LEX to set a custom default message TTL for the extension.
- #transport_lite_mode? ⇒ Boolean
-
#transport_path ⇒ Object
— Namespace / Wiring —.
-
#transport_publish(routing_key:, payload: {}, **opts) ⇒ Object
— Publish Convenience —.
- #transport_session_open? ⇒ Boolean
- #transport_spool_count ⇒ Object
Instance Method Details
#build_default_exchange ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/legion/transport/helper.rb', line 46 def build_default_exchange return transport_class::Exchanges.const_get(lex_const, false) if transport_class::Exchanges.const_defined?(lex_const, false) amqp = amqp_prefix transport_class::Exchanges.const_set(lex_const, Class.new(Legion::Transport::Exchange) do define_method(:exchange_name) { amqp } end) @default_exchange = transport_class::Exchanges.const_get(lex_const, false) end |
#default_exchange ⇒ Object
42 43 44 |
# File 'lib/legion/transport/helper.rb', line 42 def default_exchange @default_exchange ||= build_default_exchange end |
#exchanges ⇒ Object
38 39 40 |
# File 'lib/legion/transport/helper.rb', line 38 def exchanges @exchanges ||= transport_class::Exchanges end |
#messages ⇒ Object
30 31 32 |
# File 'lib/legion/transport/helper.rb', line 30 def @messages ||= transport_class::Messages end |
#queues ⇒ Object
34 35 36 |
# File 'lib/legion/transport/helper.rb', line 34 def queues @queues ||= transport_class::Queues end |
#transport_channel ⇒ Object
— Resource Info —
85 86 87 |
# File 'lib/legion/transport/helper.rb', line 85 def transport_channel Legion::Transport::Connection.channel end |
#transport_channel_open? ⇒ Boolean
72 73 74 75 76 77 |
# File 'lib/legion/transport/helper.rb', line 72 def transport_channel_open? Legion::Transport::Connection.channel_open? rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_channel_open) false end |
#transport_class ⇒ Object
26 27 28 |
# File 'lib/legion/transport/helper.rb', line 26 def transport_class @transport_class ||= lex_class::Transport end |
#transport_connected? ⇒ Boolean
— Status —
58 59 60 61 62 63 |
# File 'lib/legion/transport/helper.rb', line 58 def transport_connected? !!Legion::Settings.dig(:transport, :connected) rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_connected) false end |
#transport_default_ttl ⇒ Object
— TTL Resolution — Override in your LEX to set a custom default message TTL for the extension. Resolution chain: per-call :ttl option -> LEX override -> Settings -> nil (no expiration)
13 14 15 16 17 18 |
# File 'lib/legion/transport/helper.rb', line 13 def transport_default_ttl Legion::Settings.dig(:transport, :messages, :ttl) rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_default_ttl) nil end |
#transport_lite_mode? ⇒ Boolean
79 80 81 |
# File 'lib/legion/transport/helper.rb', line 79 def transport_lite_mode? Legion::Transport::Connection.lite_mode? end |
#transport_path ⇒ Object
— Namespace / Wiring —
22 23 24 |
# File 'lib/legion/transport/helper.rb', line 22 def transport_path @transport_path ||= "#{full_path}/transport" end |
#transport_publish(routing_key:, payload: {}, **opts) ⇒ Object
— Publish Convenience —
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/legion/transport/helper.rb', line 98 def transport_publish(routing_key:, payload: {}, **opts) return false unless transport_connected? if opts.key?(:ttl) ttl = opts.delete(:ttl) opts[:expiration] = ttl.to_s if ttl elsif !opts.key?(:expiration) ttl = transport_default_ttl opts[:expiration] = ttl.to_s if ttl end encoded = payload.is_a?(String) ? payload : Legion::JSON.dump(payload) exchange = default_exchange.cached_instance || default_exchange.new exchange.publish(encoded, routing_key: routing_key, **opts) true rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_publish, routing_key: routing_key) false end |
#transport_session_open? ⇒ Boolean
65 66 67 68 69 70 |
# File 'lib/legion/transport/helper.rb', line 65 def transport_session_open? Legion::Transport::Connection.session_open? rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_session_open) false end |