Module: Legion::Extensions::Llm::Fleet::ProviderResponder
- Extended by:
- Logging::Helper
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/extensions/llm/fleet/provider_responder.rb
Overview
Shared implementation for provider-owned fleet responder runners.
Defined Under Namespace
Classes: ConfigurationError, FleetEnvelope
Constant Summary collapse
- REQUIRED_FIELDS =
%i[ request_id correlation_id idempotency_key operation provider provider_instance model params reply_to message_context caller trace_context signed_token timeout_seconds expires_at protocol_version ].freeze
- LEGACY_FIELDS =
%i[schema_version request_type fleet_correlation_id].freeze
Class Method Summary collapse
- .ack(delivery) ⇒ Object
- .build_provider(envelope:, provider_class:, provider_instances:) ⇒ Object
-
.call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil) ⇒ Object
Public runner entry point mirrors AMQP delivery callbacks, which carry both delivery and property metadata.
- .check_envelope!(envelope, provider_family:) ⇒ Object
- .deep_symbolize(value) ⇒ Object
- .dig(hash, *keys) ⇒ Object
-
.enabled_for?(provider_instances) ⇒ Boolean
Reports whether any configured provider instance has fleet.respond_to_requests enabled.
- .error_code(error) ⇒ Object
- .parse_json(payload) ⇒ Object
- .parse_payload(payload) ⇒ Object
- .publish_error(envelope, error) ⇒ Object
- .publish_response(envelope, response) ⇒ Object
- .reject(delivery, requeue:) ⇒ Object
- .reject_legacy_fields!(envelope) ⇒ Object
- .requeue_error?(error) ⇒ Boolean
- .resolve_provider_instances(provider_instances) ⇒ Object
- .response_content(response) ⇒ Object
- .response_field(response, field) ⇒ Object
- .response_metadata(response) ⇒ Object
- .response_usage(response) ⇒ Object
- .retryable_error?(error) ⇒ Boolean
- .safe_publish_error(envelope, error) ⇒ Object
- .transport_message_class(name) ⇒ Object
- .truthy?(value) ⇒ Boolean
- .validate_protocol_version!(envelope) ⇒ Object
- .validate_provider_family!(envelope, provider_family) ⇒ Object
Class Method Details
.ack(delivery) ⇒ Object
186 187 188 189 190 191 192 193 194 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 186

# Acknowledges a delivery, using whichever acknowledgement interface the
# object exposes.
#
# @param delivery [Object, nil] object responding to +ack+, or to both
#   +channel+ and +delivery_tag+; falsy values are ignored
# @return [Object, nil] whatever the underlying +ack+ call returns, or nil
#   when nothing could be acknowledged
def ack(delivery)
  return unless delivery
  # Prefer the object's own #ack when it has one.
  return delivery.ack if delivery.respond_to?(:ack)
  return unless delivery.respond_to?(:channel) && delivery.respond_to?(:delivery_tag)

  delivery.channel.ack(delivery.delivery_tag)
end
.build_provider(envelope:, provider_class:, provider_instances:) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 114

# Instantiates the provider for the instance named in the envelope.
#
# @param envelope [#provider_instance] request envelope
# @param provider_class [Class] class instantiated with the symbolized settings
# @param provider_instances [Hash, #call, nil] instance settings, or a
#   callable returning them
# @return [Object] a new +provider_class+ instance
# @raise [ConfigurationError] when the instance is not configured or has
#   +fleet.respond_to_requests+ disabled
def build_provider(envelope:, provider_class:, provider_instances:)
  configured = resolve_provider_instances(provider_instances)
  name = envelope.provider_instance.to_s
  # Look the instance up by symbol first, then by string.
  settings = configured[name.to_sym] || configured[name]

  raise ConfigurationError, "fleet provider instance is not configured: #{name}" unless settings

  responder_enabled = truthy?(dig(settings, :fleet, :respond_to_requests))
  raise ConfigurationError, "fleet responses are disabled for provider instance: #{name}" unless responder_enabled

  provider_class.new(deep_symbolize(settings))
end
.call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil) ⇒ Object
Public runner entry point mirrors AMQP delivery callbacks, which carry both delivery and property metadata.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 68

# Runs one fleet request end to end: parse and validate the envelope, build
# the provider, execute the request, publish the response, then acknowledge
# the delivery.
#
# On any StandardError the failure is reported through +handle_exception+,
# an error message is published when the envelope was parsed, the delivery
# is rejected (requeued according to +requeue_error?+) and the exception is
# re-raised.
#
# @param payload [Object] raw payload handed to +parse_payload+
# @param provider_family [String, Symbol] family this responder serves
# @param provider_class [Class] provider class to instantiate
# @param provider_instances [Hash, #call, nil] provider instance settings
# @param delivery [Object, nil] delivery handle used for ack/reject
# @param properties [Object, nil] used for ack/reject when +delivery+ is falsy
# @return [Object] the response returned by +WorkerExecution.call+
def call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil)
  handle = delivery || properties
  envelope = parse_payload(payload)
  check_envelope!(envelope, provider_family:)
  provider = build_provider(envelope:, provider_class:, provider_instances:)

  WorkerExecution.call(envelope:, provider:).tap do |response|
    publish_response(envelope, response)
    ack(handle)
  end
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.provider_responder.call',
                      provider_family:)
  # envelope is nil when parse_payload itself raised.
  safe_publish_error(envelope, e) if envelope
  reject(handle, requeue: requeue_error?(e))
  raise
end
.check_envelope!(envelope, provider_family:) ⇒ Object
104 105 106 107 108 109 110 111 112 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 104

# Validates an envelope: no legacy fields, every REQUIRED_FIELDS entry
# present and non-nil, matching protocol version and provider family.
#
# @param envelope [#key?, #[]] parsed request envelope
# @param provider_family [String, Symbol] family this responder serves
# @return [nil]
# @raise [ArgumentError] on the first failed check
def check_envelope!(envelope, provider_family:)
  reject_legacy_fields!(envelope)

  absent = REQUIRED_FIELDS.find { |field| !envelope.key?(field) || envelope[field].nil? }
  raise ArgumentError, "#{absent} is required" if absent

  validate_protocol_version!(envelope)
  validate_provider_family!(envelope, provider_family)
end
.deep_symbolize(value) ⇒ Object
296 297 298 299 300 301 302 303 304 305 306 307 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 296

# Recursively converts hash keys to symbols inside nested hashes and arrays.
# Keys that do not respond to +to_sym+ are kept as they are; any value that
# is neither a Hash nor an Array is returned unchanged.
#
# @param value [Object] structure to convert
# @return [Object] a new structure with symbolized hash keys
def deep_symbolize(value)
  return value.map { |item| deep_symbolize(item) } if value.is_a?(Array)
  return value unless value.is_a?(Hash)

  value.to_h do |key, child|
    [key.respond_to?(:to_sym) ? key.to_sym : key, deep_symbolize(child)]
  end
end
.dig(hash, *keys) ⇒ Object
284 285 286 287 288 289 290 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 284

# Walks nested hash-like objects, trying each key as a symbol and then as a
# string.
#
# The string key is consulted only when the symbol lookup returns nil, so an
# explicit +false+ stored under the symbol key is returned as-is rather than
# being replaced by whatever sits under the string key.
#
# @param hash [Object] root object; anything not responding to +key?+ ends
#   the walk with nil
# @param keys [Array<#to_sym, #to_s>] path to follow
# @return [Object, nil] the value found, or nil when the path cannot be followed
def dig(hash, *keys)
  keys.reduce(hash) do |current, key|
    break nil unless current.respond_to?(:key?)

    by_symbol = current[key.to_sym]
    by_symbol.nil? ? current[key.to_s] : by_symbol
  end
end
.enabled_for?(provider_instances) ⇒ Boolean
Reports whether any configured provider instance has fleet.respond_to_requests enabled.
85 86 87 88 89 90 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 85

# Tells whether at least one provider instance has
# +fleet.respond_to_requests+ set to a truthy value.
#
# @param provider_instances [Hash, #call, nil] instance settings, or a
#   callable returning them
# @return [Boolean]
def enabled_for?(provider_instances)
  responds = ->(settings) { truthy?(dig(settings, :fleet, :respond_to_requests)) }
  resolve_provider_instances(provider_instances).any? { |_id, settings| responds.call(settings) }
end
.error_code(error) ⇒ Object
249 250 251 252 253 254 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 249

# Maps an exception to the error code string placed in published errors.
#
# @param error [Exception]
# @return [String] 'configuration_error', 'policy_error' or 'provider_error'
def error_code(error)
  case error
  when ConfigurationError then 'configuration_error'
  when WorkerExecution::PolicyError then 'policy_error'
  else 'provider_error'
  end
end
.parse_json(payload) ⇒ Object
206 207 208 209 210 211 212 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 206

# Parses a JSON string with +::Legion::JSON+ when that constant is defined,
# otherwise with the standard +::JSON+ module.
#
# @param payload [String] JSON document
# @return [Object] whatever the selected parser returns
def parse_json(payload)
  parser = defined?(::Legion::JSON) ? ::Legion::JSON : ::JSON
  parser.parse(payload)
end
.parse_payload(payload) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 92

# Normalizes an incoming payload into a FleetEnvelope with symbolized keys.
#
# A FleetEnvelope is converted with +to_h+, a String is parsed as JSON, any
# other object responding to +to_h+ is converted with it, and everything
# else becomes an empty hash.
#
# @param payload [FleetEnvelope, String, #to_h, Object]
# @return [FleetEnvelope]
def parse_payload(payload)
  raw = if payload.is_a?(FleetEnvelope)
          payload.to_h
        elsif payload.is_a?(String)
          parse_json(payload)
        elsif payload.respond_to?(:to_h)
          payload.to_h
        else
          {}
        end

  FleetEnvelope.new(data: deep_symbolize(raw))
end
.publish_error(envelope, error) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 150

# Publishes a FleetError message describing +error+, echoing the routing and
# identity fields of the originating envelope.
#
# @param envelope [Object] validated request envelope exposing the readers
#   used below
# @param error [Exception] failure being reported
# @return [Object] whatever the message's +publish+ returns
# @raise [ConfigurationError] when the FleetError message class cannot be resolved
def publish_error(envelope, error)
  transport_message_class(:FleetError).new(
    protocol_version: envelope.protocol_version,
    request_id: envelope.request_id,
    correlation_id: envelope.correlation_id,
    idempotency_key: envelope.idempotency_key,
    operation: envelope.operation,
    provider: envelope.provider,
    provider_instance: envelope.provider_instance,
    model: envelope.model,
    reply_to: envelope.reply_to,
    message_context: envelope.message_context,
    trace_context: envelope.trace_context,
    code: error_code(error),
    message: error.message,
    error_class: error.class.name,
    retryable: retryable_error?(error),
    metadata: {}
  ).publish
end
.publish_response(envelope, response) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 129

# Publishes a FleetResponse message carrying the provider's result, echoing
# the routing and identity fields of the originating envelope.
#
# @param envelope [Object] validated request envelope exposing the readers
#   used below
# @param response [Object] provider response; fields are read through
#   +response_field+ and the +response_*+ helpers
# @return [Object] whatever the message's +publish+ returns
# @raise [ConfigurationError] when the FleetResponse message class cannot be resolved
def publish_response(envelope, response)
  transport_message_class(:FleetResponse).new(
    protocol_version: envelope.protocol_version,
    request_id: envelope.request_id,
    correlation_id: envelope.correlation_id,
    idempotency_key: envelope.idempotency_key,
    operation: envelope.operation,
    provider: envelope.provider,
    provider_instance: envelope.provider_instance,
    model: envelope.model,
    reply_to: envelope.reply_to,
    message_context: envelope.message_context,
    trace_context: envelope.trace_context,
    content: response_content(response),
    tool_calls: response_field(response, :tool_calls) || [],
    usage: response_usage(response),
    finish_reason: response_field(response, :finish_reason),
    metadata: response_metadata(response)
  ).publish
end
.reject(delivery, requeue:) ⇒ Object
196 197 198 199 200 201 202 203 204 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 196

# Rejects a delivery, using whichever rejection interface the object exposes.
#
# NOTE(review): every Enumerable responds to +reject+ but takes no positional
# argument; confirm the delivery objects passed here are not Enumerable,
# otherwise the first branch would raise ArgumentError.
#
# @param delivery [Object, nil] object responding to +reject+, or to both
#   +channel+ and +delivery_tag+; falsy values are ignored
# @param requeue [Boolean] passed through to the underlying reject call
# @return [Object, nil] whatever the underlying +reject+ call returns, or nil
#   when nothing could be rejected
def reject(delivery, requeue:)
  return unless delivery
  # Prefer the object's own #reject when it has one.
  return delivery.reject(requeue) if delivery.respond_to?(:reject)
  return unless delivery.respond_to?(:channel) && delivery.respond_to?(:delivery_tag)

  delivery.channel.reject(delivery.delivery_tag, requeue)
end
.reject_legacy_fields!(envelope) ⇒ Object
214 215 216 217 218 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 214

# Refuses envelopes that carry any key listed in LEGACY_FIELDS.
#
# @param envelope [#key?] parsed request envelope
# @return [Array<Symbol>] LEGACY_FIELDS when no legacy key is present
# @raise [ArgumentError] naming the first legacy field found
def reject_legacy_fields!(envelope)
  LEGACY_FIELDS.each do |legacy|
    next unless envelope.key?(legacy)

    raise ArgumentError, "#{legacy} is not supported by fleet protocol v2"
  end
end
.requeue_error?(error) ⇒ Boolean
237 238 239 240 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 237

# Decides whether a failed delivery should be requeued: the error must be
# retryable and the +fleet.consumer.requeue_transient+ setting must not be
# explicitly +false+ (it defaults to true).
#
# @param error [Exception]
# @return [Boolean]
def requeue_error?(error)
  return false unless retryable_error?(error)

  Settings.value(:fleet, :consumer, :requeue_transient, default: true) != false
end
.resolve_provider_instances(provider_instances) ⇒ Object
232 233 234 235 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 232

# Resolves provider instance settings, calling the argument when it is
# callable, and returns them with symbolized keys. A falsy result is treated
# as an empty hash.
#
# @param provider_instances [Hash, #call, nil]
# @return [Object] the symbolized settings
def resolve_provider_instances(provider_instances)
  resolved = if provider_instances.respond_to?(:call)
               provider_instances.call
             else
               provider_instances
             end

  deep_symbolize(resolved || {})
end
.response_content(response) ⇒ Object
256 257 258 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 256

# Extracts the textual content of a provider response: the first truthy value
# among its +content+ and +result+ fields, falling back to +response.to_s+.
#
# @param response [Object] provider response
# @return [Object] the content value, or the response's string form
def response_content(response)
  %i[content result].each do |field|
    found = response_field(response, field)
    return found if found
  end

  response.to_s
end
.response_field(response, field) ⇒ Object
276 277 278 279 280 281 282 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 276

# Reads one field from a provider response. Hash-like responses are checked
# for the key as given and then as a string; otherwise the field is read via
# a public method of the same name.
#
# @param response [Object] provider response
# @param field [Symbol] field name
# @return [Object, nil] the field value, or nil when the response has no such field
def response_field(response, field)
  if response.respond_to?(:key?)
    [field, field.to_s].each do |name|
      return response[name] if response.key?(name)
    end
  end

  response.public_send(field) if response.respond_to?(field)
end
.response_metadata(response) ⇒ Object
271 272 273 274 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 271

# Extracts the +metadata+ field of a provider response with symbolized keys.
#
# nil is handled explicitly: NilClass responds to +to_h+, so without the
# guard a response with no metadata would produce nil instead of the empty
# hash fallback.
#
# @param response [Object] provider response
# @return [Object] the symbolized metadata, or {} when it is missing or not
#   convertible with +to_h+
def response_metadata(response)
  metadata = response_field(response, :metadata)
  return {} if metadata.nil? || !metadata.respond_to?(:to_h)

  deep_symbolize(metadata)
end
.response_usage(response) ⇒ Object
260 261 262 263 264 265 266 267 268 269 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 260

# Extracts token usage from a provider response.
#
# A +usage+ (or +tokens+) field is returned with symbolized keys. When neither
# is present the usage is assembled from the flat +input_tokens+,
# +output_tokens+ and +thinking_tokens+ fields, omitting nil entries.
#
# The safe-navigation guard matters: NilClass responds to +to_h+, so without
# it a response lacking both fields would return nil and never reach the
# flat-field fallback.
#
# @param response [Object] provider response
# @return [Object] symbolized usage data, or a Hash built from the flat fields
def response_usage(response)
  usage = response_field(response, :usage) || response_field(response, :tokens)
  return deep_symbolize(usage) if usage&.respond_to?(:to_h)

  {
    input_tokens: response_field(response, :input_tokens),
    output_tokens: response_field(response, :output_tokens),
    thinking_tokens: response_field(response, :thinking_tokens)
  }.compact
end
.retryable_error?(error) ⇒ Boolean
242 243 244 245 246 247 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 242

# Classifies an error as retryable. Configuration and policy errors are not
# retryable; every other error is.
#
# @param error [Exception]
# @return [Boolean]
def retryable_error?(error)
  !(error.is_a?(ConfigurationError) || error.is_a?(WorkerExecution::PolicyError))
end
.safe_publish_error(envelope, error) ⇒ Object
171 172 173 174 175 176 177 178 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 171

# Best-effort wrapper around +publish_error+: a StandardError raised while
# publishing is reported through +handle_exception+ and swallowed.
#
# @param envelope [Object] request envelope the error relates to
# @param error [Exception] failure being reported
# @return [Object, nil] the result of +publish_error+, or nil when
#   publishing itself failed
def safe_publish_error(envelope, error)
  begin
    publish_error(envelope, error)
  rescue StandardError => publish_failure
    handle_exception(publish_failure, level: :debug, handled: true,
                                      operation: 'llm.fleet.provider_responder.safe_publish_error',
                                      error_class: error.class.name)
    nil
  end
end
.transport_message_class(name) ⇒ Object
180 181 182 183 184 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 180

# Looks up a transport message class by name under
# +::Legion::Extensions::Llm::Transport::Messages+.
#
# @param name [Symbol, String] constant name, e.g. +:FleetResponse+
# @return [Object] the resolved constant
# @raise [ConfigurationError] when the lookup raises LoadError or NameError
def transport_message_class(name)
  ::Legion::Extensions::Llm::Transport::Messages.const_get(name)
rescue LoadError, NameError => e
  raise ConfigurationError, "fleet responder transport unavailable for #{name}: #{e.message}"
end
.truthy?(value) ⇒ Boolean
292 293 294 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 292

# Interprets a configuration value as a boolean flag: true for +true+ itself
# and for anything whose string form is exactly 'true'.
#
# @param value [Object]
# @return [Boolean]
def truthy?(value)
  return true if value == true

  value.to_s == 'true'
end
.validate_protocol_version!(envelope) ⇒ Object
220 221 222 223 224 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 220

# Ensures the envelope declares the protocol version in +Protocol::VERSION+.
#
# @param envelope [#protocol_version]
# @return [nil]
# @raise [ArgumentError] when the versions differ
def validate_protocol_version!(envelope)
  raise ArgumentError, "protocol_version must be #{Protocol::VERSION}" unless envelope.protocol_version == Protocol::VERSION
end
.validate_provider_family!(envelope, provider_family) ⇒ Object
226 227 228 229 230 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 226

# Ensures the envelope targets the provider family this responder serves.
# Both sides are compared by their string form.
#
# @param envelope [#provider]
# @param provider_family [String, Symbol]
# @return [nil]
# @raise [ArgumentError] when the families differ
def validate_provider_family!(envelope, provider_family)
  unless envelope.provider.to_s == provider_family.to_s
    raise ArgumentError, "fleet request provider #{envelope.provider} does not match #{provider_family}"
  end
end