Module: Legion::LLM::Fleet::Handler

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/fleet/handler.rb

Constant Summary collapse

# Envelope keys that validate_envelope! rejects: if any of them is present in
# the request, an ArgumentError ("... is not supported") is raised.
LEGACY_FIELDS =
%i[schema_version request_type fleet_correlation_id].freeze

Class Method Summary collapse

Class Method Details

.build_error(envelope, error) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/legion/llm/fleet/handler.rb', line 84

# Builds the failure reply hash for a fleet request.
#
# Routing and identity fields are copied from the envelope, the error is
# described by its code (see error_code), message and class name, and any
# entry whose value is nil is removed before returning.
#
# @param envelope [Hash] request envelope (may be empty)
# @param error [Exception] the failure being reported
# @return [Hash] reply hash with success: false
def build_error(envelope, error)
  reply = { success: false, error: error_code(error) }
  # Fall back to the current protocol version when the envelope carries none.
  reply[:protocol_version] = envelope[:protocol_version] || ::Legion::Extensions::Llm::Fleet::Protocol::VERSION

  %i[request_id correlation_id idempotency_key operation provider provider_instance model reply_to].each do |field|
    reply[field] = envelope[field]
  end

  reply[:message_context] = envelope[:message_context] || {}
  reply[:trace_context]   = envelope[:trace_context] || {}
  reply[:message]         = error.message
  reply[:error_class]     = error.class.name
  reply.compact
end

.build_success(envelope, response) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/legion/llm/fleet/handler.rb', line 62

# Builds the success reply hash for a fleet request.
#
# Routing and identity fields are copied from the envelope; the payload fields
# (content, tool_calls, usage, finish_reason, metadata) are extracted from the
# worker response through the response_* helpers. The model reported by the
# response wins over the one requested in the envelope. Entries whose value is
# nil are removed before returning.
#
# @param envelope [Hash] validated request envelope
# @param response [Hash, Object] worker result (Hash or response-like object)
# @return [Hash] reply hash with success: true
def build_success(envelope, response)
  {
    success:           true,
    protocol_version:  envelope[:protocol_version],
    request_id:        envelope[:request_id],
    correlation_id:    envelope[:correlation_id],
    idempotency_key:   envelope[:idempotency_key],
    operation:         envelope[:operation],
    provider:          envelope[:provider],
    provider_instance: envelope[:provider_instance],
    model:             response_model(response) || envelope[:model],
    reply_to:          envelope[:reply_to],
    message_context:   envelope[:message_context] || {},
    trace_context:     envelope[:trace_context] || {},
    content:           response_content(response),
    tool_calls:        response_tool_calls(response),
    usage:             response_usage(response),
    finish_reason:     response_finish_reason(response),
    # Extract only the metadata, not the whole response object.
    metadata:          response_metadata(response)
  }.compact
end

.error_code(error) ⇒ Object



216
217
218
219
220
221
222
223
# File 'lib/legion/llm/fleet/handler.rb', line 216

# Maps an exception to the error code reported in a fleet error reply.
#
# Message prefixes are checked before the exception class, so an error whose
# message starts with a known prefix is reported under that prefix even when
# it is a WorkerExecution::PolicyError.
#
# @param error [Exception]
# @return [String] one of 'invalid_fleet_request', 'provider_not_registered',
#   'fleet_policy_denied' or 'fleet_worker_error'
def error_code(error)
  text = error.message.to_s
  prefixed = %w[invalid_fleet_request provider_not_registered].find { |code| text.start_with?(code) }
  return prefixed if prefixed

  error.is_a?(WorkerExecution::PolicyError) ? 'fleet_policy_denied' : 'fleet_worker_error'
end

.handle_fleet_request(payload) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/legion/llm/fleet/handler.rb', line 19

# Entry point for one fleet request.
#
# Normalizes the payload, validates the envelope, runs it through
# WorkerExecution with a provider resolver, and builds the success reply.
# Any StandardError raised along the way is reported through handle_exception
# and converted into an error reply instead. In both cases the reply is
# published only when the envelope carries a :reply_to, and the reply hash is
# returned to the caller.
#
# @param payload [Hash, Object] raw request payload
# @return [Hash] the success or error reply hash
def handle_fleet_request(payload)
  envelope = normalize_hash(payload)
  log.debug(
    '[llm][fleet][handler] action=handle_fleet_request.enter ' \
    "operation=#{envelope[:operation]} model=#{envelope[:model]} provider=#{envelope[:provider]}"
  )

  validate_envelope!(envelope)
  resolver = proc { |validated_envelope| resolve_provider(validated_envelope) }
  success = build_success(envelope, WorkerExecution.call(envelope: envelope, provider: resolver))
  publish_response(envelope, success) if envelope[:reply_to]
  success
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.handler.handle_fleet_request')
  # envelope is nil when normalization itself failed; fall back to an empty one.
  request = envelope || {}
  build_error(request, e).tap do |failure|
    publish_error(request, failure) if request[:reply_to]
  end
end

.log_reply_publish_failure(publish_result, correlation_id) ⇒ Object



251
252
253
# File 'lib/legion/llm/fleet/handler.rb', line 251

# Logs a warning that a reply could not be published.
#
# The status is read only from Hash results. Safe navigation alone guards nil
# but not false/true or other non-Hash results, which publish_accepted? treats
# as "not accepted" and which would otherwise raise here; for those the status
# is logged as empty.
#
# @param publish_result [Hash, Object, nil] value returned by the publish call
# @param correlation_id [Object] correlation id of the request being answered
# @return [Object] whatever log.warn returns
def log_reply_publish_failure(publish_result, correlation_id)
  status = publish_result[:status] if publish_result.is_a?(Hash)
  log.warn("[llm][fleet][handler] action=reply_publish_failed correlation_id=#{correlation_id} status=#{status}")
end

.normalize_hash(hash) ⇒ Object



155
156
157
158
159
160
161
# File 'lib/legion/llm/fleet/handler.rb', line 155

# Returns a copy of the given hash with keys symbolized and values normalized.
#
# Keys that respond to to_sym are converted; other keys are kept as they are.
# Every value is passed through normalize_value. Input that does not respond
# to each yields an empty hash.
#
# @param hash [Hash, Object]
# @return [Hash]
def normalize_hash(hash)
  return {} unless hash.respond_to?(:each)

  normalized = {}
  hash.each do |key, value|
    target_key = key.respond_to?(:to_sym) ? key.to_sym : key
    normalized[target_key] = normalize_value(value)
  end
  normalized
end

.normalize_value(value) ⇒ Object



163
164
165
166
167
168
169
170
171
172
# File 'lib/legion/llm/fleet/handler.rb', line 163

# Normalizes a single value found inside a request payload.
#
# Arrays are normalized element by element, hashes are handed to
# normalize_hash, and every other value is returned unchanged.
#
# @param value [Object]
# @return [Object]
def normalize_value(value)
  case value
  when Array then value.map { |item| normalize_value(item) }
  when Hash  then normalize_hash(value)
  else value
  end
end

.protocol_version?(value) ⇒ Boolean

Returns:

  • (Boolean)


232
233
234
235
# File 'lib/legion/llm/fleet/handler.rb', line 232

# Tells whether the given value names the supported fleet protocol version.
#
# The value matches when it equals Protocol::VERSION itself or its string form.
#
# @param value [Object] protocol_version taken from a request envelope
# @return [Boolean]
def protocol_version?(value)
  supported = ::Legion::Extensions::Llm::Fleet::Protocol::VERSION
  [supported, supported.to_s].any? { |candidate| value == candidate }
end

.publish_accepted?(publish_result) ⇒ Boolean

Returns:

  • (Boolean)


247
248
249
# File 'lib/legion/llm/fleet/handler.rb', line 247

# Tells whether a publish result reports that the message was accepted.
#
# Only a Hash whose :accepted entry is exactly true counts as accepted.
#
# @param publish_result [Object] value returned by the publish call
# @return [Boolean]
def publish_accepted?(publish_result)
  return false unless publish_result.is_a?(Hash)

  publish_result[:accepted] == true
end

.publish_error(_envelope, result) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/legion/llm/fleet/handler.rb', line 130

# Publishes an error reply as a FleetError transport message.
#
# The message class is required lazily. Fields are taken from the reply hash
# (the reply's :error becomes the message's :code) and retryable is always
# false. A result that publish_accepted? does not accept is logged. Any
# StandardError is reported through handle_exception rather than raised.
#
# @param _envelope [Hash] unused
# @param result [Hash] error reply hash
# @return [Object] the publish result, or handle_exception's value on failure
def publish_error(_envelope, result)
  require 'legion/extensions/llm/transport/messages/fleet_error'
  copied_fields = %i[
    protocol_version request_id correlation_id idempotency_key operation provider
    provider_instance model reply_to message_context trace_context
  ]
  attributes = copied_fields.each_with_object({}) { |field, attrs| attrs[field] = result[field] }
  attributes[:code]        = result[:error]
  attributes[:message]     = result[:message]
  attributes[:error_class] = result[:error_class]
  attributes[:retryable]   = false

  reply = ::Legion::Extensions::Llm::Transport::Messages::FleetError.new(**attributes)
  outcome = reply.publish(reply_publish_options)
  log_reply_publish_failure(outcome, result[:correlation_id]) unless publish_accepted?(outcome)
  outcome
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.handler.publish_error')
end

.publish_response(_envelope, result) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/legion/llm/fleet/handler.rb', line 104

# Publishes a success reply as a FleetResponse transport message.
#
# The message class is required lazily and every field is taken from the reply
# hash under the same name. A result that publish_accepted? does not accept is
# logged. Any StandardError is reported through handle_exception rather than
# raised.
#
# @param _envelope [Hash] unused
# @param result [Hash] success reply hash
# @return [Object] the publish result, or handle_exception's value on failure
def publish_response(_envelope, result)
  require 'legion/extensions/llm/transport/messages/fleet_response'
  reply_fields = %i[
    protocol_version request_id correlation_id idempotency_key operation provider
    provider_instance model reply_to message_context trace_context
    content tool_calls usage finish_reason metadata
  ]
  attributes = reply_fields.each_with_object({}) { |field, attrs| attrs[field] = result[field] }

  reply = ::Legion::Extensions::Llm::Transport::Messages::FleetResponse.new(**attributes)
  outcome = reply.publish(reply_publish_options)
  log_reply_publish_failure(outcome, result[:correlation_id]) unless publish_accepted?(outcome)
  outcome
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.handler.publish_response')
end

.reply_publish_options ⇒ Object



237
238
239
240
241
242
243
244
245
# File 'lib/legion/llm/fleet/handler.rb', line 237

# Options handed to the publish call of reply messages.
#
# mandatory, publisher_confirm, publish_confirm_timeout_ms and spool are read
# from the fleet.responder settings (defaults: false, false, 500, false);
# return_result is always true.
#
# @return [Hash]
def reply_publish_options
  responder_setting = lambda do |key, fallback|
    Legion::LLM::Settings.value(:fleet, :responder, key, default: fallback)
  end

  options = {}
  options[:mandatory]                  = responder_setting.call(:mandatory, false)
  options[:publisher_confirm]          = responder_setting.call(:publisher_confirm, false)
  options[:publish_confirm_timeout_ms] = responder_setting.call(:publish_confirm_timeout_ms, 500)
  options[:spool]                      = responder_setting.call(:spool, false)
  options[:return_result]              = true
  options
end

.resolve_provider(envelope) ⇒ Object

Raises:

  • (ArgumentError)


53
54
55
56
57
58
59
60
# File 'lib/legion/llm/fleet/handler.rb', line 53

# Looks up the adapter registered for the envelope's provider and instance.
#
# @param envelope [Hash] envelope carrying :provider and :provider_instance
# @return [Object] the adapter returned by Legion::LLM::Call::Registry.for
# @raise [ArgumentError] with a "provider_not_registered: ..." message when
#   the registry returns nothing
def resolve_provider(envelope)
  provider = envelope[:provider]
  instance = envelope[:provider_instance]

  Legion::LLM::Call::Registry.for(provider, instance: instance) ||
    raise(ArgumentError, "provider_not_registered: #{provider}/#{instance}")
end

.responder_auth_required? ⇒ Boolean

Returns:

  • (Boolean)


225
226
227
228
229
230
# File 'lib/legion/llm/fleet/handler.rb', line 225

# Tells whether incoming fleet requests must carry a signed token.
#
# The fleet.responder.require_auth setting decides when it is set; otherwise
# fleet.auth.require_signed_token (default true) is used. Only an explicit
# false turns the requirement off.
#
# @return [Boolean]
def responder_auth_required?
  setting = Legion::LLM::Settings.value(:fleet, :responder, :require_auth, default: nil)
  setting = Legion::LLM::Settings.value(:fleet, :auth, :require_signed_token, default: true) if setting.nil?

  setting != false
end

.response_content(response) ⇒ Object



174
175
176
177
178
179
# File 'lib/legion/llm/fleet/handler.rb', line 174

# Extracts the content of a worker response.
#
# For a Hash the first truthy value among :content, 'content', :result and
# 'result' is used; an object responding to content is asked for it; anything
# else is returned as is.
#
# @param response [Hash, Object]
# @return [Object]
def response_content(response)
  if response.is_a?(Hash)
    response[:content] || response['content'] || response[:result] || response['result']
  elsif response.respond_to?(:content)
    response.content
  else
    response
  end
end

.response_finish_reason(response) ⇒ Object



202
203
204
205
206
207
# File 'lib/legion/llm/fleet/handler.rb', line 202

# Extracts the finish reason of a worker response.
#
# A Hash is read under :finish_reason or 'finish_reason'; an object responding
# to finish_reason is asked for it; otherwise nil is returned.
#
# @param response [Hash, Object]
# @return [Object, nil]
def response_finish_reason(response)
  if response.is_a?(Hash)
    response[:finish_reason] || response['finish_reason']
  elsif response.respond_to?(:finish_reason)
    response.finish_reason
  end
end

.response_metadata(response) ⇒ Object



209
210
211
212
213
214
# File 'lib/legion/llm/fleet/handler.rb', line 209

# Extracts the metadata of a worker response.
#
# A Hash is read under :metadata or 'metadata' (empty hash when neither is
# set); an object responding to metadata is asked for it; anything else yields
# an empty hash.
#
# @param response [Hash, Object]
# @return [Object] the metadata, or {} when none is available
def response_metadata(response)
  return response[:metadata] || response['metadata'] || {} if response.is_a?(Hash)
  return response.metadata if response.respond_to?(:metadata)

  {}
end

.response_model(response) ⇒ Object



181
182
183
184
185
186
# File 'lib/legion/llm/fleet/handler.rb', line 181

# Extracts the model name reported by a worker response.
#
# A Hash is read under :model or 'model'; an object responding to model is
# asked for it; otherwise nil is returned.
#
# @param response [Hash, Object]
# @return [Object, nil]
def response_model(response)
  if response.is_a?(Hash)
    response[:model] || response['model']
  elsif response.respond_to?(:model)
    response.model
  end
end

.response_tool_calls(response) ⇒ Object



188
189
190
191
192
193
# File 'lib/legion/llm/fleet/handler.rb', line 188

# Extracts the tool calls of a worker response.
#
# A Hash is read under :tool_calls or 'tool_calls'; an object responding to
# tool_calls is asked for them; otherwise nil is returned.
#
# @param response [Hash, Object]
# @return [Object, nil]
def response_tool_calls(response)
  if response.is_a?(Hash)
    response[:tool_calls] || response['tool_calls']
  elsif response.respond_to?(:tool_calls)
    response.tool_calls
  end
end

.response_usage(response) ⇒ Object



195
196
197
198
199
200
# File 'lib/legion/llm/fleet/handler.rb', line 195

# Extracts the usage information of a worker response.
#
# A Hash is read under :usage or 'usage' (empty hash when neither is set); an
# object responding to tokens is asked for them; anything else yields an empty
# hash.
#
# @param response [Hash, Object]
# @return [Object] the usage data, or {} when none is available
def response_usage(response)
  if response.is_a?(Hash)
    response[:usage] || response['usage'] || {}
  elsif response.respond_to?(:tokens)
    response.tokens
  else
    {}
  end
end

.validate_envelope!(envelope) ⇒ Object

Raises:

  • (ArgumentError)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/legion/llm/fleet/handler.rb', line 37

# Validates a normalized fleet request envelope.
#
# Checks run in this order: supported protocol version, presence (non-nil) of
# every required field — plus :signed_token when responder_auth_required? —
# and absence of every key listed in LEGACY_FIELDS. The first violation raises.
#
# @param envelope [Hash] normalized request envelope
# @return [Array<Symbol>] LEGACY_FIELDS when the envelope is valid
# @raise [ArgumentError] with an "invalid_fleet_request: ..." message
def validate_envelope!(envelope)
  unless protocol_version?(envelope[:protocol_version])
    raise ArgumentError, 'invalid_fleet_request: unsupported protocol_version'
  end

  mandatory = %i[
    request_id correlation_id operation provider provider_instance model params reply_to
    message_context caller trace_context timeout_seconds expires_at idempotency_key
  ]
  mandatory += [:signed_token] if responder_auth_required?

  absent = mandatory.find { |field| envelope[field].nil? }
  raise ArgumentError, "invalid_fleet_request: #{absent} is required" if absent

  LEGACY_FIELDS.each do |field|
    next unless envelope.key?(field)

    raise ArgumentError, "invalid_fleet_request: #{field} is not supported"
  end
end