Module: Legion::LLM::Fleet::Dispatcher

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/fleet/dispatcher.rb

Class Method Summary

Class Method Details

.build_routing_key(provider:, request_type:, model:, provider_instance: nil, context_window: nil, boundary: nil, eligibility_fingerprint: nil, routing_style: nil) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/legion/llm/fleet/dispatcher.rb', line 77

# Builds the AMQP routing key for a fleet request according to the routing style.
#
# The style is +routing_style+ when given, otherwise +default_routing_style+. It is
# compared by its string form:
# - 'offering_lane' -> Lane.offering_key, keyed by provider instance (or provider), model and operation;
# - 'shared_lane'   -> Lane.routing_key, keyed by operation, model, context window, boundary and fingerprint;
# - anything else   -> legacy "llm.request.<provider>.<request_type>.<sanitized model>" key.
#
# @return [Object] whatever the selected Lane helper returns, or the legacy String key
def build_routing_key(provider:, request_type:, model:, provider_instance: nil, context_window: nil, boundary: nil,
                      eligibility_fingerprint: nil, routing_style: nil)
  case (routing_style || default_routing_style).to_s
  when 'offering_lane'
    Lane.offering_key(instance_id: provider_instance || provider, model: model, operation: request_type)
  when 'shared_lane'
    Lane.routing_key(operation: request_type, model: model, context_window: context_window,
                     boundary: boundary, eligibility_fingerprint: eligibility_fingerprint)
  else
    "llm.request.#{provider}.#{request_type}.#{sanitize_model(model)}"
  end
end

.context_window_from(options) ⇒ Object



99
100
101
102
103
104
105
# File 'lib/legion/llm/fleet/dispatcher.rb', line 99

# Extracts a context-window hint from an options hash.
#
# Top-level keys are checked in order: context_window, max_context_size, max_input_tokens
# (string or symbol keys, via fetch_option). The first truthy value wins. Otherwise the
# result is limits[:context_window], or nil when absent.
#
# @param options [Hash, Object] request options; non-hash input yields nil
# @return [Object, nil] the context window value as stored in options
def context_window_from(options)
  %i[context_window max_context_size max_input_tokens].each do |key|
    hint = fetch_option(options, key)
    return hint if hint
  end

  # Last resort: a nested limits hash (missing or non-hash limits => nil).
  fetch_option(fetch_option(options, :limits) || {}, :context_window)
end

.default_routing_style ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/legion/llm/fleet/dispatcher.rb', line 90

# Resolves the routing style used when a caller does not pass one.
#
# Checks settings routing.tiers.fleet.routing_style, then routing.fleet.routing_style. The
# first truthy value is returned; otherwise :shared_lane. Any StandardError raised while
# reading settings is reported through handle_exception at :warn level, and :shared_lane
# is returned.
#
# @return [Object] the configured style (as stored in settings) or :shared_lane
def default_routing_style
  settings = Legion::LLM::Settings
  [%i[routing tiers fleet routing_style], %i[routing fleet routing_style]].each do |path|
    configured = settings.value(*path)
    return configured if configured
  end
  :shared_lane
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.dispatcher.default_routing_style')
  :shared_lane
end

.dispatch(model: nil, messages: nil, request: nil, message_context: {}, routing_key: nil, reply_to: nil, **opts) ⇒ Object

Backwards-compatible shim: supports both the legacy (model:, messages:) and the current (request:, message_context:) calling conventions.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/legion/llm/fleet/dispatcher.rb', line 14

# Publishes an LLM request to the fleet and waits for the reply.
#
# Backwards-compatible shim supporting two calling conventions:
# - legacy: +model:+ / +messages:+ with loose options in +opts+ (used when +request+ is nil
#   and +model+ or +messages+ is given);
# - current: +request:+ (anything responding to #to_h) plus +message_context:+. Entries in
#   +opts+ override same-named fields of +request+. The +model:+/+messages:+ keywords are
#   not used on this path.
#
# @param model [Object, nil] model identifier (legacy convention)
# @param messages [Object, nil] chat messages (legacy convention)
# @param request [#to_h, nil] request payload (current convention)
# @param message_context [Hash] carried into the published request and every result hash
# @param routing_key [String, nil] explicit routing key; built via build_routing_key when nil
# @param reply_to [String, nil] reply queue; defaults to ReplyDispatcher.agent_queue_name
# @param opts [Hash] extra options (provider, request_type, timeout, ttl, routing hints, ...)
# @return [Object] whatever wait_for_response yields, or an error hash from error_result /
#   publish_error_result
# @raise re-raises anything publish_request raises, after releasing the reply registration
def dispatch(model: nil, messages: nil, request: nil, message_context: {}, routing_key: nil, reply_to: nil, **opts)
  return error_result('fleet_unavailable', message_context: message_context) unless fleet_available?

  # Old calling convention: build minimal params from model/messages
  if request.nil? && (model || messages)
    provider = opts[:provider] || 'ollama'
    request_type = opts[:request_type] || 'chat'
    routing_key ||= build_routing_key(provider: provider, request_type: request_type, model: model,
                                      provider_instance: opts[:provider_instance],
                                      context_window: context_window_from(opts),
                                      boundary: opts[:network_boundary],
                                      eligibility_fingerprint: opts[:eligibility_fingerprint],
                                      routing_style: opts[:routing_style])
    reply_to ||= ReplyDispatcher.agent_queue_name
    timeout = resolve_timeout(request_type: request_type, override: opts[:timeout])
    publish_opts = opts.except(:timeout).merge(ttl: effective_ttl(opts, timeout))
    # The reply future is registered before publishing; publish_or_release frees it if the
    # publish raises, so a failed publish cannot leave a dangling registration.
    correlation_id = next_correlation_id
    future = register_response(correlation_id)
    publish_result = publish_or_release(correlation_id) do
      publish_request(
        correlation_id: correlation_id,
        routing_key: routing_key, reply_to: reply_to,
        provider: provider, model: model, request_type: request_type,
        messages: messages, message_context: message_context, **publish_opts
      )
    end
    return publish_error_result(publish_result, correlation_id, message_context: message_context) unless publish_accepted?(publish_result)

    return wait_for_response(correlation_id, timeout: timeout, message_context: message_context, future: future)
  end

  # New calling convention; explicit opts override fields carried by the request.
  request_opts = request.respond_to?(:to_h) ? request.to_h.transform_keys(&:to_sym) : {}
  request_opts = request_opts.merge(opts)

  provider = request_opts[:provider] || 'ollama'
  request_type = request_opts[:request_type] || 'chat'
  model = request_opts[:model]
  routing_key ||= build_routing_key(provider: provider, request_type: request_type, model: model,
                                    provider_instance: request_opts[:provider_instance],
                                    context_window: context_window_from(request_opts),
                                    boundary: request_opts[:network_boundary],
                                    eligibility_fingerprint: request_opts[:eligibility_fingerprint],
                                    routing_style: request_opts[:routing_style])
  reply_to ||= ReplyDispatcher.agent_queue_name
  timeout = resolve_timeout(request_type: request_type, override: request_opts[:timeout])
  request_opts[:ttl] = effective_ttl(request_opts, timeout)
  correlation_id = next_correlation_id
  future = register_response(correlation_id)
  publish_result = publish_or_release(correlation_id) do
    publish_request(
      correlation_id: correlation_id,
      routing_key: routing_key, reply_to: reply_to,
      provider: provider, model: model, request_type: request_type,
      message_context: message_context, **request_opts.except(:provider, :model, :request_type, :timeout)
    )
  end
  return publish_error_result(publish_result, correlation_id, message_context: message_context) unless publish_accepted?(publish_result)

  wait_for_response(correlation_id, timeout: timeout, message_context: message_context, future: future)
end

# Runs the given publish block for an already-registered correlation id and returns its result.
# If the block exits abnormally (raises), the pending ReplyDispatcher registration is removed
# before the exception propagates.
def publish_or_release(correlation_id)
  published = false
  result = yield
  published = true
  result
ensure
  ReplyDispatcher.deregister(correlation_id) unless published
end

.effective_ttl(options, timeout) ⇒ Object



107
108
109
# File 'lib/legion/llm/fleet/dispatcher.rb', line 107

# Chooses the message TTL: an explicit :ttl, then :expiration_seconds (string or symbol keys),
# falling back to the resolved +timeout+.
#
# @param options [Hash, Object] request options; non-hash input falls through to +timeout+
# @param timeout [Object] value returned when no explicit TTL is present
# @return [Object] the chosen TTL value
def effective_ttl(options, timeout)
  %i[ttl expiration_seconds].each do |key|
    explicit = fetch_option(options, key)
    return explicit if explicit
  end
  timeout
end

.error_result(reason, message_context: {}) ⇒ Object



221
222
223
# File 'lib/legion/llm/fleet/dispatcher.rb', line 221

# Builds a failure result hash.
#
# @param reason [Object] error code placed under :error
# @param message_context [Hash] echoed back to the caller unchanged
# @return [Hash] { success: false, error:, message_context: }
def error_result(reason, message_context: {})
  {
    success:         false,
    error:           reason,
    message_context: message_context
  }
end

.fetch_option(hash, key) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/legion/llm/fleet/dispatcher.rb', line 111

# Reads +key+ from a hash that may use string or symbol keys.
#
# The string form of +key+ is tried first, then +key+ itself. Objects that do not respond
# to #key? yield nil.
#
# @param hash [Hash, Object] container to read from
# @param key [Symbol, String] key to look up
# @return [Object, nil] the stored value, or nil when neither key form is present
def fetch_option(hash, key)
  return unless hash.respond_to?(:key?)

  [key.to_s, key].each do |candidate|
    return hash[candidate] if hash.key?(candidate)
  end
  nil
end

.fleet_available? ⇒ Boolean

Returns:

  • (Boolean)


132
133
134
# File 'lib/legion/llm/fleet/dispatcher.rb', line 132

# True when the transport is connected and fleet routing is enabled.
# Short-circuits: fleet_enabled? is consulted only when transport_ready? is truthy, and a
# falsy transport_ready? value is returned as-is.
def fleet_available?
  ready = transport_ready?
  ready ? fleet_enabled? : ready
end

.fleet_enabled? ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
143
144
145
# File 'lib/legion/llm/fleet/dispatcher.rb', line 140

# Whether fleet routing is switched on in settings.
#
# Reads the routing settings (default {}). A non-hash value counts as enabled; otherwise
# only an explicit use_fleet: false (string or symbol key) disables the fleet.
#
# @return [Boolean]
def fleet_enabled?
  routing = Legion::LLM::Settings.value(:routing, default: {})
  !routing.is_a?(Hash) || fetch_option(routing, :use_fleet) != false
end

.nested_fetch(hash, *keys) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/legion/llm/fleet/dispatcher.rb', line 120

# Walks a chain of keys through nested hashes using fetch_option (string or symbol keys).
#
# Returns nil as soon as an intermediate value cannot be indexed (does not respond to
# #key?). With no keys, +hash+ itself is returned.
#
# @param hash [Hash, Object] root container
# @param keys [Array<Symbol, String>] path to follow
# @return [Object, nil]
def nested_fetch(hash, *keys)
  node = hash
  keys.each do |key|
    return nil unless node.respond_to?(:key?)

    node = fetch_option(node, key)
  end
  node
end

.next_correlation_id ⇒ Object



160
161
162
# File 'lib/legion/llm/fleet/dispatcher.rb', line 160

# Generates a fresh correlation id: "req_" followed by a random UUID from SecureRandom.
#
# @return [String]
def next_correlation_id
  format('req_%<uuid>s', uuid: SecureRandom.uuid)
end

.publish_accepted?(publish_result) ⇒ Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/legion/llm/fleet/dispatcher.rb', line 180

# Interprets the value returned by publish_request.
#
# Non-hash results are treated as accepted. A hash counts as accepted only when its
# :accepted entry is exactly +true+.
#
# @param publish_result [Hash, Object]
# @return [Boolean]
def publish_accepted?(publish_result)
  return true unless publish_result.is_a?(Hash)

  publish_result[:accepted] == true
end

.publish_error_result(publish_result, correlation_id, message_context: {}) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/legion/llm/fleet/dispatcher.rb', line 184

# Converts a rejected publish into a failure result and releases the reply registration.
#
# The publish status (symbolized from publish_result[:status], or :failed for non-hash
# results) maps to an error code:
#   :unroutable      -> 'no_fleet_queue'
#   :nacked          -> 'fleet_backpressure'
#   :confirm_timeout -> 'fleet_publish_timeout'
#   anything else    -> 'fleet_publish_failed'
#
# @param publish_result [Hash, Object] value returned by publish_request
# @param correlation_id [String] id whose ReplyDispatcher registration is removed
# @param message_context [Hash] echoed back to the caller
# @return [Hash] failure hash with :error, :publish_status and :correlation_id
def publish_error_result(publish_result, correlation_id, message_context: {})
  ReplyDispatcher.deregister(correlation_id)

  status =
    if publish_result.is_a?(Hash)
      publish_result[:status]&.to_sym
    else
      :failed
    end

  error_code = {
    unroutable:      'no_fleet_queue',
    nacked:          'fleet_backpressure',
    confirm_timeout: 'fleet_publish_timeout'
  }.fetch(status, 'fleet_publish_failed')

  {
    success:         false,
    error:           error_code,
    publish_status:  status,
    correlation_id:  correlation_id,
    message_context: message_context
  }
end

.publish_request(correlation_id: next_correlation_id, **opts) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
# File 'lib/legion/llm/fleet/dispatcher.rb', line 168

# Publishes a fleet request message tagged with its correlation id.
#
# The remaining keyword options, plus :fleet_correlation_id, are passed to
# Legion::LLM::Transport::Messages::FleetRequest.new and its #publish result is returned.
# When that class is not defined, nothing is published and a not-accepted result hash
# is returned instead.
#
# @param correlation_id [String] defaults to a freshly generated id
# @param opts [Hash] message fields (routing_key, reply_to, provider, model, ...)
# @return [Object] FleetRequest#publish result, or
#   { accepted: false, status: :failed, error: 'transport_not_loaded' }
def publish_request(correlation_id: next_correlation_id, **opts)
  payload = opts.merge(fleet_correlation_id: correlation_id)
  log.debug("[llm][fleet][dispatcher] action=publish_request correlation_id=#{correlation_id} routing_key=#{payload[:routing_key]}")

  unless defined?(Legion::LLM::Transport::Messages::FleetRequest)
    log.debug('[llm][fleet][dispatcher] action=skip_publish reason=transport_not_loaded')
    return { accepted: false, status: :failed, error: 'transport_not_loaded' }
  end

  Legion::LLM::Transport::Messages::FleetRequest.new(**payload).publish
end

.register_response(correlation_id, expected = {}) ⇒ Object



164
165
166
# File 'lib/legion/llm/fleet/dispatcher.rb', line 164

# Registers a pending reply for +correlation_id+ with ReplyDispatcher.
#
# @param correlation_id [String]
# @param expected [Hash] forwarded to ReplyDispatcher.register as +expected:+
# @return [Object] whatever ReplyDispatcher.register returns (used by dispatch as the reply future)
def register_response(correlation_id, expected = {})
  ReplyDispatcher.register(
    correlation_id,
    expected: expected
  )
end

.resolve_timeout(request_type: :default, override: nil) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/legion/llm/fleet/dispatcher.rb', line 147

# Determines how long to wait for a fleet reply.
#
# Resolution order:
#   1. +override+ when truthy;
#   2. fleet timeouts[request_type];
#   3. fleet timeout_seconds;
#   4. 30.
# The fleet hash is settings routing.tiers.fleet, else routing.fleet, else {}.
# Any StandardError is reported via handle_exception at :warn level and yields 30.
# The value is presumably in seconds (key name timeout_seconds) — confirm against callers.
#
# @param request_type [Symbol, String] per-type lookup key (converted with #to_sym)
# @param override [Object, nil] explicit timeout taking precedence over settings
# @return [Object] the resolved timeout
def resolve_timeout(request_type: :default, override: nil)
  return override if override

  settings = Legion::LLM::Settings
  fleet = settings.value(:routing, :tiers, :fleet) || settings.value(:routing, :fleet) || {}
  per_type = fetch_option(fetch_option(fleet, :timeouts) || {}, request_type.to_sym)
  per_type || fetch_option(fleet, :timeout_seconds) || 30
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.dispatcher.resolve_timeout')
  30
end

.sanitize_model(model) ⇒ Object



128
129
130
# File 'lib/legion/llm/fleet/dispatcher.rb', line 128

# Makes a model name safe to embed in the legacy routing key by turning every ':' into '.'.
# Non-string inputs are stringified first (nil becomes '').
#
# @param model [Object]
# @return [String]
def sanitize_model(model)
  model.to_s.tr(':', '.')
end

.timeout_result(correlation_id, timeout, message_context: {}) ⇒ Object



216
217
218
219
# File 'lib/legion/llm/fleet/dispatcher.rb', line 216

# Builds the failure result returned when no fleet reply arrives in time.
#
# @param correlation_id [String] id of the request that timed out
# @param timeout [Object] the timeout that was applied
# @param message_context [Hash] echoed back to the caller
# @return [Hash] { success: false, error: 'fleet_timeout', correlation_id:, timeout:, message_context: }
def timeout_result(correlation_id, timeout, message_context: {})
  {
    success:         false,
    error:           'fleet_timeout',
    correlation_id:  correlation_id,
    timeout:         timeout,
    message_context: message_context
  }
end

.transport_ready? ⇒ Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/legion/llm/fleet/dispatcher.rb', line 136

# Delegates to Legion::LLM::Settings.transport_connected? and returns its answer unchanged.
def transport_ready?
  settings = Legion::LLM::Settings
  settings.transport_connected?
end

.wait_for_response(correlation_id, timeout:, message_context: {}, future: nil) ⇒ Object



206
207
208
209
210
211
212
213
214
# File 'lib/legion/llm/fleet/dispatcher.rb', line 206

# Waits on the reply future for +correlation_id+ and returns its value.
#
# When no +future+ is supplied, one is obtained via ReplyDispatcher.register. A falsy value
# from future.value!(timeout), or a Concurrent::CancelledOperationError, produces a
# timeout_result. The ReplyDispatcher registration is always removed on exit. Other
# exceptions raised by value! propagate to the caller.
#
# @param correlation_id [String]
# @param timeout [Object] passed straight to value! (presumably seconds — confirm)
# @param message_context [Hash] echoed into the timeout result
# @param future [Object, nil] pre-registered reply future
# @return [Object] the reply value, or a timeout_result hash
def wait_for_response(correlation_id, timeout:, message_context: {}, future: nil)
  pending = future || ReplyDispatcher.register(correlation_id)
  reply = pending.value!(timeout)
  return reply if reply

  timeout_result(correlation_id, timeout, message_context: message_context)
rescue Concurrent::CancelledOperationError
  timeout_result(correlation_id, timeout, message_context: message_context)
ensure
  ReplyDispatcher.deregister(correlation_id)
end