Module: Legion::Extensions::Llm::Fleet::ProviderResponder

Defined in:
lib/legion/extensions/llm/fleet/provider_responder.rb

Overview

Shared implementation for provider-owned fleet responder runners.

Defined Under Namespace

Classes: ConfigurationError, FleetEnvelope

Constant Summary collapse

# Envelope keys that check_envelope! requires to be present and non-nil.
# They are checked in this order, so the first missing one is the one reported.
REQUIRED_FIELDS = %i[
  request_id
  correlation_id
  idempotency_key
  operation
  provider
  provider_instance
  model
  params
  reply_to
  message_context
  caller
  trace_context
  signed_token
  timeout_seconds
  expires_at
  protocol_version
].freeze

# Envelope keys whose mere presence makes reject_legacy_fields! raise.
LEGACY_FIELDS = %i[
  schema_version
  request_type
  fleet_correlation_id
].freeze

Class Method Summary collapse

Class Method Details

.ack(delivery) ⇒ Object



178
179
180
181
182
183
184
185
186
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 178

# Acknowledges a delivery through whichever acknowledgement interface it exposes.
#
# Prefers a direct `delivery.ack`; otherwise falls back to
# `delivery.channel.ack(delivery.delivery_tag)` when both accessors exist.
#
# @param delivery [Object, nil] delivery handle; nil/false is ignored
# @return [Object, nil] whatever the underlying ack call returns, or nil when
#   no acknowledgement could be issued
def ack(delivery)
  return nil unless delivery
  return delivery.ack if delivery.respond_to?(:ack)

  via_channel = delivery.respond_to?(:channel) && delivery.respond_to?(:delivery_tag)
  delivery.channel.ack(delivery.delivery_tag) if via_channel
end

.build_provider(envelope:, provider_class:, provider_instances:) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 109

# Instantiates the provider for the instance named in the envelope.
#
# @param envelope [#provider_instance] request envelope naming the target instance
# @param provider_class [Class] class instantiated with the symbolized instance settings
# @param provider_instances [Hash, #call, nil] instance settings keyed by instance id,
#   or a callable producing them
# @return [Object] a new provider_class instance
# @raise [ConfigurationError] when the instance is unknown, or when its
#   fleet.respond_to_requests setting is not truthy
def build_provider(envelope:, provider_class:, provider_instances:)
  configured = resolve_provider_instances(provider_instances)
  instance_id = envelope.provider_instance.to_s

  # Try the symbol key first, then fall back to the string key.
  settings = configured[instance_id.to_sym] || configured[instance_id]
  raise ConfigurationError, "fleet provider instance is not configured: #{instance_id}" unless settings

  responding = truthy?(dig(settings, :fleet, :respond_to_requests))
  raise ConfigurationError, "fleet responses are disabled for provider instance: #{instance_id}" unless responding

  provider_class.new(deep_symbolize(settings))
end

.call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil) ⇒ Object

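Public runner entry point. Its signature mirrors AMQP delivery callbacks, which carry both delivery and property metadata. (The trailing `rubocop:disable Metrics/ParameterLists` in the source comment is a linter directive for this long parameter list, not part of the description.)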



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 65

# Runs one fleet request end to end: parse, validate, execute, publish, ack.
#
# On any StandardError the failure is published (only if the envelope was
# parsed), the delivery is rejected, and the original error is re-raised.
#
# @param payload [Object] raw request payload handed to parse_payload
# @param provider_family [Object] family the envelope's provider must match
# @param provider_class [Class] provider class handed to build_provider
# @param provider_instances [Hash, #call, nil] instance settings handed to build_provider
# @param delivery [Object, nil] handle used for ack/reject; takes precedence over properties
# @param properties [Object, nil] handle used for ack/reject when delivery is absent
# @return [Object] the response returned by WorkerExecution.call
# @raise [StandardError] whatever any step raised
def call(payload:, provider_family:, provider_class:, provider_instances:, delivery: nil, properties: nil)
  # Pre-initialised so the rescue clause can tell "never parsed" from "parsed".
  envelope = nil
  handle = delivery || properties

  begin
    envelope = parse_payload(payload)
    check_envelope!(envelope, provider_family: provider_family)
    provider = build_provider(
      envelope: envelope,
      provider_class: provider_class,
      provider_instances: provider_instances
    )
    response = WorkerExecution.call(envelope: envelope, provider: provider)
    publish_response(envelope, response)
    ack(handle)
    response
  rescue StandardError => e
    safe_publish_error(envelope, e) if envelope
    reject(handle, requeue: requeue_error?(e))
    raise
  end
end

.check_envelope!(envelope, provider_family:) ⇒ Object



99
100
101
102
103
104
105
106
107
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 99

# Validates an envelope before it is executed.
#
# Checks run in this order: legacy fields, required fields, protocol
# version, provider family.
#
# @param envelope [#key?, #[]] parsed request envelope
# @param provider_family [Object] family the envelope's provider must match
# @return [Object] whatever validate_provider_family! returns
# @raise [ArgumentError] naming the first REQUIRED_FIELDS entry that is
#   absent or nil; the delegated validators may raise as well
def check_envelope!(envelope, provider_family:)
  reject_legacy_fields!(envelope)

  missing = REQUIRED_FIELDS.find { |name| !envelope.key?(name) || envelope[name].nil? }
  raise ArgumentError, "#{missing} is required" if missing

  validate_protocol_version!(envelope)
  validate_provider_family!(envelope, provider_family)
end

.deep_symbolize(value) ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 288

# Recursively converts hash keys to symbols throughout nested hashes and arrays.
#
# Keys that do not respond to #to_sym are kept as they are; values that are
# neither Hash nor Array are returned untouched.
#
# @param value [Object] structure to convert
# @return [Object] a new Hash/Array with symbolized keys, or value itself
def deep_symbolize(value)
  return value.map { |element| deep_symbolize(element) } if value.is_a?(Array)
  return value unless value.is_a?(Hash)

  value.to_h do |key, nested|
    [key.respond_to?(:to_sym) ? key.to_sym : key, deep_symbolize(nested)]
  end
end

.dig(hash, *keys) ⇒ Object



276
277
278
279
280
281
282
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 276

# Walks nested hash-like objects, accepting either symbol or string keys.
#
# At each level the symbol form of the key is tried first and the string
# form is used when that lookup yields nil or false.
#
# @param hash [Object] starting object
# @param keys [Array<#to_sym, #to_s>] path to follow
# @return [Object, nil] the value at the path, or nil as soon as a level
#   does not respond to #key?
def dig(hash, *keys)
  node = hash
  keys.each do |key|
    return nil unless node.respond_to?(:key?)

    node = node[key.to_sym] || node[key.to_s]
  end
  node
end

.enabled_for?(provider_instances) ⇒ Boolean

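Reports whether any configured provider instance has `fleet.respond_to_requests` enabled. (The source comment attached to this method is only the linter directive `rubocop:enable Metrics/ParameterLists`.)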

Returns:

  • (Boolean)


80
81
82
83
84
85
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 80

# Tells whether at least one provider instance opts in to fleet responses.
#
# @param provider_instances [Hash, #call, nil] instance settings keyed by
#   instance id, or a callable producing them
# @return [Boolean] true when any instance has a truthy
#   fleet.respond_to_requests setting
def enabled_for?(provider_instances)
  resolve_provider_instances(provider_instances).each do |_instance_id, settings|
    return true if truthy?(dig(settings, :fleet, :respond_to_requests))
  end

  false
end

.error_code(error) ⇒ Object



241
242
243
244
245
246
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 241

# Maps an exception to the code string published with fleet errors.
#
# @param error [Exception] the failure being reported
# @return [String] 'configuration_error', 'policy_error' or 'provider_error'
def error_code(error)
  case error
  when ConfigurationError then 'configuration_error'
  when WorkerExecution::PolicyError then 'policy_error'
  else 'provider_error'
  end
end

.parse_json(payload) ⇒ Object



198
199
200
201
202
203
204
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 198

# Parses a JSON string, using ::Legion::JSON when that constant is defined
# and the standard ::JSON module otherwise.
#
# @param payload [String] JSON text
# @return [Object] whatever the selected parser returns
def parse_json(payload)
  parser = defined?(::Legion::JSON) ? ::Legion::JSON : ::JSON
  parser.parse(payload)
end

.parse_payload(payload) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 87

def parse_payload(payload)
  hash = case payload
         when FleetEnvelope
           payload.to_h
         when String
           parse_json(payload)
         else
           payload.respond_to?(:to_h) ? payload.to_h : {}
         end
  FleetEnvelope.new(data: deep_symbolize(hash))
end

.publish_error(envelope, error) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 145

# Builds a FleetError transport message for a failed request and publishes it.
#
# The message echoes the request's identifying fields from the envelope and
# adds the error's code, message, class name and retryability.
#
# @param envelope [Object] request envelope exposing the echoed accessors
# @param error [Exception] the failure being reported
# @return [Object] whatever the message's #publish returns
# @raise [ConfigurationError] when the transport message class cannot be resolved
def publish_error(envelope, error)
  message_class = transport_message_class(:FleetError)

  echoed = {
    protocol_version: envelope.protocol_version,
    request_id: envelope.request_id,
    correlation_id: envelope.correlation_id,
    idempotency_key: envelope.idempotency_key,
    operation: envelope.operation,
    provider: envelope.provider,
    provider_instance: envelope.provider_instance,
    model: envelope.model,
    reply_to: envelope.reply_to,
    message_context: envelope.message_context,
    trace_context: envelope.trace_context
  }
  failure = {
    code: error_code(error),
    message: error.message,
    error_class: error.class.name,
    retryable: retryable_error?(error),
    metadata: {}
  }

  message_class.new(**echoed, **failure).publish
end

.publish_response(envelope, response) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 124

# Builds a FleetResponse transport message for a successful request and publishes it.
#
# The message echoes the request's identifying fields from the envelope and
# carries the normalized content, tool calls, usage, finish reason and
# metadata extracted from the provider response.
#
# @param envelope [Object] request envelope exposing the echoed accessors
# @param response [Object] provider response (hash-like or object with readers)
# @return [Object] whatever the message's #publish returns
# @raise [ConfigurationError] when the transport message class cannot be resolved
def publish_response(envelope, response)
  transport_message_class(:FleetResponse).new(
    protocol_version: envelope.protocol_version,
    request_id: envelope.request_id,
    correlation_id: envelope.correlation_id,
    idempotency_key: envelope.idempotency_key,
    operation: envelope.operation,
    provider: envelope.provider,
    provider_instance: envelope.provider_instance,
    model: envelope.model,
    reply_to: envelope.reply_to,
    message_context: envelope.message_context,
    trace_context: envelope.trace_context,
    content: response_content(response),
    # A response without tool calls is published with an empty list.
    tool_calls: response_field(response, :tool_calls) || [],
    usage: response_usage(response),
    finish_reason: response_field(response, :finish_reason),
    # Only the response's normalized metadata is sent, never the raw response object.
    metadata: response_metadata(response)
  ).publish
end

.reject(delivery, requeue:) ⇒ Object



188
189
190
191
192
193
194
195
196
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 188

# Rejects a delivery through whichever rejection interface it exposes.
#
# Prefers a direct `delivery.reject(requeue)`; otherwise falls back to
# `delivery.channel.reject(delivery.delivery_tag, requeue)` when both
# accessors exist.
#
# @param delivery [Object, nil] delivery handle; nil/false is ignored
# @param requeue [Object] requeue flag forwarded positionally to the reject call
# @return [Object, nil] whatever the underlying reject call returns, or nil
#   when no rejection could be issued
def reject(delivery, requeue:)
  return nil unless delivery
  return delivery.reject(requeue) if delivery.respond_to?(:reject)

  via_channel = delivery.respond_to?(:channel) && delivery.respond_to?(:delivery_tag)
  delivery.channel.reject(delivery.delivery_tag, requeue) if via_channel
end

.reject_legacy_fields!(envelope) ⇒ Object



206
207
208
209
210
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 206

# Refuses envelopes that still carry any of the LEGACY_FIELDS keys.
#
# Only the key's presence matters; its value is not inspected.
#
# @param envelope [#key?] parsed request envelope
# @return [Array<Symbol>] LEGACY_FIELDS when no legacy key is present
# @raise [ArgumentError] naming the first legacy key found
def reject_legacy_fields!(envelope)
  LEGACY_FIELDS.each do |legacy_name|
    next unless envelope.key?(legacy_name)

    raise ArgumentError, "#{legacy_name} is not supported by fleet protocol v2"
  end
end

.requeue_error?(error) ⇒ Boolean

Returns:

  • (Boolean)


229
230
231
232
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 229

# Decides whether a failed delivery should be put back on the queue.
#
# Only retryable errors qualify, and only while the
# fleet.consumer.requeue_transient setting is anything other than false
# (it is looked up with a default of true).
#
# @param error [Exception] the failure that aborted processing
# @return [Boolean]
def requeue_error?(error)
  retryable = retryable_error?(error)
  return retryable unless retryable

  requeue_setting = Settings.value(:fleet, :consumer, :requeue_transient, default: true)
  requeue_setting != false
end

.resolve_provider_instances(provider_instances) ⇒ Object



224
225
226
227
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 224

# Produces the provider-instance settings with symbolized keys.
#
# @param provider_instances [Hash, #call, nil] the settings themselves, or a
#   callable that is invoked to obtain them
# @return [Object] deep-symbolized settings; an empty hash when none were given
def resolve_provider_instances(provider_instances)
  resolved = provider_instances
  resolved = resolved.call if resolved.respond_to?(:call)

  deep_symbolize(resolved || {})
end

.response_content(response) ⇒ Object



248
249
250
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 248

# Extracts the textual content of a provider response.
#
# Tries the :content field, then the :result field, and finally falls back
# to the response's string form.
#
# @param response [Object] provider response
# @return [Object] the first truthy field value, or response.to_s
def response_content(response)
  content = response_field(response, :content)
  return content if content

  result = response_field(response, :result)
  return result if result

  response.to_s
end

.response_field(response, field) ⇒ Object



268
269
270
271
272
273
274
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 268

# Reads a named field from a provider response of unknown shape.
#
# Hash-like responses are checked for the key as given and then for its
# string form; otherwise a public reader of that name is called if one exists.
#
# @param response [Object] provider response
# @param field [Symbol] field name
# @return [Object, nil] the field value, or nil when the response has no such field
def response_field(response, field)
  if response.respond_to?(:key?)
    [field, field.to_s].each do |candidate|
      return response[candidate] if response.key?(candidate)
    end
  end

  response.respond_to?(field) ? response.public_send(field) : nil
end

.response_metadata(response) ⇒ Object



263
264
265
266
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 263

# Extracts the response's :metadata field as a hash with symbolized keys.
#
# @param response [Object] provider response
# @return [Object] deep-symbolized metadata, or an empty hash when the
#   response carries no metadata or the value cannot be converted with #to_h
def response_metadata(response)
  metadata = response_field(response, :metadata)
  # nil responds to #to_h, so it has to be ruled out explicitly; otherwise a
  # response without metadata would yield nil instead of the empty-hash default.
  return {} if metadata.nil?

  metadata.respond_to?(:to_h) ? deep_symbolize(metadata) : {}
end

.response_usage(response) ⇒ Object



252
253
254
255
256
257
258
259
260
261
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 252

# Extracts token-usage information from a provider response.
#
# Uses the response's :usage (or :tokens) object when present; otherwise
# assembles a hash from the individual :input_tokens, :output_tokens and
# :thinking_tokens fields, omitting any that are nil.
#
# @param response [Object] provider response
# @return [Object] deep-symbolized usage object, or a hash of the token
#   counts found on the response (empty when there are none)
def response_usage(response)
  usage = response_field(response, :usage) || response_field(response, :tokens)
  # nil responds to #to_h, so it has to be ruled out explicitly; otherwise a
  # response without a usage object would return nil here and the per-field
  # fallback below could never run.
  return deep_symbolize(usage) if !usage.nil? && usage.respond_to?(:to_h)

  {
    input_tokens: response_field(response, :input_tokens),
    output_tokens: response_field(response, :output_tokens),
    thinking_tokens: response_field(response, :thinking_tokens)
  }.compact
end

.retryable_error?(error) ⇒ Boolean

Returns:

  • (Boolean)


234
235
236
237
238
239
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 234

# Classifies an error as retryable or permanent.
#
# ConfigurationError and WorkerExecution::PolicyError are treated as
# permanent; every other error counts as retryable.
#
# @param error [Exception] the failure to classify
# @return [Boolean]
def retryable_error?(error)
  permanent = error.is_a?(ConfigurationError) || error.is_a?(WorkerExecution::PolicyError)
  !permanent
end

.safe_publish_error(envelope, error) ⇒ Object



166
167
168
169
170
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 166

# Best-effort wrapper around publish_error.
#
# Any StandardError raised while publishing is deliberately discarded so
# the caller can carry on handling the original failure.
#
# @param envelope [Object] request envelope handed to publish_error
# @param error [Exception] the failure being reported
# @return [Object, nil] publish_error's result, or nil when publishing failed
def safe_publish_error(envelope, error)
  publish_error(envelope, error)
rescue StandardError => _publish_failure
  nil
end

.transport_message_class(name) ⇒ Object



172
173
174
175
176
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 172

# Looks up a transport message class by constant name under
# ::Legion::Extensions::Llm::Transport::Messages.
#
# @param name [Symbol, String] constant name, e.g. :FleetResponse
# @return [Object] the resolved constant
# @raise [ConfigurationError] when the namespace or the constant cannot be
#   resolved (LoadError or NameError)
def transport_message_class(name)
  messages = ::Legion::Extensions::Llm::Transport::Messages
  messages.const_get(name)
rescue LoadError, NameError => e
  raise ConfigurationError, "fleet responder transport unavailable for #{name}: #{e.message}"
end

.truthy?(value) ⇒ Boolean

Returns:

  • (Boolean)


284
285
286
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 284

# Interprets a setting value as a boolean flag.
#
# @param value [Object] raw setting value
# @return [Boolean] true for `true` itself or anything whose string form is
#   exactly 'true'; false otherwise
def truthy?(value)
  return true if value == true

  value.to_s == 'true'
end

.validate_protocol_version!(envelope) ⇒ Object

Raises:

  • (ArgumentError)


212
213
214
215
216
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 212

# Ensures the envelope speaks the protocol version this responder implements.
#
# @param envelope [#protocol_version] parsed request envelope
# @return [nil] when the version equals Protocol::VERSION
# @raise [ArgumentError] when the version differs
def validate_protocol_version!(envelope)
  version_matches = envelope.protocol_version == Protocol::VERSION
  raise ArgumentError, "protocol_version must be #{Protocol::VERSION}" unless version_matches
end

.validate_provider_family!(envelope, provider_family) ⇒ Object

Raises:

  • (ArgumentError)


218
219
220
221
222
# File 'lib/legion/extensions/llm/fleet/provider_responder.rb', line 218

# Ensures the envelope targets the provider family served by this responder.
#
# Both sides are compared by their string form, so symbols and strings match.
#
# @param envelope [#provider] parsed request envelope
# @param provider_family [#to_s] family this responder serves
# @return [nil] when the families match
# @raise [ArgumentError] when they differ
def validate_provider_family!(envelope, provider_family)
  families_match = envelope.provider.to_s == provider_family.to_s
  raise ArgumentError, "fleet request provider #{envelope.provider} does not match #{provider_family}" unless families_match
end