Module: Legion::Extensions::Llm::Fleet::WorkerExecution
- Extended by:
- Logging::Helper
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/extensions/llm/fleet/worker_execution.rb
Overview
Applies responder-side policy and dispatches a fleet request to a local lex-llm provider.
Defined Under Namespace
Classes: PolicyError
Constant Summary collapse
- MAX_IDEMPOTENCY_ENTRIES =
100_000
Class Method Summary collapse
- .auth_required? ⇒ Boolean
- .call(envelope:, provider:) ⇒ Object
- .dispatch_local_provider!(envelope:, provider:) ⇒ Object
- .envelope_value(envelope, key) ⇒ Object
- .evict_oldest_idempotency_entries! ⇒ Object
- .except(hash, *keys) ⇒ Object
- .idempotency_ttl_seconds ⇒ Object
- .mark_idempotency_success!(key) ⇒ Object
- .normalize_hash(hash) ⇒ Object
- .purge_idempotency_cache! ⇒ Object
- .release_idempotency!(key) ⇒ Object
- .release_replay!(claims) ⇒ Object
- .reserve_idempotency_key!(key) ⇒ Object
- .reset_idempotency_cache! ⇒ Object
- .responder_setting(key, default:) ⇒ Object
- .unpack_legacy_options(params) ⇒ Object
- .validate_idempotency!(envelope) ⇒ Object
- .validate_identity!(envelope) ⇒ Object
-
.validate_policy!(_envelope) ⇒ Object
rubocop:disable Naming/PredicateMethod.
Class Method Details
.auth_required? ⇒ Boolean
161 162 163 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 161

# Reports whether fleet requests must carry a signed token.
#
# Reads the +fleet.auth.require_signed_token+ setting (default +true+); only an
# explicit +false+ turns the requirement off.
#
# @return [Boolean]
def auth_required?
  signed_token_setting = Settings.value(:fleet, :auth, :require_signed_token, default: true)
  signed_token_setting != false
end
.call(envelope:, provider:) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 24

# Runs one fleet request: identity, policy and idempotency checks, then local dispatch.
#
# On success the idempotency key (when one was reserved) is marked complete and
# the token's +jti+ is handed to +TokenValidator.mark_replay!+. On any failure the
# idempotency reservation and the replay claim are released before the error
# leaves this method.
#
# @param envelope [Hash] fleet request envelope
# @param provider [Object] provider (or callable) forwarded to {dispatch_local_provider!}
# @return [Object] whatever the provider operation returned
# @raise [PolicyError] when a +TokenError+ is raised (re-raised with the same message)
# @raise [StandardError] any other failure is reported and re-raised unchanged
def call(envelope:, provider:)
  # Initialised up front so the rescue clauses can tell how far processing got.
  claims = nil
  idempotency_key = nil

  claims = validate_identity!(envelope)
  validate_policy!(envelope)
  idempotency_key = validate_idempotency!(envelope)
  response = dispatch_local_provider!(envelope: envelope, provider: provider)

  mark_idempotency_success!(idempotency_key) if idempotency_key
  # validate_identity! returns +true+ (not claims) when auth is disabled.
  TokenValidator.mark_replay!(claims[:jti]) if claims.is_a?(Hash)
  response
rescue TokenError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.worker_execution.identity')
  release_idempotency!(idempotency_key) if idempotency_key
  release_replay!(claims)
  raise PolicyError, e.message
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.worker_execution.call')
  release_idempotency!(idempotency_key) if idempotency_key
  release_replay!(claims)
  raise
end
.dispatch_local_provider!(envelope:, provider:) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 69

# Executes the envelope's operation on a local provider.
#
# Request params are key-normalised and any legacy nested +:options+ hash is
# flattened into them before being splatted into the provider call.
#
# @param envelope [Hash] fleet request envelope; +:operation+, +:params+ and +:model+ are read
# @param provider [Object] a provider, or a callable that returns one when given the envelope
# @return [Object] the provider's return value
# @raise [PolicyError] when the operation is missing or not one of
#   chat / stream / embed / count_tokens
def dispatch_local_provider!(envelope:, provider:)
  # A callable that is not itself a provider is treated as a factory.
  provider = provider.call(envelope) if provider.respond_to?(:call) && !provider.respond_to?(:chat)

  # Compare as a String so a missing operation reaches the PolicyError branch
  # instead of failing on nil.to_sym.
  operation = envelope_value(envelope, :operation).to_s
  params = unpack_legacy_options(normalize_hash(envelope_value(envelope, :params) || {}))
  model = envelope_value(envelope, :model)

  case operation
  when 'chat'
    provider.chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'stream'
    provider.stream_chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'embed'
    provider.embed(text: params[:text], model: model, **except(params, :text))
  when 'count_tokens'
    provider.count_tokens(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  else
    raise PolicyError, "unsupported fleet operation: #{operation}"
  end
end
.envelope_value(envelope, key) ⇒ Object
165 166 167 168 169 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 165

# Looks up +key+ in the envelope, trying the key as given and then its String form.
#
# Only +nil+ falls through to the String key, so a stored +false+ is returned
# as +false+ rather than being reported as absent.
#
# @param envelope [#key?, #[]] hash-like envelope; anything else yields +nil+
# @param key [Symbol, String] key to look up
# @return [Object, nil] the first non-nil value found, otherwise +nil+
def envelope_value(envelope, key)
  return nil unless envelope.respond_to?(:key?)

  [key, key.to_s].each do |candidate|
    value = envelope[candidate]
    return value unless value.nil?
  end
  nil
end
.evict_oldest_idempotency_entries! ⇒ Object
131 132 133 134 135 136 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 131

# Drops the entries with the earliest +:expires_at+ until the cache holds at
# most MAX_IDEMPOTENCY_ENTRIES.
#
# Takes no lock itself; the visible caller (purge_idempotency_cache!) invokes it
# while holding @idempotency_mutex.
#
# @return [void]
def evict_oldest_idempotency_entries!
  overflow = @idempotency_keys.size - MAX_IDEMPOTENCY_ENTRIES
  return unless overflow.positive?

  # each_pair yields [key, entry] pairs, so the selected victims are Arrays and
  # must be destructured with #each (Array has no #each_key).
  victims = @idempotency_keys.each_pair.min_by(overflow) { |_key, entry| entry[:expires_at] }
  victims.each { |key, _entry| @idempotency_keys.delete(key) }
end
.except(hash, *keys) ⇒ Object
179 180 181 182 183 184 185 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 179

# Builds a copy of +hash+ without the given keys.
#
# Keys that respond to +to_sym+ are symbolised in the result, and exclusions are
# matched against the symbolised form.
#
# @param hash [Enumerable] key/value pairs to filter
# @param keys [Array<#to_sym>] keys to leave out
# @return [Hash] new hash; +hash+ itself is not modified
def except(hash, *keys)
  dropped = keys.map(&:to_sym)
  symbolized = hash.map { |key, value| [key.respond_to?(:to_sym) ? key.to_sym : key, value] }
  symbolized.reject { |key, _value| dropped.include?(key) }.to_h
end
.idempotency_ttl_seconds ⇒ Object
148 149 150 151 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 148

# Lifetime of an idempotency entry, in seconds.
#
# Reads the +idempotency_ttl_seconds+ responder setting, coerces it with +to_i+
# and falls back to 600 when the result is not positive.
#
# @return [Integer]
def idempotency_ttl_seconds
  configured = responder_setting(:idempotency_ttl_seconds, default: 600).to_i
  return configured if configured.positive?

  600
end
.mark_idempotency_success!(key) ⇒ Object
103 104 105 106 107 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 103

# Records +key+ as completed, with a fresh expiry of now + idempotency_ttl_seconds.
#
# @param key [#to_s] idempotency key; stored under its String form
# @return [Hash] the stored entry (+:state+ and +:expires_at+)
def mark_idempotency_success!(key)
  @idempotency_mutex.synchronize do
    expiry = Time.now.to_i + idempotency_ttl_seconds
    @idempotency_keys[key.to_s] = { state: :complete, expires_at: expiry }
  end
end
.normalize_hash(hash) ⇒ Object
171 172 173 174 175 176 177 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 171

# Copies +hash+ into a new Hash whose keys are symbolised where possible.
#
# Keys that do not respond to +to_sym+ are kept as they are. Anything that does
# not respond to +each+ yields an empty Hash.
#
# @param hash [Object] typically a Hash with String and/or Symbol keys
# @return [Hash]
def normalize_hash(hash)
  return {} unless hash.respond_to?(:each)

  hash.map { |key, value| [key.respond_to?(:to_sym) ? key.to_sym : key, value] }.to_h
end
.purge_idempotency_cache! ⇒ Object
121 122 123 124 125 126 127 128 129 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 121

# Removes expired idempotency entries, then trims the cache if it is still
# larger than MAX_IDEMPOTENCY_ENTRIES.
#
# Runs entirely under @idempotency_mutex.
#
# @return [void]
def purge_idempotency_cache!
  @idempotency_mutex.synchronize do
    cutoff = Time.now.to_i
    @idempotency_keys.each_pair do |cache_key, record|
      next if record[:expires_at] > cutoff

      @idempotency_keys.delete(cache_key)
    end

    over_capacity = @idempotency_keys.size > MAX_IDEMPOTENCY_ENTRIES
    evict_oldest_idempotency_entries! if over_capacity
  end
end
.release_idempotency!(key) ⇒ Object
109 110 111 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 109

# Forgets the idempotency entry stored for +key+, if there is one.
#
# @param key [#to_s] idempotency key; looked up by its String form
# @return [Object, nil] whatever the cache's +delete+ returns for that key
def release_idempotency!(key)
  cache_key = key.to_s
  @idempotency_mutex.synchronize do
    @idempotency_keys.delete(cache_key)
  end
end
.release_replay!(claims) ⇒ Object
113 114 115 116 117 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 113

# Releases the replay claim for the token described by +claims+.
#
# Does nothing unless +claims+ is a Hash with a truthy +:jti+.
#
# @param claims [Hash, Object] token claims, or any non-Hash placeholder
# @return [Object, nil] result of TokenValidator.release_replay!, or +nil+ when skipped
def release_replay!(claims)
  jti = claims[:jti] if claims.is_a?(Hash)
  return unless jti

  TokenValidator.release_replay!(jti)
end
.reserve_idempotency_key!(key) ⇒ Object
138 139 140 141 142 143 144 145 146 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 138

# Claims +key+ as in-flight, rejecting it when an unexpired entry already exists.
#
# The key is normalised with +to_s+, matching mark_idempotency_success! and
# release_idempotency!, so a reservation can always be completed or released
# regardless of the type the caller passed in. An expired entry is overwritten.
#
# @param key [#to_s] idempotency key
# @return [Hash] the new in-flight entry (+:state+ and +:expires_at+)
# @raise [PolicyError] when the key is already reserved or completed and not yet expired
def reserve_idempotency_key!(key)
  cache_key = key.to_s
  @idempotency_mutex.synchronize do
    now = Time.now.to_i
    existing = @idempotency_keys[cache_key]
    raise PolicyError, 'duplicate fleet idempotency key' if existing && existing[:expires_at] > now

    @idempotency_keys[cache_key] = { state: :inflight, expires_at: now + idempotency_ttl_seconds }
  end
end
.reset_idempotency_cache! ⇒ Object
98 99 100 101 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 98

# Replaces the idempotency cache and its mutex with fresh, empty instances.
#
# @return [Mutex] the newly installed mutex (the last assignment's value)
def reset_idempotency_cache!
  fresh_store = Concurrent::Map.new
  fresh_lock = Mutex.new
  @idempotency_keys = fresh_store
  @idempotency_mutex = fresh_lock
end
.responder_setting(key, default:) ⇒ Object
153 154 155 156 157 158 159 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 153

# Reads a +fleet.responder.<key>+ setting.
#
# When the setting is unset (+nil+), +:require_auth+ falls back to
# auth_required? and every other key falls back to +default+.
#
# @param key [Symbol] setting name under +fleet.responder+
# @param default [Object] value used when the setting is +nil+
# @return [Object]
def responder_setting(key, default:)
  configured = Settings.value(:fleet, :responder, key, default: nil)
  return configured unless configured.nil?

  key == :require_auth ? auth_required? : default
end
.unpack_legacy_options(params) ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 90

# Flattens a legacy nested +:options+ hash into the top level of +params+.
#
# The +:options+ entry is always removed. When it is a Hash, its key-normalised
# entries are copied up, but never over a key that +params+ already has.
#
# @param params [Hash] symbol-keyed request params; modified in place
# @return [Hash] the same +params+ object
def unpack_legacy_options(params)
  options = params.delete(:options)
  return params unless options.is_a?(Hash)

  # Explicit top-level params take precedence over legacy nested options.
  normalize_hash(options).each { |key, value| params[key] = value unless params.key?(key) }
  params
end
.validate_idempotency!(envelope) ⇒ Object
59 60 61 62 63 64 65 66 67 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 59

# Enforces the idempotency requirement and reserves the envelope's key.
#
# Skipped entirely (returns +nil+) when the +require_idempotency+ responder
# setting is falsy.
#
# @param envelope [Hash] fleet request envelope; +:idempotency_key+ is read
# @return [String, nil] the reserved key as a String, or +nil+ when not required
# @raise [PolicyError] when the key is missing or empty; reserve_idempotency_key!
#   may also raise
def validate_idempotency!(envelope)
  return nil unless responder_setting(:require_idempotency, default: true)

  normalized_key = envelope_value(envelope, :idempotency_key).to_s
  raise PolicyError, 'fleet idempotency_key is required' if normalized_key.empty?

  reserve_idempotency_key!(normalized_key)
  normalized_key
end
.validate_identity!(envelope) ⇒ Object
46 47 48 49 50 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 46

# Validates the envelope's signed token when the responder requires auth.
#
# @param envelope [Hash] fleet request envelope; +:signed_token+ is read
# @return [Object, true] the result of TokenValidator.validate!, or +true+
#   when the +require_auth+ responder setting is falsy
def validate_identity!(envelope)
  if responder_setting(:require_auth, default: true)
    signed_token = envelope_value(envelope, :signed_token)
    TokenValidator.validate!(token: signed_token, envelope: envelope)
  else
    true
  end
end
.validate_policy!(_envelope) ⇒ Object
rubocop:disable Naming/PredicateMethod
52 53 54 55 56 57 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 52

# Placeholder policy check: always allows the request.
#
# When the +require_policy+ responder setting is truthy, a warning is logged
# because no policy engine is wired in here.
#
# @param _envelope [Object] unused
# @return [true]
def validate_policy!(_envelope) # rubocop:disable Naming/PredicateMethod
  if responder_setting(:require_policy, default: false)
    log.warn('[fleet] require_policy is enabled but no policy engine is configured — allowing request')
  end

  true
end