Module: Legion::Extensions::Llm::Gateway::Runners::Fleet
- Defined in:
- lib/legion/extensions/llm/gateway/runners/fleet.rb
Constant Summary
- DEFAULT_TIMEOUT = 30
Class Method Summary
- .dispatch(model:, messages:, **opts) ⇒ Object
- .error_result(reason) ⇒ Object
- .fleet_available? ⇒ Boolean
- .fleet_enabled? ⇒ Boolean
- .publish_request ⇒ Object
- .require_auth? ⇒ Boolean
- .resolve_timeout(override) ⇒ Object
- .timeout_result(correlation_id, timeout) ⇒ Object
- .transport_ready? ⇒ Boolean
- .wait_for_response(correlation_id, timeout:) ⇒ Object
Class Method Details
.dispatch(model:, messages:, **opts) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 13
#
# Sends an inference request to the LLM fleet over the transport and waits
# for the correlated reply.
#
# @param model [Object] model identifier; signed and forwarded to the fleet
# @param messages [Object] conversation payload, forwarded unchanged
# @param opts [Hash] extra request fields. +:intent+ is signed and sent
#   explicitly; +:timeout+ overrides the configured wait (presumably seconds,
#   matching the +timeout_seconds+ setting). The RPC-owned keys
#   +:correlation_id+, +:signed_token+ and +:reply_to+ are dropped. Everything
#   else is forwarded to #publish_request.
# @return [Hash] the reply from #wait_for_response, or an #error_result hash
#   with error 'fleet_unavailable' or 'fleet_auth_failed'
def dispatch(model:, messages:, **opts)
  return error_result('fleet_unavailable') unless fleet_available?

  intent = opts[:intent]
  token = Helpers::Auth.sign_request({ model: model, intent: intent })
  return error_result('fleet_auth_failed') if token.nil? && require_auth?

  # Resolve the wait budget before publishing. If reading settings fails, we
  # fail before a request is on the bus with nobody waiting for its reply.
  timeout = resolve_timeout(opts[:timeout])
  correlation_id = Helpers::Rpc.generate_correlation_id

  # correlation_id / signed_token / reply_to belong to this RPC round-trip.
  # Letting caller opts clash with or replace them would send a request whose
  # reply #wait_for_response can never match.
  extra = opts.except(:intent, :timeout, :correlation_id, :signed_token, :reply_to)
  publish_request(model: model, messages: messages, intent: intent,
                  correlation_id: correlation_id, signed_token: token, **extra)

  # NOTE(review): #wait_for_response registers the reply future only after
  # the request was published. Confirm Helpers::ReplyDispatcher tolerates a
  # reply that arrives before registration; otherwise register first.
  wait_for_response(correlation_id, timeout: timeout)
end
.error_result(reason) ⇒ Object
93 94 95 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 93
#
# Builds the failure payload the fleet runner returns to callers.
#
# @param reason [String] failure code such as 'fleet_unavailable'
# @return [Hash] a new hash +{ success: false, error: reason }+
def error_result(reason) = { success: false, error: reason }
.fleet_available? ⇒ Boolean
28 29 30 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 28
#
# Whether a request can be sent to the fleet right now: the transport must
# be connected and fleet routing must be enabled. The transport is checked
# first, so #fleet_enabled? is never consulted while disconnected.
#
# @return [Boolean]
def fleet_available?
  return false unless transport_ready?

  fleet_enabled?
end
.fleet_enabled? ⇒ Boolean
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 38
#
# Reads Legion::Settings[:llm][:routing][:use_fleet], defaulting to enabled.
# Any missing piece counts as "enabled": no Settings constant, a lookup that
# raises, or a missing or non-Hash :llm or :routing entry. Otherwise the
# configured value is returned as-is.
#
# @return [Object] the configured :use_fleet value, or true by default
def fleet_enabled?
  return true unless defined?(Legion::Settings)

  llm = Legion::Settings[:llm] rescue nil # rubocop:disable Style/RescueModifier
  routing = llm[:routing] if llm.is_a?(Hash)
  routing.is_a?(Hash) ? routing.fetch(:use_fleet, true) : true
end
.publish_request ⇒ Object
73 74 75 76 77 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 73
#
# Builds a Transport::Messages::InferenceRequest whose replies go to this
# agent's queue (Helpers::Rpc.agent_queue_name), then publishes it.
#
# @param fields [Hash] request fields forwarded to the message constructor
# @return [Object] whatever InferenceRequest#publish returns
def publish_request(**fields)
  request = Transport::Messages::InferenceRequest.new(
    reply_to: Helpers::Rpc.agent_queue_name,
    **fields
  )
  request.publish
end
.require_auth? ⇒ Boolean
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 50
#
# Whether #dispatch must refuse to send a request it could not sign.
# Reads Legion::Settings[:llm][:routing][:fleet][:require_auth] and defaults
# to false. Each level of the settings tree is type-checked before it is
# indexed, so a malformed value (e.g. +routing: true+) yields false instead
# of the TypeError Hash#dig would raise. This matches the other lookups here:
# no Settings constant, a raising lookup, or a non-Hash :llm or :fleet entry
# already yield false.
#
# @return [Object] the configured :require_auth value, or false by default
def require_auth?
  return false unless defined?(Legion::Settings)

  settings = Legion::Settings[:llm] rescue nil # rubocop:disable Style/RescueModifier
  return false unless settings.is_a?(Hash)

  routing = settings[:routing]
  return false unless routing.is_a?(Hash)

  fleet = routing[:fleet]
  return false unless fleet.is_a?(Hash)

  fleet.fetch(:require_auth, false)
end
.resolve_timeout(override) ⇒ Object
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 62
#
# Picks how long #wait_for_response should block for a reply.
#
# Precedence:
# 1. any truthy +override+;
# 2. Legion::Settings[:llm][:routing][:fleet][:timeout_seconds];
# 3. DEFAULT_TIMEOUT.
# Each level of the settings tree is type-checked before it is indexed. A
# missing or non-Hash level (e.g. +fleet: true+) therefore falls back to
# DEFAULT_TIMEOUT instead of raising the TypeError Hash#dig would raise.
#
# @param override [Object, nil] caller-supplied timeout; any truthy value wins
# @return [Object] the chosen timeout
def resolve_timeout(override)
  return override if override
  return DEFAULT_TIMEOUT unless defined?(Legion::Settings)

  settings = Legion::Settings[:llm] rescue nil # rubocop:disable Style/RescueModifier
  return DEFAULT_TIMEOUT unless settings.is_a?(Hash)

  routing = settings[:routing]
  fleet = routing[:fleet] if routing.is_a?(Hash)
  return DEFAULT_TIMEOUT unless fleet.is_a?(Hash)

  fleet[:timeout_seconds] || DEFAULT_TIMEOUT
end
.timeout_result(correlation_id, timeout) ⇒ Object
89 90 91 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 89
#
# Builds the failure payload returned when no reply arrived in time.
#
# @param correlation_id [Object] id of the request that went unanswered
# @param timeout [Object] the wait budget that elapsed
# @return [Hash] success: false, error: 'fleet_timeout', plus both arguments
def timeout_result(correlation_id, timeout)
  { success: false, error: 'fleet_timeout', correlation_id:, timeout: }
end
.transport_ready? ⇒ Boolean
32 33 34 35 36 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 32
#
# True only when Legion::Transport is loaded, exposes +connected?+, and
# reports a truthy connection state. Always returns a strict boolean.
#
# @return [Boolean]
def transport_ready?
  return false unless defined?(Legion::Transport) # rubocop:disable Legion/HelperMigration/DefinedTransportGuard
  return false unless Legion::Transport.respond_to?(:connected?)

  Legion::Transport.connected? ? true : false
end
.wait_for_response(correlation_id, timeout:) ⇒ Object
79 80 81 82 83 84 85 86 87 |
# File 'lib/legion/extensions/llm/gateway/runners/fleet.rb', line 79
#
# Registers a reply future for +correlation_id+ with Helpers::ReplyDispatcher
# and blocks on it for up to +timeout+. A falsy reply or a cancelled future
# is reported as #timeout_result. The registration is always removed on
# exit, including when an exception propagates.
#
# @param correlation_id [Object] id the reply is matched on
# @param timeout [Object] wait budget passed to the future's +value!+
# @return [Object] the reply, or the #timeout_result hash
def wait_for_response(correlation_id, timeout:)
  pending = Helpers::ReplyDispatcher.register(correlation_id)
  pending.value!(timeout) || timeout_result(correlation_id, timeout)
rescue Concurrent::CancelledOperationError
  timeout_result(correlation_id, timeout)
ensure
  Helpers::ReplyDispatcher.deregister(correlation_id)
end