Module: Legion::Extensions::Llm::Fleet::ProviderResponder
- Defined in:
- lib/legion/extensions/llm/fleet/provider_responder.rb
Overview
Shared implementation for provider-owned fleet responder runners.
Defined Under Namespace
Classes: ConfigurationError, FleetEnvelope
Constant Summary
- REQUIRED_FIELDS =
%i[ request_id correlation_id idempotency_key operation provider provider_instance model params reply_to message_context caller trace_context signed_token timeout_seconds expires_at protocol_version ].freeze
- LEGACY_FIELDS =
%i[schema_version request_type fleet_correlation_id].freeze
Class Method Summary
- .ack(delivery) ⇒ Object
- .build_provider(envelope:, provider_class:, provider_instances:) ⇒ Object
- .call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil) ⇒ Object
Public runner entry point; its signature mirrors AMQP delivery callbacks, which carry both delivery and property metadata.
- .check_envelope!(envelope, provider_family:) ⇒ Object
- .deep_symbolize(value) ⇒ Object
- .dig(hash, *keys) ⇒ Object
- .enabled_for?(provider_instances) ⇒ Boolean
Returns true when at least one configured provider instance has fleet.respond_to_requests enabled.
- .error_code(error) ⇒ Object
- .parse_json(payload) ⇒ Object
- .parse_payload(payload) ⇒ Object
- .publish_error(envelope, error) ⇒ Object
- .publish_response(envelope, response) ⇒ Object
- .reject(delivery, requeue:) ⇒ Object
- .reject_legacy_fields!(envelope) ⇒ Object
- .requeue_error?(error) ⇒ Boolean
- .resolve_provider_instances(provider_instances) ⇒ Object
- .response_content(response) ⇒ Object
- .response_field(response, field) ⇒ Object
- .response_metadata(response) ⇒ Object
- .response_usage(response) ⇒ Object
- .retryable_error?(error) ⇒ Boolean
- .safe_publish_error(envelope, error) ⇒ Object
- .transport_message_class(name) ⇒ Object
- .truthy?(value) ⇒ Boolean
- .validate_protocol_version!(envelope) ⇒ Object
- .validate_provider_family!(envelope, provider_family) ⇒ Object
Class Method Details
.ack(delivery) ⇒ Object
178 179 180 181 182 183 184 185 186 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 178
# Acknowledges a consumed message.
#
# Two handle shapes are supported: an object exposing #ack directly, or one
# exposing #channel and #delivery_tag (presumably Bunny-style delivery info --
# confirm against the consumer). Anything else is silently ignored.
#
# @param delivery [Object, nil] the delivery handle; nil/false is a no-op
# @return [Object, nil] whatever the underlying ack call returns, else nil
def ack(delivery)
  return unless delivery
  return delivery.ack if delivery.respond_to?(:ack)
  return unless delivery.respond_to?(:channel) && delivery.respond_to?(:delivery_tag)

  delivery.channel.ack(delivery.delivery_tag)
end
.build_provider(envelope:, provider_class:, provider_instances:) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 109
# Instantiates the provider client for the instance named in the envelope.
#
# @param envelope [FleetEnvelope] the validated request; its provider_instance
#   selects the settings entry
# @param provider_class [Class] provider implementation; receives the
#   deep-symbolized instance settings as its single constructor argument
# @param provider_instances [Hash, #call] instance settings, or a callable
#   returning them
# @return [Object] the new provider instance
# @raise [ConfigurationError] if the instance is not configured, or if its
#   fleet.respond_to_requests flag is not truthy
def build_provider(envelope:, provider_class:, provider_instances:)
  configured = resolve_provider_instances(provider_instances)
  name = envelope.provider_instance.to_s
  settings = configured[name.to_sym] || configured[name]
  raise ConfigurationError, "fleet provider instance is not configured: #{name}" unless settings

  responding = truthy?(dig(settings, :fleet, :respond_to_requests))
  raise ConfigurationError, "fleet responses are disabled for provider instance: #{name}" unless responding

  provider_class.new(deep_symbolize(settings))
end
.call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil) ⇒ Object
Public runner entry point; its signature mirrors AMQP delivery callbacks, which carry both delivery and property metadata.
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 65
# Handles one fleet request end to end.
#
# Steps, in order: parse the payload, validate the envelope, build the
# provider, run WorkerExecution.call, publish the response, then ack.
#
# On any StandardError, a FleetError reply is published best-effort (only if
# the envelope was parsed). The message is then rejected, with requeue
# decided by requeue_error?, and the error is re-raised to the caller.
#
# @param payload [String, Hash, FleetEnvelope] raw request
# @param provider_family [#to_s] family this responder serves
# @param provider_class [Class] provider implementation to instantiate
# @param provider_instances [Hash, #call] instance settings (or a callable)
# @param delivery [Object, nil] AMQP delivery handle used for ack/reject
# @param properties [Object, nil] fallback handle when delivery is nil
# @return [Object] the WorkerExecution result
# @raise [StandardError] re-raises whatever failed
# rubocop:disable Metrics/ParameterLists
def call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil)
  amqp_handle = delivery || properties
  envelope = nil
  begin
    envelope = parse_payload(payload)
    check_envelope!(envelope, provider_family:)
    provider = build_provider(envelope:, provider_class:, provider_instances:)
    result = WorkerExecution.call(envelope: envelope, provider: provider)
    publish_response(envelope, result)
    ack(amqp_handle)
    result
  rescue StandardError => e
    # Without a parsed envelope there is no reply_to to answer on.
    safe_publish_error(envelope, e) if envelope
    reject(amqp_handle, requeue: requeue_error?(e))
    raise
  end
end
# rubocop:enable Metrics/ParameterLists
.check_envelope!(envelope, provider_family:) ⇒ Object
99 100 101 102 103 104 105 106 107 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 99
# Validates an inbound envelope before any provider work happens.
#
# Checks run in this order:
# 1. Legacy v1 fields are rejected.
# 2. Every REQUIRED_FIELDS entry must be present and non-nil.
# 3. The protocol version must match.
# 4. The provider must match this responder's family.
#
# @param envelope [FleetEnvelope] parsed request
# @param provider_family [#to_s] family this responder serves
# @raise [ArgumentError] on the first failed check
def check_envelope!(envelope, provider_family:)
  reject_legacy_fields!(envelope)
  missing = REQUIRED_FIELDS.find { |field| !envelope.key?(field) || envelope[field].nil? }
  raise ArgumentError, "#{missing} is required" if missing

  validate_protocol_version!(envelope)
  validate_provider_family!(envelope, provider_family)
end
.deep_symbolize(value) ⇒ Object
288 289 290 291 292 293 294 295 296 297 298 299 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 288
# Recursively converts Hash keys to symbols.
#
# Hash and Array values are walked. Keys that do not respond to #to_sym
# (e.g. Integers) are kept as-is. Any other value is returned unchanged.
# When a string and a symbol key collide, the later pair wins.
#
# @param value [Object]
# @return [Object] a new Hash/Array for containers, otherwise value itself
def deep_symbolize(value)
  if value.is_a?(Hash)
    value.to_h do |key, child|
      [key.respond_to?(:to_sym) ? key.to_sym : key, deep_symbolize(child)]
    end
  elsif value.is_a?(Array)
    value.map { |child| deep_symbolize(child) }
  else
    value
  end
end
.dig(hash, *keys) ⇒ Object
276 277 278 279 280 281 282 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 276
# Walks nested hash-like settings, tolerating both symbol and string keys.
#
# At each level the symbol key is tried first. The string key is used only
# when the symbol lookup yields nil. Walking stops with nil as soon as a
# level is not hash-like (does not respond to #key?).
#
# @param hash [Object] root container
# @param keys [Array<#to_sym, #to_s>] path to follow
# @return [Object, nil] the value at the path, or nil if absent
def dig(hash, *keys)
  keys.reduce(hash) do |current, key|
    break nil unless current.respond_to?(:key?)

    value = current[key.to_sym]
    # `a || b` also discarded a stored `false`, turning it into nil; fall back
    # to the string key only when the symbol lookup found nothing.
    value.nil? ? current[key.to_s] : value
  end
end
.enabled_for?(provider_instances) ⇒ Boolean
Returns true when at least one configured provider instance has fleet.respond_to_requests enabled.
80 81 82 83 84 85 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 80
# Reports whether this process should answer fleet requests at all.
#
# @param provider_instances [Hash, #call] instance settings, or a callable
#   returning them
# @return [Boolean] true if any instance's fleet.respond_to_requests is truthy
def enabled_for?(provider_instances)
  resolve_provider_instances(provider_instances).any? do |_name, config|
    truthy?(dig(config, :fleet, :respond_to_requests))
  end
end
.error_code(error) ⇒ Object
241 242 243 244 245 246 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 241
# Maps an exception to the error code sent in the FleetError reply.
#
# @param error [Exception]
# @return [String] 'configuration_error', 'policy_error', or 'provider_error'
def error_code(error)
  case error
  when ConfigurationError then 'configuration_error'
  when WorkerExecution::PolicyError then 'policy_error'
  else 'provider_error'
  end
end
.parse_json(payload) ⇒ Object
198 199 200 201 202 203 204 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 198
# Parses a JSON string.
#
# Uses ::Legion::JSON when it is loaded; otherwise uses stdlib ::JSON.
#
# @param payload [String]
# @return [Object] the parsed document
def parse_json(payload)
  parser = defined?(::Legion::JSON) ? ::Legion::JSON : ::JSON
  parser.parse(payload)
end
.parse_payload(payload) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 87
# Normalizes an inbound payload into a FleetEnvelope with symbol keys.
#
# Input handling:
# - A FleetEnvelope is unwrapped via #to_h.
# - A String is parsed as JSON.
# - Anything responding to #to_h is converted with it.
# - Anything else becomes an empty hash.
#
# @param payload [FleetEnvelope, String, #to_h, Object]
# @return [FleetEnvelope]
def parse_payload(payload)
  raw =
    if payload.is_a?(FleetEnvelope)
      payload.to_h
    elsif payload.is_a?(String)
      parse_json(payload)
    elsif payload.respond_to?(:to_h)
      payload.to_h
    else
      {}
    end
  FleetEnvelope.new(data: deep_symbolize(raw))
end
.publish_error(envelope, error) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 145
# Publishes a FleetError reply describing why the request failed.
#
# Routing and correlation fields are echoed from the request envelope. The
# error is summarized by error_code, its message, its class name, and
# retryable_error?.
#
# @param envelope [FleetEnvelope] the request being answered
# @param error [Exception] the failure
# @return [Object] whatever the transport message's #publish returns
# @raise [ConfigurationError] if the FleetError transport class is unavailable
def publish_error(envelope, error)
  transport_message_class(:FleetError).new(
    protocol_version: envelope.protocol_version,
    request_id: envelope.request_id,
    correlation_id: envelope.correlation_id,
    idempotency_key: envelope.idempotency_key,
    operation: envelope.operation,
    provider: envelope.provider,
    provider_instance: envelope.provider_instance,
    model: envelope.model,
    reply_to: envelope.reply_to,
    message_context: envelope.message_context,
    trace_context: envelope.trace_context,
    code: error_code(error),
    message: error.message,
    error_class: error.class.name,
    retryable: retryable_error?(error),
    metadata: {}
  ).publish
end
.publish_response(envelope, response) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 124
# Publishes a FleetResponse reply carrying the provider result.
#
# Routing and correlation fields are echoed from the request envelope. The
# payload fields are extracted from the response with the response_* helpers.
# tool_calls defaults to [] when the response has none.
#
# @param envelope [FleetEnvelope] the request being answered
# @param response [Hash, Object] result returned by WorkerExecution.call
# @return [Object] whatever the transport message's #publish returns
# @raise [ConfigurationError] if the FleetResponse transport class is unavailable
def publish_response(envelope, response)
  transport_message_class(:FleetResponse).new(
    protocol_version: envelope.protocol_version,
    request_id: envelope.request_id,
    correlation_id: envelope.correlation_id,
    idempotency_key: envelope.idempotency_key,
    operation: envelope.operation,
    provider: envelope.provider,
    provider_instance: envelope.provider_instance,
    model: envelope.model,
    reply_to: envelope.reply_to,
    message_context: envelope.message_context,
    trace_context: envelope.trace_context,
    content: response_content(response),
    tool_calls: response_field(response, :tool_calls) || [],
    usage: response_usage(response),
    finish_reason: response_field(response, :finish_reason),
    metadata: response_metadata(response)
  ).publish
end
.reject(delivery, requeue:) ⇒ Object
188 189 190 191 192 193 194 195 196 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 188
# Negatively acknowledges a consumed message.
#
# Supports a handle exposing #reject(requeue) directly, or one exposing
# #channel and #delivery_tag. Any other handle (or nil) is ignored.
#
# @param delivery [Object, nil] the delivery handle
# @param requeue [Boolean] whether the broker should redeliver the message
# @return [Object, nil] whatever the underlying reject call returns, else nil
def reject(delivery, requeue:)
  return unless delivery
  return delivery.reject(requeue) if delivery.respond_to?(:reject)
  return unless delivery.respond_to?(:channel) && delivery.respond_to?(:delivery_tag)

  delivery.channel.reject(delivery.delivery_tag, requeue)
end
.reject_legacy_fields!(envelope) ⇒ Object
206 207 208 209 210 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 206
# Refuses envelopes that still carry v1-only fields.
#
# @param envelope [FleetEnvelope] parsed request
# @return [Array<Symbol>] LEGACY_FIELDS when none are present
# @raise [ArgumentError] naming the first LEGACY_FIELDS entry found
def reject_legacy_fields!(envelope)
  LEGACY_FIELDS.each do |legacy_field|
    next unless envelope.key?(legacy_field)

    raise ArgumentError, "#{legacy_field} is not supported by fleet protocol v2"
  end
end
.requeue_error?(error) ⇒ Boolean
229 230 231 232 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 229
# Decides whether a failed message should go back on the queue.
#
# Only retryable errors are candidates. The setting
# fleet.consumer.requeue_transient (default true) can disable requeueing;
# only an explicit `false` turns it off.
#
# @param error [Exception]
# @return [Boolean]
def requeue_error?(error)
  return false unless retryable_error?(error)

  Settings.value(:fleet, :consumer, :requeue_transient, default: true) != false
end
.resolve_provider_instances(provider_instances) ⇒ Object
224 225 226 227 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 224
# Materializes provider-instance settings.
#
# A callable is invoked on every call, so settings can change at runtime.
# nil/false becomes an empty hash. Keys are deep-symbolized.
#
# @param provider_instances [Hash, #call, nil]
# @return [Hash] symbol-keyed instance settings
def resolve_provider_instances(provider_instances)
  raw = provider_instances
  raw = raw.call if raw.respond_to?(:call)
  deep_symbolize(raw || {})
end
.response_content(response) ⇒ Object
248 249 250 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 248
# Extracts the reply text from a provider result.
#
# Tries :content, then :result. If neither holds a truthy value, falls back
# to the result's #to_s.
#
# @param response [Hash, Object]
# @return [Object]
def response_content(response)
  %i[content result].each do |field|
    value = response_field(response, field)
    return value if value
  end
  response.to_s
end
.response_field(response, field) ⇒ Object
268 269 270 271 272 273 274 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 268
# Reads one field from a provider result, whether it is hash-like or an object.
#
# Lookup order:
# 1. The symbol key (if present, even with a nil value).
# 2. The string key.
# 3. A public reader method named after the field.
#
# @param response [Hash, Object]
# @param field [Symbol]
# @return [Object, nil] nil when none of the above match
def response_field(response, field)
  if response.respond_to?(:key?)
    return response[field] if response.key?(field)

    name = field.to_s
    return response[name] if response.key?(name)
  end
  response.respond_to?(field) ? response.public_send(field) : nil
end
.response_metadata(response) ⇒ Object
263 264 265 266 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 263
# Extracts provider metadata as a symbol-keyed hash for the FleetResponse.
#
# @param response [Hash, Object] provider result
# @return [Hash] deep-symbolized metadata; {} when it is absent or not
#   hash-convertible
def response_metadata(response)
  metadata = response_field(response, :metadata)
  # NilClass responds to #to_h, so a bare respond_to?(:to_h) check let a
  # missing value through as `nil`. Arrays also respond to #to_h but are not
  # metadata maps (and raise on non-pair elements), so they are excluded too.
  return {} if metadata.nil? || metadata.is_a?(Array) || !metadata.respond_to?(:to_h)

  # Convert first so Struct-like values become hashes before symbolizing.
  deep_symbolize(metadata.to_h)
end
.response_usage(response) ⇒ Object
252 253 254 255 256 257 258 259 260 261 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 252
# Extracts token usage for the FleetResponse.
#
# A hash-convertible :usage (or :tokens) value is used directly, after
# deep-symbolizing. Otherwise a usage hash is assembled from top-level
# :input_tokens, :output_tokens and :thinking_tokens, with nil entries dropped.
#
# @param response [Hash, Object] provider result
# @return [Hash] symbol-keyed usage; possibly empty
def response_usage(response)
  usage = response_field(response, :usage) || response_field(response, :tokens)
  # NilClass (and Array) respond to #to_h, so the former bare respond_to?
  # check returned nil for results without :usage and made the per-field
  # fallback below unreachable. Scalars such as an Integer :tokens also fall
  # through to the fallback.
  if !usage.nil? && !usage.is_a?(Array) && usage.respond_to?(:to_h)
    return deep_symbolize(usage.to_h)
  end

  {
    input_tokens: response_field(response, :input_tokens),
    output_tokens: response_field(response, :output_tokens),
    thinking_tokens: response_field(response, :thinking_tokens)
  }.compact
end
.retryable_error?(error) ⇒ Boolean
234 235 236 237 238 239 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 234
# Classifies whether a failure is worth retrying.
#
# The result drives both the `retryable` flag in FleetError replies and
# broker requeueing. Configuration and policy failures are permanent.
# Malformed requests are permanent as well: envelope validation raises
# ArgumentError, and unparseable JSON raises JSON::ParserError. Previously
# these were reported as retryable, so a bad message was requeued
# indefinitely and clients were told to retry it.
#
# @param error [Exception]
# @return [Boolean]
def retryable_error?(error)
  return false if error.is_a?(ConfigurationError)
  return false if error.is_a?(WorkerExecution::PolicyError)
  return false if error.is_a?(ArgumentError)
  return false if defined?(::JSON::ParserError) && error.is_a?(::JSON::ParserError)

  true
end
.safe_publish_error(envelope, error) ⇒ Object
166 167 168 169 170 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 166
# Best-effort wrapper around publish_error.
#
# Runs on the failure path. Any StandardError raised while publishing is
# discarded so that the original error, not the publish failure, is the one
# the caller re-raises.
#
# @param envelope [FleetEnvelope]
# @param error [Exception]
# @return [Object, nil] publish_error's result, or nil if publishing failed
def safe_publish_error(envelope, error)
  publish_error(envelope, error)
rescue StandardError
  # Deliberately swallowed: see the note above.
  nil
end
.transport_message_class(name) ⇒ Object
172 173 174 175 176 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 172
# Looks up a transport message class by constant name.
#
# @param name [Symbol, String] e.g. :FleetResponse or :FleetError
# @return [Class] the constant under Legion::Extensions::Llm::Transport::Messages
# @raise [ConfigurationError] if the transport namespace or constant cannot be
#   loaded; the cause's message is included
def transport_message_class(name)
  ::Legion::Extensions::Llm::Transport::Messages.const_get(name)
rescue LoadError, NameError => e
  raise ConfigurationError, "fleet responder transport unavailable for #{name}: #{e.message}"
end
.truthy?(value) ⇒ Boolean
284 285 286 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 284
# Interprets a settings flag.
#
# Only `true` itself, or something whose #to_s is exactly "true" (e.g. the
# string 'true' or the symbol :true), counts as enabled.
#
# @param value [Object]
# @return [Boolean]
def truthy?(value)
  return true if value == true

  value.to_s == 'true'
end
.validate_protocol_version!(envelope) ⇒ Object
212 213 214 215 216 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 212
# Ensures the envelope speaks this responder's protocol version.
#
# The comparison uses ==, so the type must match Protocol::VERSION as well
# as the value.
#
# @param envelope [FleetEnvelope]
# @return [nil]
# @raise [ArgumentError] when protocol_version differs from Protocol::VERSION
def validate_protocol_version!(envelope)
  expected = Protocol::VERSION
  raise ArgumentError, "protocol_version must be #{expected}" unless envelope.protocol_version == expected
end
.validate_provider_family!(envelope, provider_family) ⇒ Object
218 219 220 221 222 |
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 218
# Ensures the request targets the provider family this responder serves.
#
# Both sides are compared as strings, so :openai and 'openai' match.
#
# @param envelope [#provider]
# @param provider_family [#to_s]
# @return [nil]
# @raise [ArgumentError] on mismatch
def validate_provider_family!(envelope, provider_family)
  requested = envelope.provider
  return if requested.to_s == provider_family.to_s

  raise ArgumentError, "fleet request provider #{requested} does not match #{provider_family}"
end