Module: Legion::Extensions::Llm::Fleet::DefaultExchangeReply

Includes:
PublishSafety
Included in:
Transport::Messages::FleetError, Transport::Messages::FleetResponse
Defined in:
lib/legion/extensions/llm/fleet/default_exchange_reply.rb

Overview

Publishes correlated fleet replies directly to the caller’s reply queue.

Constant Summary collapse

DEFAULT_REPLY_PUBLISH_OPTIONS =
{
  mandatory: false,
  publisher_confirm: false,
  spool: false,
  return_result: true
}.freeze

Instance Method Summary collapse

Instance Method Details

#publish(options = nil) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/llm/fleet/default_exchange_reply.rb', line 20

def publish(options = nil)
  raise unless @valid

  requested_options = DEFAULT_REPLY_PUBLISH_OPTIONS.merge(@options).merge(options || {})
  return_result = return_publish_result?(requested_options)
  publish_options = reply_publish_options(requested_options)
  validate_payload_size
  default_exchange = channel.default_exchange
  return_state = {}
  install_return_listener(default_exchange, requested_options, return_state)
  prepare_publisher_confirms(default_exchange, requested_options)
  default_exchange.publish(encode_message, **publish_options)
  return nil unless return_result

  publish_result(default_exchange, requested_options.merge(publish_options), return_state)
rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed, Bunny::ChannelError,
       Bunny::NetworkErrorWrapper, IOError, Timeout::Error => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.fleet.reply.publish')
  reply_publish_failure_result(e, publish_options || @options)
end