Module: Legion::LLM::Fleet::Handler
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/fleet/handler.rb
Constant Summary collapse
- LEGACY_FIELDS =
%i[schema_version request_type fleet_correlation_id].freeze
Class Method Summary collapse
- .build_error(envelope, error) ⇒ Object
- .build_success(envelope, response) ⇒ Object
- .error_code(error) ⇒ Object
- .handle_fleet_request(payload) ⇒ Object
- .log_reply_publish_failure(publish_result, correlation_id) ⇒ Object
- .normalize_hash(hash) ⇒ Object
- .normalize_value(value) ⇒ Object
- .protocol_version?(value) ⇒ Boolean
- .publish_accepted?(publish_result) ⇒ Boolean
- .publish_error(_envelope, result) ⇒ Object
- .publish_response(_envelope, result) ⇒ Object
- .reply_publish_options ⇒ Object
- .resolve_provider(envelope) ⇒ Object
- .responder_auth_required? ⇒ Boolean
- .response_content(response) ⇒ Object
- .response_finish_reason(response) ⇒ Object
- .response_metadata(response) ⇒ Object
- .response_model(response) ⇒ Object
- .response_tool_calls(response) ⇒ Object
- .response_usage(response) ⇒ Object
- .validate_envelope!(envelope) ⇒ Object
Class Method Details
.build_error(envelope, error) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/legion/llm/fleet/handler.rb', line 84 def build_error(envelope, error) { success: false, error: error_code(error), protocol_version: envelope[:protocol_version] || ::Legion::Extensions::Llm::Fleet::Protocol::VERSION, request_id: envelope[:request_id], correlation_id: envelope[:correlation_id], idempotency_key: envelope[:idempotency_key], operation: envelope[:operation], provider: envelope[:provider], provider_instance: envelope[:provider_instance], model: envelope[:model], reply_to: envelope[:reply_to], message_context: envelope[:message_context] || {}, trace_context: envelope[:trace_context] || {}, message: error., error_class: error.class.name }.compact end |
.build_success(envelope, response) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/legion/llm/fleet/handler.rb', line 62 def build_success(envelope, response) { success: true, protocol_version: envelope[:protocol_version], request_id: envelope[:request_id], correlation_id: envelope[:correlation_id], idempotency_key: envelope[:idempotency_key], operation: envelope[:operation], provider: envelope[:provider], provider_instance: envelope[:provider_instance], model: response_model(response) || envelope[:model], reply_to: envelope[:reply_to], message_context: envelope[:message_context] || {}, trace_context: envelope[:trace_context] || {}, content: response_content(response), tool_calls: response_tool_calls(response), usage: response_usage(response), finish_reason: response_finish_reason(response), metadata: (response) }.compact end |
.error_code(error) ⇒ Object
216 217 218 219 220 221 222 223 |
# File 'lib/legion/llm/fleet/handler.rb', line 216 def error_code(error) = error..to_s return 'invalid_fleet_request' if .start_with?('invalid_fleet_request') return 'provider_not_registered' if .start_with?('provider_not_registered') return 'fleet_policy_denied' if error.is_a?(WorkerExecution::PolicyError) 'fleet_worker_error' end |
.handle_fleet_request(payload) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/legion/llm/fleet/handler.rb', line 19 def handle_fleet_request(payload) envelope = normalize_hash(payload) log.debug '[llm][fleet][handler] action=handle_fleet_request.enter ' \ "operation=#{envelope[:operation]} model=#{envelope[:model]} provider=#{envelope[:provider]}" validate_envelope!(envelope) provider_resolver = proc { |validated_envelope| resolve_provider(validated_envelope) } response = WorkerExecution.call(envelope: envelope, provider: provider_resolver) result = build_success(envelope, response) publish_response(envelope, result) if envelope[:reply_to] result rescue StandardError => e handle_exception(e, level: :warn, operation: 'llm.fleet.handler.handle_fleet_request') result = build_error(envelope || {}, e) publish_error(envelope || {}, result) if envelope&.[](:reply_to) result end |
.log_reply_publish_failure(publish_result, correlation_id) ⇒ Object
251 252 253 |
# File 'lib/legion/llm/fleet/handler.rb', line 251 def log_reply_publish_failure(publish_result, correlation_id) log.warn("[llm][fleet][handler] action=reply_publish_failed correlation_id=#{correlation_id} status=#{publish_result&.[](:status)}") end |
.normalize_hash(hash) ⇒ Object
155 156 157 158 159 160 161 |
# File 'lib/legion/llm/fleet/handler.rb', line 155 def normalize_hash(hash) return {} unless hash.respond_to?(:each) hash.each_with_object({}) do |(key, value), result| result[key.respond_to?(:to_sym) ? key.to_sym : key] = normalize_value(value) end end |
.normalize_value(value) ⇒ Object
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/legion/llm/fleet/handler.rb', line 163 def normalize_value(value) case value when Hash normalize_hash(value) when Array value.map { |entry| normalize_value(entry) } else value end end |
.protocol_version?(value) ⇒ Boolean
232 233 234 235 |
# File 'lib/legion/llm/fleet/handler.rb', line 232 def protocol_version?(value) value == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION || value == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION.to_s end |
.publish_accepted?(publish_result) ⇒ Boolean
247 248 249 |
# File 'lib/legion/llm/fleet/handler.rb', line 247 def publish_accepted?(publish_result) publish_result.is_a?(Hash) && publish_result[:accepted] == true end |
.publish_error(_envelope, result) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/legion/llm/fleet/handler.rb', line 130 def publish_error(_envelope, result) require 'legion/extensions/llm/transport/messages/fleet_error' publish_result = ::Legion::Extensions::Llm::Transport::Messages::FleetError.new( protocol_version: result[:protocol_version], request_id: result[:request_id], correlation_id: result[:correlation_id], idempotency_key: result[:idempotency_key], operation: result[:operation], provider: result[:provider], provider_instance: result[:provider_instance], model: result[:model], reply_to: result[:reply_to], message_context: result[:message_context], trace_context: result[:trace_context], code: result[:error], message: result[:message], error_class: result[:error_class], retryable: false ).publish() log_reply_publish_failure(publish_result, result[:correlation_id]) unless publish_accepted?(publish_result) publish_result rescue StandardError => e handle_exception(e, level: :warn, operation: 'llm.fleet.handler.publish_error') end |
.publish_response(_envelope, result) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/legion/llm/fleet/handler.rb', line 104 def publish_response(_envelope, result) require 'legion/extensions/llm/transport/messages/fleet_response' publish_result = ::Legion::Extensions::Llm::Transport::Messages::FleetResponse.new( protocol_version: result[:protocol_version], request_id: result[:request_id], correlation_id: result[:correlation_id], idempotency_key: result[:idempotency_key], operation: result[:operation], provider: result[:provider], provider_instance: result[:provider_instance], model: result[:model], reply_to: result[:reply_to], message_context: result[:message_context], trace_context: result[:trace_context], content: result[:content], tool_calls: result[:tool_calls], usage: result[:usage], finish_reason: result[:finish_reason], metadata: result[:metadata] ).publish() log_reply_publish_failure(publish_result, result[:correlation_id]) unless publish_accepted?(publish_result) publish_result rescue StandardError => e handle_exception(e, level: :warn, operation: 'llm.fleet.handler.publish_response') end |
.reply_publish_options ⇒ Object
237 238 239 240 241 242 243 244 245 |
# File 'lib/legion/llm/fleet/handler.rb', line 237 def { mandatory: Legion::LLM::Settings.value(:fleet, :responder, :mandatory, default: false), publisher_confirm: Legion::LLM::Settings.value(:fleet, :responder, :publisher_confirm, default: false), publish_confirm_timeout_ms: Legion::LLM::Settings.value(:fleet, :responder, :publish_confirm_timeout_ms, default: 500), spool: Legion::LLM::Settings.value(:fleet, :responder, :spool, default: false), return_result: true } end |
.resolve_provider(envelope) ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/legion/llm/fleet/handler.rb', line 53 def resolve_provider(envelope) provider = envelope[:provider] instance = envelope[:provider_instance] adapter = Legion::LLM::Call::Registry.for(provider, instance: instance) return adapter if adapter raise ArgumentError, "provider_not_registered: #{provider}/#{instance}" end |
.responder_auth_required? ⇒ Boolean
225 226 227 228 229 230 |
# File 'lib/legion/llm/fleet/handler.rb', line 225 def responder_auth_required? value = Legion::LLM::Settings.value(:fleet, :responder, :require_auth, default: nil) return value != false unless value.nil? Legion::LLM::Settings.value(:fleet, :auth, :require_signed_token, default: true) != false end |
.response_content(response) ⇒ Object
174 175 176 177 178 179 |
# File 'lib/legion/llm/fleet/handler.rb', line 174 def response_content(response) return response[:content] || response['content'] || response[:result] || response['result'] if response.is_a?(Hash) return response.content if response.respond_to?(:content) response end |
.response_finish_reason(response) ⇒ Object
202 203 204 205 206 207 |
# File 'lib/legion/llm/fleet/handler.rb', line 202 def response_finish_reason(response) return response[:finish_reason] || response['finish_reason'] if response.is_a?(Hash) return response.finish_reason if response.respond_to?(:finish_reason) nil end |
.response_metadata(response) ⇒ Object
209 210 211 212 213 214 |
# File 'lib/legion/llm/fleet/handler.rb', line 209 def (response) return response[:metadata] || response['metadata'] || {} if response.is_a?(Hash) return response. if response.respond_to?(:metadata) {} end |
.response_model(response) ⇒ Object
181 182 183 184 185 186 |
# File 'lib/legion/llm/fleet/handler.rb', line 181 def response_model(response) return response[:model] || response['model'] if response.is_a?(Hash) return response.model if response.respond_to?(:model) nil end |
.response_tool_calls(response) ⇒ Object
188 189 190 191 192 193 |
# File 'lib/legion/llm/fleet/handler.rb', line 188 def response_tool_calls(response) return response[:tool_calls] || response['tool_calls'] if response.is_a?(Hash) return response.tool_calls if response.respond_to?(:tool_calls) nil end |
.response_usage(response) ⇒ Object
195 196 197 198 199 200 |
# File 'lib/legion/llm/fleet/handler.rb', line 195 def response_usage(response) return response[:usage] || response['usage'] || {} if response.is_a?(Hash) return response.tokens if response.respond_to?(:tokens) {} end |
.validate_envelope!(envelope) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/legion/llm/fleet/handler.rb', line 37 def validate_envelope!(envelope) raise ArgumentError, 'invalid_fleet_request: unsupported protocol_version' unless protocol_version?(envelope[:protocol_version]) required_fields = %i[ request_id correlation_id operation provider provider_instance model params reply_to message_context caller trace_context timeout_seconds expires_at idempotency_key ] required_fields << :signed_token if responder_auth_required? required_fields.each do |key| raise ArgumentError, "invalid_fleet_request: #{key} is required" if envelope[key].nil? end LEGACY_FIELDS.each do |key| raise ArgumentError, "invalid_fleet_request: #{key} is not supported" if envelope.key?(key) end end |