Module: Legion::Extensions::Llm::Gateway::Runners::Fleet

Defined in:
lib/legion/extensions/llm/gateway/runners/fleet.rb

Constant Summary

DEFAULT_TIMEOUT =
30

Class Method Summary

Class Method Details

.dispatch(model:, messages:, **opts) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 13

# Sends an inference request to the LLM fleet over transport and blocks for the reply.
#
# Flow: bail out if the fleet is unavailable, sign the request, publish it tagged
# with a fresh correlation id, then wait for the matching reply.
#
# @param model [Object] model identifier forwarded to the fleet
# @param messages [Object] conversation payload forwarded to the fleet
# @param opts [Hash] extra request options. +:intent+ is signed and sent explicitly;
#   +:timeout+ only controls how long we wait (it is not published). Every other key
#   is forwarded to publish_request unchanged.
# @return [Hash, Object] +{ success: false, error: 'fleet_unavailable' }+,
#   +{ success: false, error: 'fleet_auth_failed' }+, or whatever wait_for_response returns
def dispatch(model:, messages:, **opts)
  return error_result('fleet_unavailable') unless fleet_available?

  intent = opts[:intent]
  signed_token = Helpers::Auth.sign_request({ model: model, intent: intent })
  # A missing signature is only fatal when the settings demand authenticated requests.
  return error_result('fleet_auth_failed') if signed_token.nil? && require_auth?

  correlation_id = Helpers::Rpc.generate_correlation_id
  request = {
    model: model,
    messages: messages,
    intent: intent,
    correlation_id: correlation_id,
    signed_token: signed_token
  }
  # NOTE(review): the reply future is registered inside wait_for_response, i.e. after
  # this publish. If Helpers::ReplyDispatcher drops replies for unregistered ids, a very
  # fast reply could be missed. Consider registering before publishing — confirm.
  publish_request(**request, **opts.except(:intent, :timeout))

  wait_seconds = resolve_timeout(opts[:timeout])
  wait_for_response(correlation_id, timeout: wait_seconds)
end

.error_result(reason) ⇒ Object



93
94
95
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 93

# Builds the uniform failure payload used by this runner.
#
# @param reason [String] machine-readable failure code, e.g. 'fleet_unavailable'
# @return [Hash] +{ success: false, error: reason }+ (keys in that order)
def error_result(reason)
  { success: false }.merge!(error: reason)
end

.fleet_available? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 28

# Reports whether fleet dispatch can be attempted right now.
#
# The transport must be connected AND fleet routing must be enabled in settings.
# Short-circuits: when transport_ready? is falsy its value is returned and
# fleet_enabled? is not consulted.
#
# @return [Boolean]
def fleet_available?
  ready = transport_ready?
  return ready unless ready

  fleet_enabled?
end

.fleet_enabled? ⇒ Boolean

Returns:

  • (Boolean)


38
39
40
41
42
43
44
45
46
47
48
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 38

# Reads the +use_fleet+ routing switch from Legion settings.
#
# The check is permissive: fleet routing counts as enabled unless
# +Legion::Settings[:llm][:routing][:use_fleet]+ is explicitly present. It is treated
# as enabled when Legion::Settings is not loaded, when the lookup raises, or when the
# +llm+/+routing+ entries are missing or not Hashes.
#
# @return [Boolean, Object] the configured +use_fleet+ value, or true by default
def fleet_enabled?
  return true unless defined?(Legion::Settings)

  llm = begin
    Legion::Settings[:llm]
  rescue StandardError
    nil
  end
  routing = llm[:routing] if llm.is_a?(Hash)
  return true unless routing.is_a?(Hash)

  routing.fetch(:use_fleet, true)
end

.publish_request(**) ⇒ Object



73
74
75
76
77
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 73

# Builds an InferenceRequest transport message and publishes it.
#
# Replies are directed at this agent's queue (Helpers::Rpc.agent_queue_name).
# Every keyword passed in is forwarded to the message constructor as-is.
#
# @param fields [Hash] message attributes (model, messages, correlation_id, ...)
# @return [Object] whatever InferenceRequest#publish returns
def publish_request(**fields)
  message = Transport::Messages::InferenceRequest.new(
    reply_to: Helpers::Rpc.agent_queue_name,
    **fields
  )
  message.publish
end

.require_auth? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
53
54
55
56
57
58
59
60
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 50

# Reports whether fleet requests must carry a valid signed token.
#
# Reads +Legion::Settings[:llm][:routing][:fleet][:require_auth]+ and defaults to false.
# It also returns false when Legion::Settings is not loaded, when the lookup raises,
# or when any intermediate level is missing or not a Hash.
#
# Each level is type-checked before indexing, mirroring fleet_enabled?. Hash#dig
# would raise TypeError on a non-diggable intermediate such as +routing: true+,
# which crashed dispatch on malformed configuration.
#
# @return [Boolean, Object] the configured +require_auth+ value, or false by default
def require_auth?
  return false unless defined?(Legion::Settings)

  settings = begin
    Legion::Settings[:llm]
  rescue StandardError
    # Settings lookup is best-effort; a failure means "auth not required".
    nil
  end
  return false unless settings.is_a?(Hash)

  routing = settings[:routing]
  return false unless routing.is_a?(Hash)

  fleet = routing[:fleet]
  return false unless fleet.is_a?(Hash)

  fleet.fetch(:require_auth, false)
end

.resolve_timeout(override) ⇒ Object



62
63
64
65
66
67
68
69
70
71
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 62

# Resolves how long (in the units expected by wait_for_response) to wait for a fleet reply.
#
# Precedence:
# 1. an explicit truthy +override+;
# 2. +Legion::Settings[:llm][:routing][:fleet][:timeout_seconds]+;
# 3. DEFAULT_TIMEOUT.
#
# Each settings level is type-checked before indexing. Malformed configuration such
# as +routing: true+ or +fleet: [..]+ therefore falls back to DEFAULT_TIMEOUT instead
# of raising TypeError, as Hash#dig does on non-diggable intermediates.
#
# @param override [Object, nil] caller-supplied timeout; returned as-is when truthy
# @return [Object] the override, the configured timeout, or DEFAULT_TIMEOUT
def resolve_timeout(override)
  return override if override
  return DEFAULT_TIMEOUT unless defined?(Legion::Settings)

  settings = begin
    Legion::Settings[:llm]
  rescue StandardError
    # Settings lookup is best-effort; a failure means "use the default".
    nil
  end
  return DEFAULT_TIMEOUT unless settings.is_a?(Hash)

  routing = settings[:routing]
  return DEFAULT_TIMEOUT unless routing.is_a?(Hash)

  fleet = routing[:fleet]
  return DEFAULT_TIMEOUT unless fleet.is_a?(Hash)

  fleet[:timeout_seconds] || DEFAULT_TIMEOUT
end

.timeout_result(correlation_id, timeout) ⇒ Object



89
90
91
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 89

# Builds the failure payload returned when no fleet reply arrived in time.
#
# @param correlation_id [Object] id of the request that timed out
# @param timeout [Object] the wait limit that was applied
# @return [Hash] +{ success: false, error: 'fleet_timeout', correlation_id:, timeout: }+
def timeout_result(correlation_id, timeout)
  {
    success: false,
    error: 'fleet_timeout',
    correlation_id:,
    timeout:
  }
end

.transport_ready? ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
35
36
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 32

# Reports whether Legion::Transport is loaded, exposes +connected?+, and is connected.
#
# Always returns a strict boolean (true/false), never nil.
#
# @return [Boolean]
def transport_ready?
  return false unless defined?(Legion::Transport) # rubocop:disable Legion/HelperMigration/DefinedTransportGuard
  return false unless Legion::Transport.respond_to?(:connected?)

  Legion::Transport.connected? ? true : false
end

.wait_for_response(correlation_id, timeout:) ⇒ Object



79
80
81
82
83
84
85
86
87
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 79

# Blocks until the reply for +correlation_id+ arrives or +timeout+ elapses.
#
# Registers a future with Helpers::ReplyDispatcher and waits on it via +value!(timeout)+.
# The registration is always removed afterwards, even on error, through +ensure+.
#
# A falsy reply value (which +value!+ yields on timeout for concurrent-ruby futures)
# is reported as a timeout payload, as is Concurrent::CancelledOperationError.
# Any other exception raised by the future propagates to the caller.
#
# @param correlation_id [Object] id the reply must carry
# @param timeout [Object] wait limit passed to +value!+
# @return [Object, Hash] the reply value, or the timeout_result payload
def wait_for_response(correlation_id, timeout:)
  reply = Helpers::ReplyDispatcher.register(correlation_id).value!(timeout)
  reply || timeout_result(correlation_id, timeout)
rescue Concurrent::CancelledOperationError
  timeout_result(correlation_id, timeout)
ensure
  Helpers::ReplyDispatcher.deregister(correlation_id)
end