Module: Legion::LLM::Fleet::Dispatcher
- Extended by: Legion::Logging::Helper
- Defined in: lib/legion/llm/fleet/dispatcher.rb
Constant Summary
- ENVELOPE_KEYS =
%i[ app_id caller correlation_id expires_at idempotency_key message_context operation model priority protocol_version provider provider_instance reply_to request_id routing_key signed_token timeout timeout_seconds trace_context ttl ].freeze
- LEGACY_FIELDS =
%i[schema_version request_type fleet_correlation_id].freeze
Class Method Summary
- .build_envelope(operation:, request_opts:, message_context:, routing_key: nil, reply_to: nil) ⇒ Object
- .build_routing_key(provider:, operation:, model:, provider_instance: nil, context_window: nil, boundary: nil, eligibility_fingerprint: nil, routing_style: nil) ⇒ Object
- .context_window_from(options) ⇒ Object
- .default_caller ⇒ Object
- .default_routing_style ⇒ Object
- .dispatch(operation: nil, request: nil, message_context: {}, routing_key: nil, reply_to: nil, **opts) ⇒ Object
- .dispatch_auth_required? ⇒ Boolean
- .effective_ttl(options, timeout) ⇒ Object
- .error_result(reason, message_context: {}) ⇒ Object
- .expected_delivery(envelope) ⇒ Object
- .fetch_option(hash, key) ⇒ Object
- .fleet_available? ⇒ Boolean
- .fleet_enabled? ⇒ Boolean
- .legacy_field_present?(hash, key) ⇒ Boolean
- .next_request_id ⇒ Object
- .normalize_operation(operation) ⇒ Object
- .normalize_request(request) ⇒ Object
- .publish_accepted?(publish_result) ⇒ Boolean
- .publish_error_result(publish_result, correlation_id, message_context: {}) ⇒ Object
- .publish_request(**opts) ⇒ Object
- .register_response(correlation_id, expected = {}) ⇒ Object
- .reject_legacy_fields!(request_opts) ⇒ Object
- .request_params(request_opts) ⇒ Object
- .request_publish_options ⇒ Object
- .resolve_timeout(operation: :default, request_type: nil, override: nil) ⇒ Object
- .sanitize_model(model) ⇒ Object
- .timeout_result(correlation_id, timeout, message_context: {}) ⇒ Object
- .transport_ready? ⇒ Boolean
- .wait_for_response(correlation_id, timeout:, message_context: {}, future: nil) ⇒ Object
Class Method Details
.build_envelope(operation:, request_opts:, message_context:, routing_key: nil, reply_to: nil) ⇒ Object
Source lines 57–99
# File 'lib/legion/llm/fleet/dispatcher.rb', line 57
# Builds the fleet protocol request envelope for one dispatch.
#
# Legacy v1 fields (LEGACY_FIELDS) are rejected before anything else is resolved.
# Provider defaults to 'ollama'; provider_instance is taken from :provider_instance,
# then :instance, then 'default'. Fresh request and correlation ids are generated on
# every call. The envelope is signed with TokenIssuer when dispatch_auth_required?
# is true; otherwise :signed_token is the literal 'unsigned'.
#
# @param operation [Symbol] normalized operation name
# @param request_opts [Hash] request options (symbol or string keys)
# @param message_context [Hash, nil] caller context; nil is stored as {}
# @param routing_key [String, nil] explicit routing key; built via build_routing_key when nil
# @param reply_to [String, nil] reply queue; ReplyDispatcher.agent_queue_name when nil
# @return [Hash] the envelope, including :signed_token
# @raise [ArgumentError] if request_opts contains a legacy v1 field
def build_envelope(operation:, request_opts:, message_context:, routing_key: nil, reply_to: nil)
  reject_legacy_fields!(request_opts)

  provider = fetch_option(request_opts, :provider) || 'ollama'
  provider_instance = fetch_option(request_opts, :provider_instance) ||
                      fetch_option(request_opts, :instance) ||
                      'default'
  model = fetch_option(request_opts, :model)
  timeout = resolve_timeout(operation: operation, override: fetch_option(request_opts, :timeout))
  request_id = next_request_id
  correlation_id = next_request_id
  reply_to ||= ReplyDispatcher.agent_queue_name
  routing_key ||= build_routing_key(
    provider: provider,
    operation: operation,
    model: model,
    provider_instance: provider_instance,
    context_window: context_window_from(request_opts),
    boundary: fetch_option(request_opts, :network_boundary),
    eligibility_fingerprint: fetch_option(request_opts, :eligibility_fingerprint),
    routing_style: fetch_option(request_opts, :routing_style)
  )

  # NOTE(review): the whole hash is handed to TokenIssuer.issue below; key order is
  # kept unchanged in case the signature depends on it — confirm.
  envelope = {
    protocol_version: ::Legion::Extensions::Llm::Fleet::Protocol::VERSION,
    request_id: request_id,
    correlation_id: correlation_id,
    idempotency_key: fetch_option(request_opts, :idempotency_key) || "idem_#{SecureRandom.uuid}",
    operation: operation,
    provider: provider,
    provider_instance: provider_instance,
    model: model,
    params: request_params(request_opts),
    routing_key: routing_key,
    reply_to: reply_to,
    message_context: message_context || {},
    caller: fetch_option(request_opts, :caller) || default_caller,
    trace_context: fetch_option(request_opts, :trace_context) || {},
    timeout_seconds: timeout,
    expires_at: (Time.now.utc + timeout).iso8601,
    ttl: effective_ttl(request_opts, timeout)
  }
  envelope[:signed_token] = dispatch_auth_required? ? TokenIssuer.issue(envelope) : 'unsigned'
  envelope
end
.build_routing_key(provider:, operation:, model:, provider_instance: nil, context_window: nil, boundary: nil, eligibility_fingerprint: nil, routing_style: nil) ⇒ Object
Source lines 109–120
# File 'lib/legion/llm/fleet/dispatcher.rb', line 109
# Chooses the AMQP routing key for a fleet request according to the routing style.
#
# The style is +routing_style+ when given, otherwise default_routing_style, compared
# as a string:
# - 'offering_lane' -> Lane.offering_key, keyed by provider_instance (or provider)
# - 'shared_lane'   -> Lane.routing_key, keyed by operation/model/limits
# - anything else   -> legacy "llm.request.<provider>.<operation>.<sanitized model>"
#
# @return [String] presumably a routing key string from Lane or the legacy format
def build_routing_key(provider:, operation:, model:, provider_instance: nil, context_window: nil,
                      boundary: nil, eligibility_fingerprint: nil, routing_style: nil)
  case (routing_style || default_routing_style).to_s
  when 'offering_lane'
    Lane.offering_key(instance_id: provider_instance || provider, model: model, operation: operation)
  when 'shared_lane'
    Lane.routing_key(
      operation: operation,
      model: model,
      context_window: context_window,
      boundary: boundary,
      eligibility_fingerprint: eligibility_fingerprint
    )
  else
    "llm.request.#{provider}.#{operation}.#{sanitize_model(model)}"
  end
end
.context_window_from(options) ⇒ Object
Source lines 131–137
# File 'lib/legion/llm/fleet/dispatcher.rb', line 131
# Extracts the context-window size from request options.
#
# Looks, in order, at :context_window, :max_context_size, :max_input_tokens, and
# finally :context_window inside the :limits sub-hash. Keys may be symbols or
# strings (see fetch_option).
#
# @param options [Hash, nil] request options
# @return [Object, nil] the first value found, or nil
def context_window_from(options)
  limits = fetch_option(options, :limits) || {}
  fetch_option(options, :context_window) ||
    fetch_option(options, :max_context_size) ||
    fetch_option(options, :max_input_tokens) ||
    fetch_option(limits, :context_window)
end
.default_caller ⇒ Object
Source lines 295–297
# File 'lib/legion/llm/fleet/dispatcher.rb', line 295
# Identity stored in the envelope's :caller field when the request supplies none.
# A new Hash is built on every call.
#
# @return [Hash{Symbol=>String}]
def default_caller
  {
    source: 'legion-llm',
    component: 'fleet_dispatcher'
  }
end
.default_routing_style ⇒ Object
Source lines 122–129
# File 'lib/legion/llm/fleet/dispatcher.rb', line 122
# Routing style used when a request does not specify one.
#
# Reads fleet.dispatch.routing_style, then routing.tiers.fleet.routing_style, and
# falls back to :shared_lane. Any StandardError while reading settings is reported
# via handle_exception at :warn and also yields :shared_lane.
#
# @return [Object] the configured style (String or Symbol) or :shared_lane
def default_routing_style
  configured = Legion::LLM::Settings.value(:fleet, :dispatch, :routing_style) ||
               Legion::LLM::Settings.value(:routing, :tiers, :fleet, :routing_style)
  configured || :shared_lane
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.dispatcher.default_routing_style')
  :shared_lane
end
.dispatch(operation: nil, request: nil, message_context: {}, routing_key: nil, reply_to: nil, **opts) ⇒ Object
Source lines 26–55
# File 'lib/legion/llm/fleet/dispatcher.rb', line 26
# Sends one request to the LLM fleet and waits for its reply.
#
# The operation comes from +operation:+, else the request's :operation entry,
# else opts[:operation]. Extra keyword options are merged over the normalized
# request. When the fleet is unavailable, an error hash is returned without
# publishing. A reply slot is registered before publishing. A rejected publish
# returns publish_error_result; otherwise this blocks in wait_for_response.
#
# @param operation [Symbol, String, nil]
# @param request [Hash, #to_h, nil] request options
# @param message_context [Hash] caller context echoed into results
# @param routing_key [String, nil] explicit routing key override
# @param reply_to [String, nil] explicit reply queue override
# @return [Hash] the fleet reply or an error/timeout hash
# @raise [ArgumentError] when no operation can be determined
def dispatch(operation: nil, request: nil, message_context: {}, routing_key: nil, reply_to: nil, **opts)
  op = normalize_operation(operation || fetch_option(request, :operation) || opts[:operation])
  raise ArgumentError, 'operation is required for fleet dispatch' unless op

  merged_opts = normalize_request(request).merge(opts)
  log.debug "[llm][fleet][dispatcher] action=dispatch.enter operation=#{op} " \
            "model=#{fetch_option(merged_opts, :model)} routing_key=#{routing_key} fleet_available=#{fleet_available?}"
  return error_result('fleet_unavailable', message_context: message_context) unless fleet_available?

  envelope = build_envelope(
    operation: op,
    request_opts: merged_opts,
    message_context: message_context,
    routing_key: routing_key,
    reply_to: reply_to
  )
  correlation_id = envelope[:correlation_id]
  future = register_response(correlation_id, expected_delivery(envelope))
  outcome = publish_request(**envelope)

  if publish_accepted?(outcome)
    wait_for_response(
      correlation_id,
      timeout: envelope[:timeout_seconds],
      message_context: message_context,
      future: future
    )
  else
    publish_error_result(outcome, correlation_id, message_context: message_context)
  end
end
.dispatch_auth_required? ⇒ Boolean
Source lines 288–293
# File 'lib/legion/llm/fleet/dispatcher.rb', line 288
# Whether outgoing envelopes must carry a TokenIssuer-signed token.
#
# An explicit fleet.dispatch.require_auth setting wins; anything other than false
# means "required". When it is unset (nil), fleet.auth.require_signed_token is
# consulted, defaulting to true.
#
# @return [Boolean]
def dispatch_auth_required?
  explicit = Legion::LLM::Settings.value(:fleet, :dispatch, :require_auth, default: nil)
  if explicit.nil?
    Legion::LLM::Settings.value(:fleet, :auth, :require_signed_token, default: true) != false
  else
    explicit != false
  end
end
.effective_ttl(options, timeout) ⇒ Object
Source lines 139–144
# File 'lib/legion/llm/fleet/dispatcher.rb', line 139
# Message TTL for the envelope.
#
# Uses :ttl when present and truthy, then :expiration_seconds, then the resolved
# request timeout.
#
# @param options [Hash, nil] request options (symbol or string keys)
# @param timeout [Numeric] resolved request timeout in seconds
# @return [Object] the chosen TTL value
def effective_ttl(options, timeout)
  fetch_option(options, :ttl) || fetch_option(options, :expiration_seconds) || timeout
end
.error_result(reason, message_context: {}) ⇒ Object
Source lines 278–280
# File 'lib/legion/llm/fleet/dispatcher.rb', line 278
# Generic failure result returned to dispatch callers.
#
# @param reason [String] error code, e.g. 'fleet_unavailable'
# @param message_context [Hash] caller context echoed back
# @return [Hash] { success: false, error:, message_context: }
def error_result(reason, message_context: {})
  {
    success: false,
    error: reason,
    message_context: message_context
  }
end
.expected_delivery(envelope) ⇒ Object
Source lines 101–107
# File 'lib/legion/llm/fleet/dispatcher.rb', line 101
# Subset of the envelope handed to the reply registration as the expected delivery.
#
# @param envelope [Hash] envelope from build_envelope
# @return [Hash] :protocol_version, :operation and :correlation_id (values may be nil)
def expected_delivery(envelope)
  %i[protocol_version operation correlation_id].to_h { |field| [field, envelope[field]] }
end
.fetch_option(hash, key) ⇒ Object
Source lines 170–177
# File 'lib/legion/llm/fleet/dispatcher.rb', line 170
# Reads +key+ from a hash that may use string or symbol keys.
#
# The string form of the key is checked first; the key as given is the fallback.
# Objects that do not respond to #key? (including nil) yield nil.
#
# @param hash [Hash, nil]
# @param key [Symbol, String]
# @return [Object, nil]
def fetch_option(hash, key)
  return unless hash.respond_to?(:key?)

  lookup = key.to_s
  if hash.key?(lookup)
    hash[lookup]
  elsif hash.key?(key)
    hash[key]
  end
end
.fleet_available? ⇒ Boolean
Source lines 183–185
# File 'lib/legion/llm/fleet/dispatcher.rb', line 183
# True when the transport is connected and fleet dispatch is enabled.
# fleet_enabled? is consulted only when transport_ready? is truthy.
#
# @return [Boolean]
def fleet_available?
  ready = transport_ready?
  return ready unless ready

  fleet_enabled?
end
.fleet_enabled? ⇒ Boolean
Source lines 191–193
# File 'lib/legion/llm/fleet/dispatcher.rb', line 191
# Whether fleet dispatch is switched on (fleet.dispatch.enabled, default true).
# Only an explicit false disables it.
#
# @return [Boolean]
def fleet_enabled?
  flag = Legion::LLM::Settings.value(:fleet, :dispatch, :enabled, default: true)
  flag != false
end
.legacy_field_present?(hash, key) ⇒ Boolean
Source lines 152–156
# File 'lib/legion/llm/fleet/dispatcher.rb', line 152
# Whether +hash+ contains +key+ as either the key itself or its string form.
#
# @param hash [Hash, nil]
# @param key [Symbol]
# @return [Boolean] false for objects that do not respond to #key?
def legacy_field_present?(hash, key)
  if hash.respond_to?(:key?)
    hash.key?(key) || hash.key?(key.to_s)
  else
    false
  end
end
.next_request_id ⇒ Object
Source lines 208–210
# File 'lib/legion/llm/fleet/dispatcher.rb', line 208
# Fresh random identifier of the form "req_<uuid>"; used for both request and
# correlation ids.
#
# @return [String]
def next_request_id
  format('req_%s', SecureRandom.uuid)
end
.normalize_operation(operation) ⇒ Object
Source lines 282–286
# File 'lib/legion/llm/fleet/dispatcher.rb', line 282
# Converts an operation name to a Symbol; nil or blank (empty to_s) becomes nil.
#
# @param operation [Symbol, String, nil]
# @return [Symbol, nil]
def normalize_operation(operation)
  operation.to_s.empty? ? nil : operation.to_sym
end
.normalize_request(request) ⇒ Object
Source lines 162–168
# File 'lib/legion/llm/fleet/dispatcher.rb', line 162
# Converts a request into a new Hash whose symbolizable keys are Symbols.
#
# Anything without #to_h (e.g. nil is fine, Integer is not) yields {}. Keys that do
# not respond to #to_sym are kept as-is; when two keys symbolize to the same
# Symbol, the later entry wins.
#
# @param request [#to_h, Object]
# @return [Hash]
def normalize_request(request)
  return {} unless request.respond_to?(:to_h)

  request.to_h.each_with_object({}) do |(key, value), normalized|
    normalized[key.respond_to?(:to_sym) ? key.to_sym : key] = value
  end
end
.publish_accepted?(publish_result) ⇒ Boolean
Source lines 225–227
# File 'lib/legion/llm/fleet/dispatcher.rb', line 225
# True only for a Hash publish result whose :accepted is exactly true.
#
# @param publish_result [Object]
# @return [Boolean]
def publish_accepted?(publish_result)
  return false unless publish_result.is_a?(Hash)

  publish_result[:accepted] == true
end
.publish_error_result(publish_result, correlation_id, message_context: {}) ⇒ Object
Source lines 239–259
# File 'lib/legion/llm/fleet/dispatcher.rb', line 239
# Converts a rejected publish into a failure result and drops the pending reply slot.
#
# Status mapping: :unroutable -> 'no_fleet_queue', :nacked -> 'fleet_backpressure',
# :confirm_timeout -> 'fleet_publish_timeout', anything else -> 'fleet_publish_failed'.
# A non-Hash publish_result is treated as status :failed.
#
# @param publish_result [Hash, Object]
# @param correlation_id [String]
# @param message_context [Hash]
# @return [Hash]
def publish_error_result(publish_result, correlation_id, message_context: {})
  ReplyDispatcher.deregister(correlation_id)

  status = publish_result.is_a?(Hash) ? publish_result[:status]&.to_sym : :failed
  error_code = {
    unroutable: 'no_fleet_queue',
    nacked: 'fleet_backpressure',
    confirm_timeout: 'fleet_publish_timeout'
  }.fetch(status, 'fleet_publish_failed')

  {
    success: false,
    error: error_code,
    publish_status: status,
    correlation_id: correlation_id,
    message_context: message_context
  }
end
.publish_request(**opts) ⇒ Object
Source lines 216–223
# File 'lib/legion/llm/fleet/dispatcher.rb', line 216
# Publishes the envelope as a FleetRequest message.
#
# The message class is required lazily. Failures are reported via handle_exception
# at :warn and converted into { accepted: false, status: :failed, error: <message> }.
# LoadError is rescued too: it is a ScriptError, not a StandardError, so without it
# a missing transport extension would escape this method (and dispatch) and leave
# the already-registered reply slot for this correlation id behind.
#
# @param opts [Hash] envelope fields passed to FleetRequest.new
# @return [Hash] presumably FleetRequest#publish's result ({ accepted:, status: })
#   when it succeeds — confirm against the message class
def publish_request(**opts)
  log.debug('[llm][fleet][dispatcher] action=publish_request ' \
            "correlation_id=#{opts[:correlation_id]} routing_key=#{opts[:routing_key]}")
  require 'legion/extensions/llm/transport/messages/fleet_request'
  ::Legion::Extensions::Llm::Transport::Messages::FleetRequest.new(**opts).publish(request_publish_options)
rescue StandardError, LoadError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.dispatcher.publish_request')
  { accepted: false, status: :failed, error: e.message }
end
.register_response(correlation_id, expected = {}) ⇒ Object
Source lines 212–214
# File 'lib/legion/llm/fleet/dispatcher.rb', line 212
# Registers a pending reply for +correlation_id+ with ReplyDispatcher.
#
# @param correlation_id [String]
# @param expected [Hash] expected delivery attributes (see expected_delivery)
# @return [Object] whatever ReplyDispatcher.register returns; dispatch passes it to
#   wait_for_response as the future
def register_response(correlation_id, expected = {})
  ReplyDispatcher.register(
    correlation_id,
    expected: expected
  )
end
.reject_legacy_fields!(request_opts) ⇒ Object
Source lines 146–150
# File 'lib/legion/llm/fleet/dispatcher.rb', line 146
# Rejects request options that still carry fleet protocol v1 fields.
#
# @param request_opts [Hash]
# @raise [ArgumentError] naming the first LEGACY_FIELDS entry found
def reject_legacy_fields!(request_opts)
  LEGACY_FIELDS.each do |field|
    next unless legacy_field_present?(request_opts, field)

    raise ArgumentError, "#{field} is not supported by fleet protocol v2"
  end
end
.request_params(request_opts) ⇒ Object
Source lines 158–160
# File 'lib/legion/llm/fleet/dispatcher.rb', line 158
# Provider parameters: the symbol-keyed request minus every ENVELOPE_KEYS entry.
#
# @param request_opts [Hash]
# @return [Hash]
def request_params(request_opts)
  params = normalize_request(request_opts)
  params.except(*ENVELOPE_KEYS)
end
.request_publish_options ⇒ Object
Source lines 229–237
# File 'lib/legion/llm/fleet/dispatcher.rb', line 229
# Publish options for FleetRequest#publish, read from fleet.dispatch settings.
#
# Defaults: mandatory true, publisher_confirm true, publish_confirm_timeout_ms 500,
# spool false. return_result is always true.
#
# @return [Hash]
def request_publish_options
  {
    mandatory: Legion::LLM::Settings.value(:fleet, :dispatch, :mandatory, default: true),
    publisher_confirm: Legion::LLM::Settings.value(:fleet, :dispatch, :publisher_confirm, default: true),
    publish_confirm_timeout_ms: Legion::LLM::Settings.value(:fleet, :dispatch, :publish_confirm_timeout_ms,
                                                            default: 500),
    spool: Legion::LLM::Settings.value(:fleet, :dispatch, :spool, default: false),
    return_result: true
  }
end
.resolve_timeout(operation: :default, request_type: nil, override: nil) ⇒ Object
Source lines 195–206
# File 'lib/legion/llm/fleet/dispatcher.rb', line 195
# Request timeout in seconds.
#
# A truthy +override+ is returned unchanged. Otherwise the per-operation entry in
# fleet.dispatch.timeouts is used, then fleet.dispatch.timeout_seconds, then 30.
# +request_type+ is consulted only when +operation+ is nil. Any StandardError is
# reported at :warn and yields 30.
#
# @return [Object] the timeout (normally Numeric)
def resolve_timeout(operation: :default, request_type: nil, override: nil)
  return override if override

  key = (operation || request_type || :default).to_sym
  per_operation = Legion::LLM::Settings.value(:fleet, :dispatch, :timeouts, default: {}) || {}
  configured = fetch_option(per_operation, key)
  configured ||= Legion::LLM::Settings.value(:fleet, :dispatch, :timeout_seconds, default: 30)
  configured || 30
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.dispatcher.resolve_timeout')
  30
end
.sanitize_model(model) ⇒ Object
Source lines 179–181
# File 'lib/legion/llm/fleet/dispatcher.rb', line 179
# Model name made safe for the legacy routing key: every ':' becomes '.'.
# nil becomes ''.
#
# @param model [String, Symbol, nil]
# @return [String]
def sanitize_model(model)
  model.to_s.tr(':', '.')
end
.timeout_result(correlation_id, timeout, message_context: {}) ⇒ Object
Source lines 273–276
# File 'lib/legion/llm/fleet/dispatcher.rb', line 273
# Failure result for a request whose reply did not arrive in time.
#
# @param correlation_id [String]
# @param timeout [Numeric] the timeout that elapsed
# @param message_context [Hash]
# @return [Hash]
def timeout_result(correlation_id, timeout, message_context: {})
  {
    success: false,
    error: 'fleet_timeout',
    correlation_id: correlation_id,
    timeout: timeout,
    message_context: message_context
  }
end
.transport_ready? ⇒ Boolean
Source lines 187–189
# File 'lib/legion/llm/fleet/dispatcher.rb', line 187
# Whether the message transport is connected. Passes through the value of
# Legion::LLM::Settings.transport_connected? unchanged.
#
# @return [Boolean] presumably; not coerced here
def transport_ready?
  Legion::LLM::Settings.transport_connected?
end
.wait_for_response(correlation_id, timeout:, message_context: {}, future: nil) ⇒ Object
Source lines 261–271
# File 'lib/legion/llm/fleet/dispatcher.rb', line 261
# Blocks until the reply for +correlation_id+ arrives or +timeout+ elapses.
#
# Uses +future+ when given; otherwise registers a new one with ReplyDispatcher.
# A falsy value from future.value!(timeout) is treated as a timeout.
# Concurrent::CancelledOperationError is also mapped to a timeout result. The
# registration is always removed on exit.
# NOTE(review): assumes a concurrent-ruby future whose value! returns nil on timeout
# and raises on rejection — other rejection errors propagate.
#
# @param correlation_id [String]
# @param timeout [Numeric] seconds to wait
# @param message_context [Hash]
# @param future [Object, nil] pending reply from register_response
# @return [Hash] the reply or a timeout_result
def wait_for_response(correlation_id, timeout:, message_context: {}, future: nil)
  log.debug "[llm][fleet][dispatcher] action=wait_for_response correlation_id=#{correlation_id} timeout=#{timeout}"
  pending = future || ReplyDispatcher.register(correlation_id)
  reply = pending.value!(timeout)
  return reply if reply

  timeout_result(correlation_id, timeout, message_context: message_context)
rescue Concurrent::CancelledOperationError => e
  handle_exception(e, level: :debug, handled: true, operation: 'llm.fleet.dispatcher.wait_cancelled')
  timeout_result(correlation_id, timeout, message_context: message_context)
ensure
  ReplyDispatcher.deregister(correlation_id)
end