Module: Legion::LLM::Fleet::Handler
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/fleet/handler.rb
Constant Summary
- LEGACY_FIELDS =
%i[schema_version request_type fleet_correlation_id].freeze
Class Method Summary
- .build_error(envelope, error) ⇒ Object
- .build_success(envelope, response) ⇒ Object
- .error_code(error) ⇒ Object
- .handle_fleet_request(payload) ⇒ Object
- .log_reply_publish_failure(publish_result, correlation_id) ⇒ Object
- .normalize_hash(hash) ⇒ Object
- .normalize_value(value) ⇒ Object
- .protocol_version?(value) ⇒ Boolean
- .publish_accepted?(publish_result) ⇒ Boolean
- .publish_error(envelope, result) ⇒ Object
- .publish_response(envelope, result) ⇒ Object
- .reply_publish_options ⇒ Object
- .resolve_provider(envelope) ⇒ Object
- .responder_auth_required? ⇒ Boolean
- .response_content(response) ⇒ Object
- .response_finish_reason(response) ⇒ Object
- .response_metadata(response) ⇒ Object
- .response_model(response) ⇒ Object
- .response_tool_calls(response) ⇒ Object
- .response_usage(response) ⇒ Object
- .validate_envelope!(envelope) ⇒ Object
Class Method Details
.build_error(envelope, error) ⇒ Object
Source lines 87-107
# File 'lib/legion/llm/fleet/handler.rb', line 87
# Builds the failure result for a fleet request that raised.
#
# Echoes the routing/correlation fields of +envelope+ so the caller can match
# the reply. Falls back to the local protocol VERSION when the envelope has no
# +protocol_version+ (for example an empty envelope).
#
# @param envelope [Hash] normalized request envelope (may be empty)
# @param error [StandardError] the exception raised while handling the request
# @return [Hash] result with +success: false+; nil-valued keys are dropped by +compact+
def build_error(envelope, error)
  {
    success: false,
    error: error_code(error),
    protocol_version: envelope[:protocol_version] || ::Legion::Extensions::Llm::Fleet::Protocol::VERSION,
    request_id: envelope[:request_id],
    correlation_id: envelope[:correlation_id],
    idempotency_key: envelope[:idempotency_key],
    operation: envelope[:operation],
    provider: envelope[:provider],
    provider_instance: envelope[:provider_instance],
    model: envelope[:model],
    reply_to: envelope[:reply_to],
    message_context: envelope[:message_context] || {},
    trace_context: envelope[:trace_context] || {},
    caller: envelope[:caller],
    identity: Legion::LLM::PublisherIdentity.current,
    message: error.message,
    error_class: error.class.name
  }.compact
end
.build_success(envelope, response) ⇒ Object
Source lines 63-85
# File 'lib/legion/llm/fleet/handler.rb', line 63
# Builds the success result for a completed fleet request.
#
# Echoes the routing/correlation fields of +envelope+ and copies content,
# tool calls, usage, finish reason and metadata extracted from +response+.
# The model reported by the response wins over the requested model when present.
#
# @param envelope [Hash] validated request envelope
# @param response [Hash, Object] provider response returned by WorkerExecution
# @return [Hash] result with +success: true+; nil-valued keys are dropped by +compact+
def build_success(envelope, response)
  {
    success: true,
    protocol_version: envelope[:protocol_version],
    request_id: envelope[:request_id],
    correlation_id: envelope[:correlation_id],
    idempotency_key: envelope[:idempotency_key],
    operation: envelope[:operation],
    provider: envelope[:provider],
    provider_instance: envelope[:provider_instance],
    model: response_model(response) || envelope[:model],
    reply_to: envelope[:reply_to],
    message_context: envelope[:message_context] || {},
    trace_context: envelope[:trace_context] || {},
    caller: envelope[:caller],
    identity: Legion::LLM::PublisherIdentity.current,
    content: response_content(response),
    tool_calls: response_tool_calls(response),
    usage: response_usage(response),
    finish_reason: response_finish_reason(response),
    metadata: response_metadata(response)
  }.compact
end
.error_code(error) ⇒ Object
Source lines 225-232
# File 'lib/legion/llm/fleet/handler.rb', line 225
# Maps an exception to the wire-level error code reported to the caller.
#
# Validation and provider-lookup failures are recognised by the message
# prefixes raised in validate_envelope! / resolve_provider. Policy rejections
# are recognised by class. Anything else is a generic worker error.
#
# @param error [StandardError] the exception raised while handling a request
# @return [String] one of 'invalid_fleet_request', 'provider_not_registered',
#   'fleet_policy_denied' or 'fleet_worker_error'
def error_code(error)
  message = error.message.to_s
  return 'invalid_fleet_request' if message.start_with?('invalid_fleet_request')
  return 'provider_not_registered' if message.start_with?('provider_not_registered')
  return 'fleet_policy_denied' if error.is_a?(WorkerExecution::PolicyError)

  'fleet_worker_error'
end
.handle_fleet_request(payload) ⇒ Object
Source lines 20-36
# File 'lib/legion/llm/fleet/handler.rb', line 20
# Entry point for an inbound fleet request payload.
#
# Normalizes the payload into a symbol-keyed envelope, validates it, runs it
# through WorkerExecution and builds a success result. When the envelope names
# a +reply_to+, the result is also published back to the caller. Any
# StandardError is reported via handle_exception at :warn level and turned into
# an error result (published too when +reply_to+ is known) instead of propagating.
#
# @param payload [Hash, Object] raw request payload
# @return [Hash] the success or error result
def handle_fleet_request(payload)
  envelope = normalize_hash(payload)
  log.debug '[llm][fleet][handler] action=handle_fleet_request.enter ' \
            "operation=#{envelope[:operation]} model=#{envelope[:model]} provider=#{envelope[:provider]}"
  validate_envelope!(envelope)

  # Provider resolution is deferred to WorkerExecution through this callable.
  resolver = proc { |checked| resolve_provider(checked) }
  outcome = build_success(envelope, WorkerExecution.call(envelope: envelope, provider: resolver))
  publish_response(envelope, outcome) if envelope[:reply_to]
  outcome
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.handler.handle_fleet_request')
  # envelope is nil when the failure happened before it was assigned.
  safe_envelope = envelope || {}
  outcome = build_error(safe_envelope, e)
  publish_error(safe_envelope, outcome) if safe_envelope[:reply_to]
  outcome
end
.log_reply_publish_failure(publish_result, correlation_id) ⇒ Object
Source lines 260-262
# File 'lib/legion/llm/fleet/handler.rb', line 260
# Writes a warn-level log line recording that publishing a reply failed.
#
# @param publish_result [Hash, nil] value returned by the publish call; its :status is logged
# @param correlation_id [String, nil] correlation id of the request being answered
def log_reply_publish_failure(publish_result, correlation_id)
  status = publish_result&.[](:status)
  log.warn("[llm][fleet][handler] action=reply_publish_failed correlation_id=#{correlation_id} status=#{status}")
end
.normalize_hash(hash) ⇒ Object
Source lines 164-170
# File 'lib/legion/llm/fleet/handler.rb', line 164
# Converts a key/value container into a Hash with symbolized keys, recursively
# normalizing nested values.
#
# Only objects that iterate key/value pairs (+each_pair+, e.g. Hash or Struct)
# are accepted. Arrays, Ranges and other plain Enumerables do not yield pairs,
# so they normalize to an empty Hash instead of a hash of bogus keys.
# Keys that cannot be symbolized (e.g. Integers) are kept as-is.
#
# @param hash [Hash, #each_pair, Object] the value to normalize
# @return [Hash] a new Hash; {} when +hash+ does not iterate pairs
def normalize_hash(hash)
  return {} unless hash.respond_to?(:each_pair)

  result = {}
  hash.each_pair do |key, value|
    result[key.respond_to?(:to_sym) ? key.to_sym : key] = normalize_value(value)
  end
  result
end
.normalize_value(value) ⇒ Object
Source lines 172-181
# File 'lib/legion/llm/fleet/handler.rb', line 172
# Recursively normalizes one value from a payload.
#
# Hashes go through normalize_hash, Arrays are normalized element-wise, and
# every other value is returned unchanged.
#
# @param value [Object] the value to normalize
# @return [Object] the normalized value
def normalize_value(value)
  return normalize_hash(value) if value.is_a?(Hash)
  return value.map { |item| normalize_value(item) } if value.is_a?(Array)

  value
end
.protocol_version?(value) ⇒ Boolean
Source lines 241-244
# File 'lib/legion/llm/fleet/handler.rb', line 241
# Reports whether +value+ names the fleet protocol version this handler speaks.
#
# Matches either the VERSION constant itself or its String form, so a version
# that arrived as a string in the payload is still accepted.
#
# @param value [Object] the envelope's protocol_version
# @return [Boolean]
def protocol_version?(value)
  expected = ::Legion::Extensions::Llm::Fleet::Protocol::VERSION
  [expected, expected.to_s].any? { |candidate| value == candidate }
end
.publish_accepted?(publish_result) ⇒ Boolean
Source lines 256-258
# File 'lib/legion/llm/fleet/handler.rb', line 256
# Reports whether a publish call's result signals acceptance.
#
# Only a Hash whose :accepted value is exactly +true+ counts. Any other shape,
# including nil, is treated as not accepted.
#
# @param publish_result [Object] value returned by the publish call
# @return [Boolean]
def publish_accepted?(publish_result)
  return false unless publish_result.is_a?(Hash)

  publish_result[:accepted] == true
end
.publish_error(envelope, result) ⇒ Object
Source lines 137-162
# File 'lib/legion/llm/fleet/handler.rb', line 137
# Publishes a FleetError reply describing a failed request.
#
# The transport message class is required here, at publish time, rather than
# when this file loads. The message is published with reply_publish_options,
# and a result that publish_accepted? rejects is logged. Exceptions raised while
# publishing are reported via handle_exception and not re-raised.
#
# @param envelope [Hash] normalized request envelope (only :caller is read)
# @param result [Hash] error result produced by build_error
# @return [Object] the publish result, or handle_exception's return value on failure
def publish_error(envelope, result)
  require 'legion/extensions/llm/transport/messages/fleet_error'
  publish_result = ::Legion::Extensions::Llm::Transport::Messages::FleetError.new(
    protocol_version: result[:protocol_version],
    request_id: result[:request_id],
    correlation_id: result[:correlation_id],
    idempotency_key: result[:idempotency_key],
    operation: result[:operation],
    provider: result[:provider],
    provider_instance: result[:provider_instance],
    model: result[:model],
    reply_to: result[:reply_to],
    message_context: result[:message_context],
    trace_context: result[:trace_context],
    caller: envelope[:caller],
    identity: Legion::LLM::PublisherIdentity.current,
    code: result[:error],
    message: result[:message],
    error_class: result[:error_class],
    retryable: false
  ).publish(reply_publish_options)
  log_reply_publish_failure(publish_result, result[:correlation_id]) unless publish_accepted?(publish_result)
  publish_result
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.handler.publish_error')
end
.publish_response(envelope, result) ⇒ Object
Source lines 109-135
# File 'lib/legion/llm/fleet/handler.rb', line 109
# Publishes a FleetResponse reply carrying a successful result.
#
# The transport message class is required here, at publish time, rather than
# when this file loads. The message is published with reply_publish_options,
# and a result that publish_accepted? rejects is logged. Exceptions raised while
# publishing are reported via handle_exception and not re-raised.
#
# @param envelope [Hash] validated request envelope (only :caller is read)
# @param result [Hash] success result produced by build_success
# @return [Object] the publish result, or handle_exception's return value on failure
def publish_response(envelope, result)
  require 'legion/extensions/llm/transport/messages/fleet_response'
  publish_result = ::Legion::Extensions::Llm::Transport::Messages::FleetResponse.new(
    protocol_version: result[:protocol_version],
    request_id: result[:request_id],
    correlation_id: result[:correlation_id],
    idempotency_key: result[:idempotency_key],
    operation: result[:operation],
    provider: result[:provider],
    provider_instance: result[:provider_instance],
    model: result[:model],
    reply_to: result[:reply_to],
    message_context: result[:message_context],
    trace_context: result[:trace_context],
    caller: envelope[:caller],
    identity: Legion::LLM::PublisherIdentity.current,
    content: result[:content],
    tool_calls: result[:tool_calls],
    usage: result[:usage],
    finish_reason: result[:finish_reason],
    metadata: result[:metadata]
  ).publish(reply_publish_options)
  log_reply_publish_failure(publish_result, result[:correlation_id]) unless publish_accepted?(publish_result)
  publish_result
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.handler.publish_response')
end
.reply_publish_options ⇒ Object
Source lines 246-254
# File 'lib/legion/llm/fleet/handler.rb', line 246
# Publish options for fleet reply messages, read from the fleet.responder settings.
#
# Defaults: mandatory false, publisher_confirm false, a 500 ms confirm timeout
# and spool false. +return_result: true+ is always set.
# NOTE(review): presumably this makes publish return a status Hash (with
# :accepted / :status) for the caller to inspect; confirm against the
# transport's Message#publish.
#
# @return [Hash] options to pass to the reply message's publish call
def reply_publish_options
  {
    mandatory: Legion::LLM::Settings.value(:fleet, :responder, :mandatory, default: false),
    publisher_confirm: Legion::LLM::Settings.value(:fleet, :responder, :publisher_confirm, default: false),
    publish_confirm_timeout_ms: Legion::LLM::Settings.value(:fleet, :responder, :publish_confirm_timeout_ms,
                                                            default: 500),
    spool: Legion::LLM::Settings.value(:fleet, :responder, :spool, default: false),
    return_result: true
  }
end
.resolve_provider(envelope) ⇒ Object
Source lines 54-61
# File 'lib/legion/llm/fleet/handler.rb', line 54
# Looks up the registered call adapter for the envelope's provider/instance pair.
#
# @param envelope [Hash] validated request envelope
# @return [Object] the adapter returned by Legion::LLM::Call::Registry.for
# @raise [ArgumentError] prefixed 'provider_not_registered:' when no adapter is registered
def resolve_provider(envelope)
  provider = envelope[:provider]
  instance = envelope[:provider_instance]
  Legion::LLM::Call::Registry.for(provider, instance: instance) ||
    raise(ArgumentError, "provider_not_registered: #{provider}/#{instance}")
end
.responder_auth_required? ⇒ Boolean
Source lines 234-239
# File 'lib/legion/llm/fleet/handler.rb', line 234
# Reports whether inbound fleet requests must carry a signed_token.
#
# An explicit fleet.responder.require_auth setting wins when it is set (any
# value other than +false+ means required). Otherwise the decision falls back
# to fleet.auth.require_signed_token, which defaults to +true+.
#
# @return [Boolean]
def responder_auth_required?
  explicit = Legion::LLM::Settings.value(:fleet, :responder, :require_auth, default: nil)
  setting =
    if explicit.nil?
      Legion::LLM::Settings.value(:fleet, :auth, :require_signed_token, default: true)
    else
      explicit
    end
  setting != false
end
.response_content(response) ⇒ Object
Source lines 183-188
# File 'lib/legion/llm/fleet/handler.rb', line 183
# Extracts the content payload from a provider response.
#
# For a Hash, the first truthy value of :content, 'content', :result or
# 'result' is returned. For an object exposing #content, that value is
# returned. Anything else is returned unchanged.
#
# @param response [Hash, Object] provider response
# @return [Object] the extracted content
def response_content(response)
  case response
  when Hash
    response[:content] || response['content'] || response[:result] || response['result']
  else
    response.respond_to?(:content) ? response.content : response
  end
end
.response_finish_reason(response) ⇒ Object
Source lines 211-216
# File 'lib/legion/llm/fleet/handler.rb', line 211
# Extracts the finish reason from a provider response.
#
# Reads :finish_reason / 'finish_reason' from a Hash, or #finish_reason from
# an object that exposes it.
#
# @param response [Hash, Object] provider response
# @return [Object, nil] the finish reason, or nil when unavailable
def response_finish_reason(response)
  case response
  when Hash then response[:finish_reason] || response['finish_reason']
  else response.finish_reason if response.respond_to?(:finish_reason)
  end
end
.response_metadata(response) ⇒ Object
Source lines 218-223
# File 'lib/legion/llm/fleet/handler.rb', line 218
# Extracts metadata from a provider response.
#
# Reads :metadata / 'metadata' from a Hash (defaulting to {}), or #metadata
# from an object that exposes it. Any other value yields {}.
#
# @param response [Hash, Object] provider response
# @return [Object] the metadata; {} when unavailable
def response_metadata(response)
  return response[:metadata] || response['metadata'] || {} if response.is_a?(Hash)
  return response.metadata if response.respond_to?(:metadata)

  {}
end
.response_model(response) ⇒ Object
Source lines 190-195
# File 'lib/legion/llm/fleet/handler.rb', line 190
# Extracts the model name reported by a provider response.
#
# Reads :model / 'model' from a Hash, or #model from an object that exposes it.
#
# @param response [Hash, Object] provider response
# @return [Object, nil] the reported model, or nil when unavailable
def response_model(response)
  if response.is_a?(Hash)
    response[:model] || response['model']
  elsif response.respond_to?(:model)
    response.model
  end
end
.response_tool_calls(response) ⇒ Object
Source lines 197-202
# File 'lib/legion/llm/fleet/handler.rb', line 197
# Extracts tool calls from a provider response.
#
# Reads :tool_calls / 'tool_calls' from a Hash, or #tool_calls from an object
# that exposes it.
#
# @param response [Hash, Object] provider response
# @return [Object, nil] the tool calls, or nil when unavailable
def response_tool_calls(response)
  unless response.is_a?(Hash)
    return response.respond_to?(:tool_calls) ? response.tool_calls : nil
  end

  response[:tool_calls] || response['tool_calls']
end
.response_usage(response) ⇒ Object
Source lines 204-209
# File 'lib/legion/llm/fleet/handler.rb', line 204
# Extracts token usage from a provider response.
#
# Reads :usage / 'usage' from a Hash (defaulting to {}). For non-Hash
# responses, #tokens is used when the object exposes it. Otherwise {}.
#
# @param response [Hash, Object] provider response
# @return [Object] the usage data; {} when unavailable
def response_usage(response)
  case response
  when Hash then response[:usage] || response['usage'] || {}
  else response.respond_to?(:tokens) ? response.tokens : {}
  end
end
.validate_envelope!(envelope) ⇒ Object
Source lines 38-52
# File 'lib/legion/llm/fleet/handler.rb', line 38
# Validates a normalized fleet request envelope, raising on the first problem found.
#
# Checks, in order:
# 1. the protocol version;
# 2. that every required field is non-nil (plus :signed_token when responder
#    auth is required);
# 3. that no legacy field key is present, even with a nil value.
#
# @param envelope [Hash] normalized request envelope
# @return [Array<Symbol>] LEGACY_FIELDS (the result of the final iteration)
# @raise [ArgumentError] with a message prefixed 'invalid_fleet_request:'
def validate_envelope!(envelope)
  unless protocol_version?(envelope[:protocol_version])
    raise ArgumentError, 'invalid_fleet_request: unsupported protocol_version'
  end

  mandatory = %i[
    request_id correlation_id operation provider provider_instance model params
    reply_to message_context caller trace_context timeout_seconds expires_at idempotency_key
  ]
  mandatory += [:signed_token] if responder_auth_required?

  missing = mandatory.find { |field| envelope[field].nil? }
  raise ArgumentError, "invalid_fleet_request: #{missing} is required" if missing

  LEGACY_FIELDS.each do |field|
    next unless envelope.key?(field)

    raise ArgumentError, "invalid_fleet_request: #{field} is not supported"
  end
end