Module: Legion::LLM::Fleet::Dispatcher

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/fleet/dispatcher.rb

Constant Summary collapse

# Request-option keys that request_params removes before the remaining
# options are stored under the envelope's :params entry.
ENVELOPE_KEYS =
%i[
  app_id caller correlation_id expires_at idempotency_key identity message_context operation
  model priority protocol_version provider provider_instance reply_to request_id routing_key
  signed_token timeout timeout_seconds trace_context ttl
].freeze
# Option names that reject_legacy_fields! refuses: if any is present (as a
# symbol or string key) an ArgumentError is raised stating the field is not
# supported by fleet protocol v2.
LEGACY_FIELDS =
%i[schema_version request_type fleet_correlation_id].freeze

Class Method Summary collapse

Class Method Details

.build_envelope(operation:, request_opts:, message_context:, routing_key: nil, reply_to: nil) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/legion/llm/fleet/dispatcher.rb', line 58

# Assembles the request envelope hash for one fleet dispatch.
#
# Provider, provider instance, model, timeout and the optional idempotency
# key, caller and trace context are read from +request_opts+ through
# fetch_option. Missing values fall back to 'ollama', 'default', the result
# of resolve_timeout, a generated "idem_<uuid>", default_caller and {}.
#
# @param operation [Symbol] operation being dispatched
# @param request_opts [Hash] caller-supplied request options
# @param message_context [Hash, nil] stored as given, or {} when nil
# @param routing_key [String, nil] explicit routing key; built via
#   build_routing_key when not given
# @param reply_to [String, nil] explicit reply queue; defaults to
#   ReplyDispatcher.agent_queue_name
# @return [Hash] the envelope, with :signed_token set last
# @raise [ArgumentError] when request_opts contains one of LEGACY_FIELDS
def build_envelope(operation:, request_opts:, message_context:, routing_key: nil, reply_to: nil)
  provider = fetch_option(request_opts, :provider) || 'ollama'
  reject_legacy_fields!(request_opts)
  instance = fetch_option(request_opts, :provider_instance) ||
             fetch_option(request_opts, :instance) ||
             'default'
  model = fetch_option(request_opts, :model)
  timeout = resolve_timeout(operation: operation, override: fetch_option(request_opts, :timeout))
  request_id = next_request_id
  correlation_id = next_request_id
  reply_to ||= ReplyDispatcher.agent_queue_name
  routing_key ||= build_routing_key(
    provider: provider, operation: operation, model: model, provider_instance: instance,
    context_window: context_window_from(request_opts),
    boundary: fetch_option(request_opts, :network_boundary),
    eligibility_fingerprint: fetch_option(request_opts, :eligibility_fingerprint),
    routing_style: fetch_option(request_opts, :routing_style)
  )

  # The sections are merged in this order so the resulting key order matches
  # a single literal listing them top to bottom.
  identifiers = {
    protocol_version: ::Legion::Extensions::Llm::Fleet::Protocol::VERSION,
    request_id:       request_id,
    correlation_id:   correlation_id,
    idempotency_key:  fetch_option(request_opts, :idempotency_key) || "idem_#{SecureRandom.uuid}"
  }
  target = {
    operation:         operation,
    provider:          provider,
    provider_instance: instance,
    model:             model,
    params:            request_params(request_opts),
    routing_key:       routing_key,
    reply_to:          reply_to
  }
  context = {
    message_context: message_context || {},
    caller:          fetch_option(request_opts, :caller) || default_caller,
    identity:        Legion::LLM::PublisherIdentity.current,
    trace_context:   fetch_option(request_opts, :trace_context) || {}
  }
  timing = {
    timeout_seconds: timeout,
    expires_at:      (Time.now.utc + timeout).iso8601,
    ttl:             effective_ttl(request_opts, timeout)
  }

  envelope = identifiers.merge(target, context, timing)
  # The token is issued over the envelope as it stands before :signed_token is added.
  envelope[:signed_token] =
    if dispatch_auth_required?
      TokenIssuer.issue(envelope)
    else
      'unsigned'
    end
  envelope
end

.build_routing_key(provider:, operation:, model:, provider_instance: nil, context_window: nil, boundary: nil, eligibility_fingerprint: nil, routing_style: nil) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/legion/llm/fleet/dispatcher.rb', line 111

# Builds the routing key for a fleet request according to the routing style.
#
# The style is +routing_style+ or, when that is nil/false, default_routing_style.
# 'offering_lane' delegates to Lane.offering_key, 'shared_lane' delegates to
# Lane.routing_key, and any other style yields
# "llm.request.<provider>.<operation>.<sanitized model>".
#
# @return [Object] whatever the selected Lane helper returns, or a String for
#   the fallback style
def build_routing_key(provider:, operation:, model:, provider_instance: nil, context_window: nil, boundary: nil,
                      eligibility_fingerprint: nil, routing_style: nil)
  case (routing_style || default_routing_style).to_s
  when 'offering_lane'
    # The provider name stands in when no provider instance was given.
    Lane.offering_key(instance_id: provider_instance || provider, model: model, operation: operation)
  when 'shared_lane'
    Lane.routing_key(operation: operation, model: model, context_window: context_window,
                     boundary: boundary, eligibility_fingerprint: eligibility_fingerprint)
  else
    "llm.request.#{provider}.#{operation}.#{sanitize_model(model)}"
  end
end

.context_window_from(options) ⇒ Object



130
131
132
133
134
135
136
# File 'lib/legion/llm/fleet/dispatcher.rb', line 130

# Picks the context window size out of the request options.
#
# Checks :context_window, :max_context_size and :max_input_tokens in that
# order and returns the first truthy value; otherwise falls back to
# :context_window inside the :limits option (or nil when absent).
#
# @param options [Hash] request options (string or symbol keys)
# @return [Object, nil] the first configured value found
def context_window_from(options)
  %i[context_window max_context_size max_input_tokens].each do |name|
    candidate = fetch_option(options, name)
    return candidate if candidate
  end

  nested_limits = fetch_option(options, :limits) || {}
  fetch_option(nested_limits, :context_window)
end

.default_caller ⇒ Object



291
292
293
294
295
296
297
# File 'lib/legion/llm/fleet/dispatcher.rb', line 291

# Caller descriptor used when the request options carry no :caller entry.
#
# @return [Hash] :source, :component and :requested_by, the last taken from
#   Legion::LLM::PublisherIdentity.requested_by
def default_caller
  requester = Legion::LLM::PublisherIdentity.requested_by

  { source: 'legion-llm', component: 'fleet_dispatcher', requested_by: requester }
end

.default_routing_style ⇒ Object



124
125
126
127
128
# File 'lib/legion/llm/fleet/dispatcher.rb', line 124

# Routing style used when a request does not specify one.
#
# Reads llm.fleet.dispatch.routing_style first, then
# llm.routing.tiers.fleet.routing_style, and falls back to :shared_lane.
#
# @return [Object] the configured style, or :shared_lane
def default_routing_style
  settings_paths = [
    %i[llm fleet dispatch routing_style],
    %i[llm routing tiers fleet routing_style]
  ]
  settings_paths.each do |path|
    configured = Legion::Settings.dig(*path)
    return configured if configured
  end

  :shared_lane
end

.dispatch(operation: nil, request: nil, message_context: {}, routing_key: nil, reply_to: nil, **opts) ⇒ Object

Raises:

  • (ArgumentError)


27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/legion/llm/fleet/dispatcher.rb', line 27

# Dispatches one request to the fleet and waits for the reply.
#
# The operation comes from the +operation+ keyword, else from the request's
# :operation entry, else from an 'operation' entry in +opts+. Remaining
# +opts+ are merged over the normalized request to form the request options.
#
# @param operation [Symbol, String, nil] operation name
# @param request [#to_h, nil] request whose entries become request options
# @param message_context [Hash] echoed back in error results
# @param routing_key [String, nil] explicit routing key (built when nil)
# @param reply_to [String, nil] explicit reply queue
# @param opts [Hash] extra request options; override entries from +request+
# @return [Hash, Object] an error hash (fleet unavailable / publish failure),
#   otherwise whatever wait_for_response returns
# @raise [ArgumentError] when no operation can be resolved
def dispatch(operation: nil, request: nil, message_context: {}, routing_key: nil, reply_to: nil, **opts)
  request_hash = normalize_request(request)
  # Look at the raw request first, then at its to_h form, so a request that
  # responds to to_h but not key? (e.g. a Struct) still supplies its operation.
  # The symbol key :operation can never reach opts (the keyword captures it),
  # so opts is searched through fetch_option to pick up a string key.
  operation = normalize_operation(
    operation ||
      fetch_option(request, :operation) ||
      fetch_option(request_hash, :operation) ||
      fetch_option(opts, :operation)
  )
  raise ArgumentError, 'operation is required for fleet dispatch' unless operation

  request_opts = request_hash.merge(opts)
  # Evaluate availability once so the value logged is the value acted upon.
  available = fleet_available?
  log.debug "[llm][fleet][dispatcher] action=dispatch.enter operation=#{operation} " \
            "model=#{fetch_option(request_opts, :model)} routing_key=#{routing_key} fleet_available=#{available}"
  return error_result('fleet_unavailable', message_context: message_context) unless available

  envelope = build_envelope(
    operation:       operation,
    request_opts:    request_opts,
    message_context: message_context,
    routing_key:     routing_key,
    reply_to:        reply_to
  )
  # Register for the reply before publishing so the registration exists by
  # the time a response can arrive.
  future = register_response(envelope[:correlation_id], expected_delivery(envelope))
  publish_result = publish_request(**envelope)
  unless publish_accepted?(publish_result)
    return publish_error_result(publish_result, envelope[:correlation_id],
                                message_context: message_context)
  end

  wait_for_response(
    envelope[:correlation_id],
    timeout:         envelope[:timeout_seconds],
    message_context: message_context,
    future:          future
  )
end

.dispatch_auth_required? ⇒ Boolean

Returns:

  • (Boolean)


284
285
286
287
288
289
# File 'lib/legion/llm/fleet/dispatcher.rb', line 284

# Whether dispatched envelopes must carry a signed token.
#
# llm.fleet.dispatch.require_auth decides when it is set (non-nil); otherwise
# llm.fleet.auth.require_signed_token is consulted. In both cases only an
# explicit +false+ disables the requirement.
#
# @return [Boolean]
def dispatch_auth_required?
  flag = Legion::Settings.dig(:llm, :fleet, :dispatch, :require_auth)
  flag = Legion::Settings.dig(:llm, :fleet, :auth, :require_signed_token) if flag.nil?
  flag != false
end

.effective_ttl(options, timeout) ⇒ Object



138
139
140
141
142
143
# File 'lib/legion/llm/fleet/dispatcher.rb', line 138

# TTL to place on the envelope.
#
# @param options [Hash] request options
# @param timeout [Object] fallback used when neither option is set
# @return [Object] the :ttl option, else the :expiration_seconds option,
#   else +timeout+
def effective_ttl(options, timeout)
  fetch_option(options, :ttl) || fetch_option(options, :expiration_seconds) || timeout
end

.error_result(reason, message_context: {}) ⇒ Object



274
275
276
# File 'lib/legion/llm/fleet/dispatcher.rb', line 274

# Builds a failure result hash.
#
# @param reason [Object] value stored under :error
# @param message_context [Hash] echoed back under :message_context
# @return [Hash] with :success set to false
def error_result(reason, message_context: {})
  {
    success:         false,
    error:           reason,
    message_context: message_context
  }
end

.expected_delivery(envelope) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/legion/llm/fleet/dispatcher.rb', line 103

# Extracts the envelope fields passed to register_response as the expected
# delivery description.
#
# @param envelope [Hash] envelope indexed with symbol keys
# @return [Hash] :protocol_version, :operation and :correlation_id copied
#   from the envelope (nil for entries the envelope lacks)
def expected_delivery(envelope)
  %i[protocol_version operation correlation_id].to_h do |field|
    [field, envelope[field]]
  end
end

.fetch_option(hash, key) ⇒ Object



169
170
171
172
173
174
175
176
# File 'lib/legion/llm/fleet/dispatcher.rb', line 169

# Reads an option that may be stored under a string or a symbol key.
#
# The string form of +key+ is checked first; the key as given is checked
# second. Presence is tested with key?, so a stored nil/false is returned
# as-is rather than falling through to the other spelling.
#
# @param hash [#key?, Object] container to read from
# @param key [Symbol, String] option name
# @return [Object, nil] the stored value, or nil when +hash+ has no key? or
#   neither spelling is present
def fetch_option(hash, key)
  return unless hash.respond_to?(:key?)

  text_key = key.to_s
  if hash.key?(text_key)
    hash[text_key]
  elsif hash.key?(key)
    hash[key]
  end
end

.fleet_available? ⇒ Boolean

Returns:

  • (Boolean)


182
183
184
# File 'lib/legion/llm/fleet/dispatcher.rb', line 182

# Whether a request can be dispatched to the fleet right now.
#
# @return [Boolean] the result of fleet_enabled? when transport_ready? is
#   truthy; otherwise the (falsy) result of transport_ready?
def fleet_available?
  transport_up = transport_ready?
  return transport_up unless transport_up

  fleet_enabled?
end

.fleet_enabled? ⇒ Boolean

Returns:

  • (Boolean)


190
191
192
# File 'lib/legion/llm/fleet/dispatcher.rb', line 190

# Whether fleet dispatch is enabled in settings.
#
# Dispatch counts as enabled unless llm.fleet.dispatch.enabled is explicitly
# +false+. The value is read with dig so that a missing :llm, :fleet or
# :dispatch section means "not disabled" instead of raising NoMethodError on
# nil.
#
# @return [Boolean]
def fleet_enabled?
  Legion::Settings.dig(:llm, :fleet, :dispatch, :enabled) != false
end

.legacy_field_present?(hash, key) ⇒ Boolean

Returns:

  • (Boolean)


151
152
153
154
155
# File 'lib/legion/llm/fleet/dispatcher.rb', line 151

# Tells whether +hash+ contains +key+ either as given or in string form.
#
# @param hash [#key?, Object] container to inspect
# @param key [Symbol] field name
# @return [Boolean] false when +hash+ does not respond to key?
def legacy_field_present?(hash, key)
  if hash.respond_to?(:key?)
    hash.key?(key) || hash.key?(key.to_s)
  else
    false
  end
end

.next_request_id ⇒ Object



203
204
205
# File 'lib/legion/llm/fleet/dispatcher.rb', line 203

# Generates a fresh identifier of the form "req_<uuid>".
#
# @return [String]
def next_request_id
  format('req_%s', SecureRandom.uuid)
end

.normalize_operation(operation) ⇒ Object



278
279
280
281
282
# File 'lib/legion/llm/fleet/dispatcher.rb', line 278

# Converts an operation name to a Symbol.
#
# @param operation [#to_s, #to_sym, nil] operation name
# @return [Symbol, nil] nil when the string form of +operation+ is empty
def normalize_operation(operation)
  operation.to_s.empty? ? nil : operation.to_sym
end

.normalize_request(request) ⇒ Object



161
162
163
164
165
166
167
# File 'lib/legion/llm/fleet/dispatcher.rb', line 161

# Converts a request into a Hash with symbolized keys.
#
# @param request [#to_h, Object] request to convert
# @return [Hash] {} when +request+ does not respond to to_h; otherwise its
#   to_h form with every key that responds to to_sym converted (other keys
#   are kept unchanged)
def normalize_request(request)
  return {} unless request.respond_to?(:to_h)

  request.to_h.each_with_object({}) do |(key, value), symbolized|
    target_key = key.respond_to?(:to_sym) ? key.to_sym : key
    symbolized[target_key] = value
  end
end

.publish_accepted?(publish_result) ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/legion/llm/fleet/dispatcher.rb', line 220

# Whether a publish result reports acceptance.
#
# @param publish_result [Object] value returned by publish_request
# @return [Boolean] true only for a Hash whose :accepted entry equals true
def publish_accepted?(publish_result)
  return false unless publish_result.is_a?(Hash)

  publish_result[:accepted] == true
end

.publish_error_result(publish_result, correlation_id, message_context: {}) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/legion/llm/fleet/dispatcher.rb', line 235

# Builds the failure result for a publish that was not accepted, after
# deregistering the pending reply for +correlation_id+.
#
# The publish status (:failed when +publish_result+ is not a Hash) maps to
# the error code: :unroutable => 'no_fleet_queue', :nacked =>
# 'fleet_backpressure', :confirm_timeout => 'fleet_publish_timeout', and
# anything else => 'fleet_publish_failed'.
#
# @param publish_result [Hash, Object] value returned by publish_request
# @param correlation_id [String] correlation id of the failed request
# @param message_context [Hash] echoed back under :message_context
# @return [Hash] failure result including :publish_status and :correlation_id
def publish_error_result(publish_result, correlation_id, message_context: {})
  ReplyDispatcher.deregister(correlation_id)

  status = publish_result.is_a?(Hash) ? publish_result[:status]&.to_sym : :failed
  error_codes = {
    unroutable:      'no_fleet_queue',
    nacked:          'fleet_backpressure',
    confirm_timeout: 'fleet_publish_timeout'
  }

  {
    success:         false,
    error:           error_codes.fetch(status, 'fleet_publish_failed'),
    publish_status:  status,
    correlation_id:  correlation_id,
    message_context: message_context
  }
end

.publish_request(**opts) ⇒ Object



211
212
213
214
215
216
217
218
# File 'lib/legion/llm/fleet/dispatcher.rb', line 211

# Publishes the envelope as a FleetRequest transport message.
#
# The message class is required lazily inside the method. Any StandardError
# raised while logging, loading or publishing is reported through
# handle_exception and converted into a failed publish result.
#
# @param opts [Hash] envelope fields passed to FleetRequest.new
# @return [Object, Hash] the value returned by FleetRequest#publish, or
#   { accepted: false, status: :failed, error: <message> } on error
def publish_request(**opts)
  log.debug(
    "[llm][fleet][dispatcher] action=publish_request correlation_id=#{opts[:correlation_id]} " \
    "routing_key=#{opts[:routing_key]}"
  )
  require 'legion/extensions/llm/transport/messages/fleet_request'
  fleet_message = ::Legion::Extensions::Llm::Transport::Messages::FleetRequest.new(**opts)
  fleet_message.publish(request_publish_options)
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.dispatcher.publish_request')
  { accepted: false, status: :failed, error: e.message }
end

.register_response(correlation_id, expected = {}) ⇒ Object



207
208
209
# File 'lib/legion/llm/fleet/dispatcher.rb', line 207

# Registers interest in the reply for +correlation_id+ with ReplyDispatcher.
#
# @param correlation_id [String] correlation id of the outgoing request
# @param expected [Hash] forwarded as the +expected:+ keyword
# @return [Object] whatever ReplyDispatcher.register returns (dispatch passes
#   it to wait_for_response as the future)
def register_response(correlation_id, expected = {})
  ReplyDispatcher.register(correlation_id, expected: expected)
end

.reject_legacy_fields!(request_opts) ⇒ Object



145
146
147
148
149
# File 'lib/legion/llm/fleet/dispatcher.rb', line 145

# Refuses request options that contain any of LEGACY_FIELDS.
#
# @param request_opts [Hash] request options to inspect
# @return [Array<Symbol>] LEGACY_FIELDS when nothing was rejected
# @raise [ArgumentError] naming the first legacy field found
def reject_legacy_fields!(request_opts)
  LEGACY_FIELDS.each do |legacy_name|
    next unless legacy_field_present?(request_opts, legacy_name)

    raise ArgumentError, "#{legacy_name} is not supported by fleet protocol v2"
  end
end

.request_params(request_opts) ⇒ Object



157
158
159
# File 'lib/legion/llm/fleet/dispatcher.rb', line 157

# Request options that are forwarded as the envelope's :params.
#
# @param request_opts [#to_h] request options
# @return [Hash] the symbol-keyed options minus every key in ENVELOPE_KEYS
def request_params(request_opts)
  symbolized = normalize_request(request_opts)
  symbolized.reject { |name, _value| ENVELOPE_KEYS.include?(name) }
end

.request_publish_options ⇒ Object



224
225
226
227
228
229
230
231
232
233
# File 'lib/legion/llm/fleet/dispatcher.rb', line 224

# Options handed to FleetRequest#publish, read from llm.fleet.dispatch.
#
# The section is read with dig and defaults to {} so that a missing :llm,
# :fleet or :dispatch section yields nil options (and the 500 ms confirm
# timeout default) instead of raising NoMethodError on nil.
#
# @return [Hash] :mandatory, :publisher_confirm, :publish_confirm_timeout_ms
#   (500 when unset), :spool, and :return_result (always true)
def request_publish_options
  dispatch = Legion::Settings.dig(:llm, :fleet, :dispatch) || {}
  {
    mandatory:                  dispatch[:mandatory],
    publisher_confirm:          dispatch[:publisher_confirm],
    publish_confirm_timeout_ms: dispatch[:publish_confirm_timeout_ms] || 500,
    spool:                      dispatch[:spool],
    return_result:              true
  }
end

.resolve_timeout(operation: :default, request_type: nil, override: nil) ⇒ Object



194
195
196
197
198
199
200
201
# File 'lib/legion/llm/fleet/dispatcher.rb', line 194

# Determines the timeout for an operation.
#
# A truthy +override+ wins outright. Otherwise the per-operation entry in
# llm.fleet.dispatch.timeouts is used, then llm.fleet.dispatch.timeout_seconds,
# then 30.
#
# @param operation [Symbol, String, nil] operation used as the timeouts key
# @param request_type [Symbol, String, nil] key used when +operation+ is nil
# @param override [Object, nil] explicit timeout from the request
# @return [Object] the resolved timeout
def resolve_timeout(operation: :default, request_type: nil, override: nil)
  return override if override

  timeout_key = (operation || request_type || :default).to_sym
  dispatch_settings = Legion::Settings.dig(:llm, :fleet, :dispatch) || {}
  per_operation = fetch_option(dispatch_settings[:timeouts] || {}, timeout_key)

  per_operation || dispatch_settings[:timeout_seconds] || 30
end

.sanitize_model(model) ⇒ Object



178
179
180
# File 'lib/legion/llm/fleet/dispatcher.rb', line 178

# Makes a model name usable as a routing-key segment by replacing every ':'
# with '.'.
#
# @param model [#to_s, nil] model name
# @return [String] '' for nil
def sanitize_model(model)
  model.to_s.tr(':', '.')
end

.timeout_result(correlation_id, timeout, message_context: {}) ⇒ Object



269
270
271
272
# File 'lib/legion/llm/fleet/dispatcher.rb', line 269

# Builds the failure result returned when no reply arrived.
#
# @param correlation_id [String] correlation id of the request
# @param timeout [Object] timeout that was applied
# @param message_context [Hash] echoed back under :message_context
# @return [Hash] failure result with :error set to 'fleet_timeout'
def timeout_result(correlation_id, timeout, message_context: {})
  {
    success:         false,
    error:           'fleet_timeout',
    correlation_id:  correlation_id,
    timeout:         timeout,
    message_context: message_context
  }
end

.transport_ready? ⇒ Boolean

Returns:

  • (Boolean)


186
187
188
# File 'lib/legion/llm/fleet/dispatcher.rb', line 186

# Whether settings report the transport as connected.
#
# @return [Boolean] true only when transport.connected equals true
def transport_ready?
  connected = Legion::Settings.dig(:transport, :connected)
  connected == true
end

.wait_for_response(correlation_id, timeout:, message_context: {}, future: nil) ⇒ Object



257
258
259
260
261
262
263
264
265
266
267
# File 'lib/legion/llm/fleet/dispatcher.rb', line 257

# Blocks until the reply for +correlation_id+ arrives or the timeout passes.
#
# A falsy value from future.value!(timeout) is reported as a timeout result,
# as is a Concurrent::CancelledOperationError. The correlation id is always
# deregistered from ReplyDispatcher on the way out, including when another
# exception propagates.
#
# @param correlation_id [String] correlation id being awaited
# @param timeout [Object] passed to future.value!
# @param message_context [Hash] echoed back in a timeout result
# @param future [#value!, nil] pending reply; registered here when nil
# @return [Object, Hash] the reply value, or a timeout_result hash
def wait_for_response(correlation_id, timeout:, message_context: {}, future: nil)
  log.debug "[llm][fleet][dispatcher] action=wait_for_response correlation_id=#{correlation_id} timeout=#{timeout}"
  pending = future || ReplyDispatcher.register(correlation_id)
  reply = pending.value!(timeout)
  return reply if reply

  timeout_result(correlation_id, timeout, message_context: message_context)
rescue Concurrent::CancelledOperationError => e
  handle_exception(e, level: :debug, handled: true, operation: 'llm.fleet.dispatcher.wait_cancelled')
  timeout_result(correlation_id, timeout, message_context: message_context)
ensure
  ReplyDispatcher.deregister(correlation_id)
end