Module: Legion::Transport::Helper
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/transport/helper.rb
Instance Method Summary collapse
- #build_default_exchange ⇒ Object
- #default_exchange ⇒ Object
- #exchanges ⇒ Object
- #messages ⇒ Object
- #queues ⇒ Object
-
#transport_channel ⇒ Object
— Resource Info —.
- #transport_channel_open? ⇒ Boolean
- #transport_class ⇒ Object
-
#transport_connected? ⇒ Boolean
— Status —.
-
#transport_default_ttl ⇒ Object
— TTL Resolution — Override in your LEX to set a custom default message TTL for the extension.
- #transport_lite_mode? ⇒ Boolean
-
#transport_path ⇒ Object
— Namespace / Wiring —.
-
#transport_publish(routing_key:, payload: {}, **opts) ⇒ Object
— Publish Convenience —.
- #transport_session_open? ⇒ Boolean
- #transport_spool_count ⇒ Object
Instance Method Details
#build_default_exchange ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/legion/transport/helper.rb', line 48 def build_default_exchange return transport_class::Exchanges.const_get(lex_const, false) if transport_class::Exchanges.const_defined?(lex_const, false) amqp = amqp_prefix transport_class::Exchanges.const_set(lex_const, Class.new(Legion::Transport::Exchange) do define_method(:exchange_name) { amqp } end) @default_exchange = transport_class::Exchanges.const_get(lex_const, false) end |
#default_exchange ⇒ Object
44 45 46 |
# File 'lib/legion/transport/helper.rb', line 44 def default_exchange @default_exchange ||= build_default_exchange end |
#exchanges ⇒ Object
40 41 42 |
# File 'lib/legion/transport/helper.rb', line 40 def exchanges @exchanges ||= transport_class::Exchanges end |
#messages ⇒ Object
32 33 34 |
# File 'lib/legion/transport/helper.rb', line 32 def @messages ||= transport_class::Messages end |
#queues ⇒ Object
36 37 38 |
# File 'lib/legion/transport/helper.rb', line 36 def queues @queues ||= transport_class::Queues end |
#transport_channel ⇒ Object
— Resource Info —
89 90 91 |
# File 'lib/legion/transport/helper.rb', line 89 def transport_channel Legion::Transport::Connection.channel end |
#transport_channel_open? ⇒ Boolean
76 77 78 79 80 81 |
# File 'lib/legion/transport/helper.rb', line 76 def transport_channel_open? Legion::Transport::Connection.channel_open? rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_channel_open) false end |
#transport_class ⇒ Object
28 29 30 |
# File 'lib/legion/transport/helper.rb', line 28 def transport_class @transport_class ||= lex_class::Transport end |
#transport_connected? ⇒ Boolean
— Status —
60 61 62 63 64 65 66 67 |
# File 'lib/legion/transport/helper.rb', line 60 def transport_connected? return false unless defined?(Legion::Settings) !!Legion::Settings.dig(:transport, :connected) rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_connected) false end |
#transport_default_ttl ⇒ Object
— TTL Resolution — Override in your LEX to set a custom default message TTL for the extension. Resolution chain: per-call :ttl option -> LEX override -> Settings -> nil (no expiration)
13 14 15 16 17 18 19 20 |
# File 'lib/legion/transport/helper.rb', line 13 def transport_default_ttl return nil unless defined?(Legion::Settings) Legion::Settings.dig(:transport, :messages, :ttl) rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_default_ttl) nil end |
#transport_lite_mode? ⇒ Boolean
83 84 85 |
# File 'lib/legion/transport/helper.rb', line 83 def transport_lite_mode? Legion::Transport::Connection.lite_mode? end |
#transport_path ⇒ Object
— Namespace / Wiring —
24 25 26 |
# File 'lib/legion/transport/helper.rb', line 24 def transport_path @transport_path ||= "#{full_path}/transport" end |
#transport_publish(routing_key:, payload: {}, **opts) ⇒ Object
— Publish Convenience —
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/legion/transport/helper.rb', line 102 def transport_publish(routing_key:, payload: {}, **opts) return false unless transport_connected? if opts.key?(:ttl) ttl = opts.delete(:ttl) opts[:expiration] = ttl.to_s if ttl elsif !opts.key?(:expiration) ttl = transport_default_ttl opts[:expiration] = ttl.to_s if ttl end encoded = payload.is_a?(String) ? payload : Legion::JSON.dump(payload) exchange = default_exchange.cached_instance || default_exchange.new exchange.publish(encoded, routing_key: routing_key, **opts) true rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_publish, routing_key: routing_key) false end |
#transport_session_open? ⇒ Boolean
69 70 71 72 73 74 |
# File 'lib/legion/transport/helper.rb', line 69 def transport_session_open? Legion::Transport::Connection.session_open? rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: :transport_session_open) false end |