Class: Legion::Extensions::Llm::Transport::Messages::FleetRequest
- Inherits:
- Transport::Message
- Object
- Transport::Message
- Legion::Extensions::Llm::Transport::Messages::FleetRequest
- Includes:
- Fleet::EnvelopeValidation, Fleet::PublishSafety
- Defined in:
- lib/legion/extensions/llm/transport/messages/fleet_request.rb
Overview
Strict protocol-v2 request envelope for outbound fleet work.
Constant Summary
- PRIORITY_MAP =
{ critical: 9, high: 7, normal: 5, low: 2 }.freeze
- DEFAULT_PUBLISH_OPTIONS =
{ mandatory: true, publisher_confirm: true, spool: false, return_result: true }.freeze
- REQUIRED_OPTIONS =
%i[ request_id correlation_id operation provider provider_instance model params reply_to message_context caller trace_context signed_token timeout_seconds expires_at protocol_version idempotency_key ].freeze
Constants included from Fleet::EnvelopeValidation
Fleet::EnvelopeValidation::LEGACY_OPTIONS
Instance Method Summary
- #app_id ⇒ Object
- #correlation_id ⇒ Object
- #exchange ⇒ Object
- #expiration ⇒ Object
- #message ⇒ Object
- #message_id ⇒ Object
- #priority ⇒ Object
- #publish(options = nil) ⇒ Object
- #reply_to ⇒ Object
- #routing_key ⇒ Object
- #type ⇒ Object
- #validate ⇒ Object
Instance Method Details
#app_id ⇒ Object
34 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 34 def app_id = @options[:app_id] || 'lex-llm' |
#correlation_id ⇒ Object
36 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 36 def correlation_id = @options[:correlation_id] |
#exchange ⇒ Object
32 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 32 def exchange = Exchanges::Fleet |
#expiration ⇒ Object
47 48 49 50 51 52 53 54 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 47 def expiration ttl = @options[:ttl] || @options[:timeout_seconds] return super unless ttl (Float(ttl) * 1000).ceil.to_s rescue ArgumentError, TypeError super end |
#message ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 85 def message super.merge( protocol_version: @options[:protocol_version], request_id: @options[:request_id], correlation_id: correlation_id, idempotency_key: @options[:idempotency_key], operation: @options[:operation], provider: @options[:provider], provider_instance: @options[:provider_instance], model: @options[:model], params: @options[:params] || {}, reply_to: reply_to, message_context: @options[:message_context], caller: @options[:caller], trace_context: @options[:trace_context], signed_token: @options[:signed_token], timeout_seconds: @options[:timeout_seconds], expires_at: @options[:expires_at] ).compact end |
#message_id ⇒ Object
37 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 37 def message_id = @options[:message_id] ||= "llm_fleet_req_#{SecureRandom.uuid}" |
#priority ⇒ Object
39 40 41 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 39 def priority PRIORITY_MAP.fetch(@options[:priority].to_sym, 5) if @options[:priority] end |
#publish(options = nil) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 56 def publish(options = nil) raise unless @valid = DEFAULT_PUBLISH_OPTIONS.merge(@options).merge(options || {}) return_result = return_publish_result?() = () validate_payload_size exchange_dest = fleet_exchange return_state = {} install_return_listener(exchange_dest, , return_state) prepare_publisher_confirms(exchange_dest, ) exchange_dest.publish(, **) return nil unless return_result publish_result(exchange_dest, .merge(), return_state) rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError, Bunny::NetworkErrorWrapper, IOError, Timeout::Error => e handle_exception(e, level: :warn, handled: true, operation: 'llm.fleet.request.publish') publish_failure_result(:failed, e, || || @options) end |
#reply_to ⇒ Object
35 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 35 def reply_to = @options[:reply_to] |
#routing_key ⇒ Object
43 44 45 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 43 def routing_key @options[:routing_key] || raise(ArgumentError, 'routing_key is required') end |
#type ⇒ Object
33 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 33 def type = Fleet::Protocol::REQUEST_TYPE |
#validate ⇒ Object
77 78 79 80 81 82 83 |
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 77 def validate require_option!(:routing_key) REQUIRED_OPTIONS.each { |key| require_option!(key) } require_protocol_version! @valid = true end |