Class: Legion::Extensions::Llm::Transport::Messages::FleetRequest

Inherits:
Transport::Message
  • Object
show all
Includes:
Fleet::EnvelopeValidation, Fleet::PublishSafety
Defined in:
lib/legion/extensions/llm/transport/messages/fleet_request.rb

Overview

Strict protocol-v2 request envelope for outbound fleet work.

Constant Summary collapse

PRIORITY_MAP =
{ critical: 9, high: 7, normal: 5, low: 2 }.freeze
DEFAULT_PUBLISH_OPTIONS =
{
  mandatory: true,
  publisher_confirm: true,
  spool: false,
  return_result: true
}.freeze
REQUIRED_OPTIONS =
%i[
  request_id correlation_id operation provider provider_instance model params reply_to
  message_context caller trace_context signed_token timeout_seconds expires_at protocol_version
  idempotency_key
].freeze

Constants included from Fleet::EnvelopeValidation

Fleet::EnvelopeValidation::LEGACY_OPTIONS

Instance Method Summary collapse

Instance Method Details

#app_idObject



34
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 34

def app_id = @options[:app_id] || 'lex-llm'

#correlation_idObject



36
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 36

def correlation_id = @options[:correlation_id]

#exchangeObject



32
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 32

def exchange = Exchanges::Fleet

#expirationObject



47
48
49
50
51
52
53
54
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 47

def expiration
  ttl = @options[:ttl] || @options[:timeout_seconds]
  return super unless ttl

  (Float(ttl) * 1000).ceil.to_s
rescue ArgumentError, TypeError
  super
end

#messageObject



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 85

def message
  super.merge(
    protocol_version: @options[:protocol_version],
    request_id: @options[:request_id],
    correlation_id: correlation_id,
    idempotency_key: @options[:idempotency_key],
    operation: @options[:operation],
    provider: @options[:provider],
    provider_instance: @options[:provider_instance],
    model: @options[:model],
    params: @options[:params] || {},
    reply_to: reply_to,
    message_context: @options[:message_context],
    caller: @options[:caller],
    trace_context: @options[:trace_context],
    signed_token: @options[:signed_token],
    timeout_seconds: @options[:timeout_seconds],
    expires_at: @options[:expires_at]
  ).compact
end

#message_idObject



37
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 37

def message_id = @options[:message_id] ||= "llm_fleet_req_#{SecureRandom.uuid}"

#priorityObject



39
40
41
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 39

def priority
  PRIORITY_MAP.fetch(@options[:priority].to_sym, 5) if @options[:priority]
end

#publish(options = nil) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 56

def publish(options = nil)
  raise unless @valid

  requested_options = DEFAULT_PUBLISH_OPTIONS.merge(@options).merge(options || {})
  return_result = return_publish_result?(requested_options)
  publish_options = request_publish_options(requested_options)
  validate_payload_size
  exchange_dest = fleet_exchange
  return_state = {}
  install_return_listener(exchange_dest, requested_options, return_state)
  prepare_publisher_confirms(exchange_dest, requested_options)
  exchange_dest.publish(encode_message, **publish_options)
  return nil unless return_result

  publish_result(exchange_dest, requested_options.merge(publish_options), return_state)
rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError,
       Bunny::NetworkErrorWrapper, IOError, Timeout::Error => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.fleet.request.publish')
  publish_failure_result(:failed, e, publish_options || requested_options || @options)
end

#reply_toObject



35
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 35

def reply_to = @options[:reply_to]

#routing_keyObject



43
44
45
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 43

def routing_key
  @options[:routing_key] || raise(ArgumentError, 'routing_key is required')
end

#typeObject



33
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 33

def type = Fleet::Protocol::REQUEST_TYPE

#validateObject



77
78
79
80
81
82
83
# File 'lib/legion/extensions/llm/transport/messages/fleet_request.rb', line 77

def validate
  reject_legacy_options!
  require_option!(:routing_key)
  REQUIRED_OPTIONS.each { |key| require_option!(key) }
  require_protocol_version!
  @valid = true
end