Module: Legion::Extensions::Llm::Fleet::WorkerExecution
- Defined in:
- lib/legion/extensions/llm/fleet/worker_execution.rb
Overview
Applies responder-side policy and dispatches a fleet request to a local lex-llm provider.
Defined Under Namespace
Classes: PolicyError
Class Method Summary
- .auth_required? ⇒ Boolean
- .call(envelope:, provider:) ⇒ Object
- .dispatch_local_provider!(envelope:, provider:) ⇒ Object
- .envelope_value(envelope, key) ⇒ Object
- .except(hash, *keys) ⇒ Object
- .idempotency_ttl_seconds ⇒ Object
- .mark_idempotency_success!(key) ⇒ Object
- .normalize_hash(hash) ⇒ Object
- .purge_idempotency_cache! ⇒ Object
- .release_idempotency!(key) ⇒ Object
- .release_replay!(claims) ⇒ Object
- .reserve_idempotency_key!(key) ⇒ Object
- .reset_idempotency_cache! ⇒ Object
- .responder_setting(key, default:) ⇒ Object
- .validate_idempotency!(envelope) ⇒ Object
- .validate_identity!(envelope) ⇒ Object
- .validate_policy!(_envelope) ⇒ Object
Class Method Details
.auth_required? ⇒ Boolean
136 137 138 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 136
# Reports whether fleet requests must carry a signed token.
#
# Reads the setting at (:fleet, :auth, :require_signed_token), defaulting to
# true. Only a value equal to false switches the requirement off; nil or any
# other value leaves it on.
#
# @return [Boolean] false only when the setting equals false
def auth_required?
  signed_token_setting = Settings.value(:fleet, :auth, :require_signed_token, default: true)
  return false if signed_token_setting == false

  true
end
.call(envelope:, provider:) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 21
# Runs one fleet request through the responder-side checks and then the
# local provider.
#
# Order of work: identity validation, policy validation, idempotency
# reservation, provider dispatch. On success the idempotency key (when one was
# reserved) is marked complete and the token's :jti is marked as replayed
# (when identity validation returned a claims Hash). On any StandardError the
# reserved idempotency key and the replay claim are released before the error
# leaves this method.
#
# @param envelope [Object] fleet request envelope, passed to every check
# @param provider [Object] forwarded unchanged to dispatch_local_provider!
# @return [Object] whatever dispatch_local_provider! returned
# @raise [PolicyError] when a TokenError is raised by any step; the TokenError
#   message is carried over
# @raise [StandardError] any other error is re-raised unchanged after cleanup
def call(envelope:, provider:)
  # Initialised up front so both rescue branches can tell how far the
  # happy path got before failing.
  claims = nil
  idempotency_key = nil

  claims = validate_identity!(envelope)
  validate_policy!(envelope)
  idempotency_key = validate_idempotency!(envelope)
  response = dispatch_local_provider!(envelope: envelope, provider: provider)

  mark_idempotency_success!(idempotency_key) if idempotency_key
  # validate_identity! returns plain `true` when auth is disabled, so only a
  # Hash of claims carries a :jti to record.
  TokenValidator.mark_replay!(claims[:jti]) if claims.is_a?(Hash)
  response
rescue TokenError => e
  release_idempotency!(idempotency_key) if idempotency_key
  release_replay!(claims)
  # Surface token problems as a policy failure, keeping the original text.
  raise PolicyError, e.message
rescue StandardError
  release_idempotency!(idempotency_key) if idempotency_key
  release_replay!(claims)
  raise
end
.dispatch_local_provider!(envelope:, provider:) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 63
# Routes a fleet request to the matching method on the local provider.
#
# The envelope's :operation selects the provider method; :params (with keys
# symbolised) supplies the arguments and :model is passed alongside them.
#
# @param envelope [Object] fleet request envelope read through envelope_value
# @param provider [Object] a provider, or a callable that does not respond to
#   #chat, in which case it is called with the envelope to obtain the provider
# @return [Object] the provider method's return value
# @raise [PolicyError] when the operation is missing or not one of
#   chat, stream, embed, count_tokens
def dispatch_local_provider!(envelope:, provider:)
  # Something callable that is not itself a provider is treated as a factory.
  provider = provider.call(envelope) if provider.respond_to?(:call) && !provider.respond_to?(:chat)

  operation = envelope_value(envelope, :operation)
  params = normalize_hash(envelope_value(envelope, :params) || {})
  model = envelope_value(envelope, :model)

  # Match on the String form: a missing operation then reaches the `else`
  # branch instead of failing on nil, and envelope input is never interned
  # as a Symbol.
  case operation.to_s
  when 'chat'
    provider.chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'stream'
    provider.stream_chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'embed'
    provider.embed(text: params[:text], model: model, **except(params, :text))
  when 'count_tokens'
    provider.count_tokens(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  else
    raise PolicyError, "unsupported fleet operation: #{operation}"
  end
end
.envelope_value(envelope, key) ⇒ Object
140 141 142 143 144 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 140
# Reads a field from an envelope, trying `key` as given and then its String
# form.
#
# A nil or false value stored under `key` falls through to the String-key
# lookup.
#
# @param envelope [Object] anything; objects that do not respond to #key?
#   yield nil
# @param key [Symbol, String] field name
# @return [Object, nil] the stored value, or nil when absent
def envelope_value(envelope, key)
  if envelope.respond_to?(:key?)
    envelope[key] || envelope[key.to_s]
  end
end
.except(hash, *keys) ⇒ Object
154 155 156 157 158 159 160 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 154
# Builds a copy of `hash` without the named keys.
#
# Keys that respond to #to_sym are converted to Symbols both for the
# comparison and in the result, so 'a' and :a are treated as the same key.
#
# @param hash [#each] key/value pairs to filter
# @param keys [Array<#to_sym>] keys to leave out
# @return [Hash] new Hash with the remaining pairs
def except(hash, *keys)
  skipped = keys.map(&:to_sym)
  kept = {}
  hash.each do |key, value|
    name = key.respond_to?(:to_sym) ? key.to_sym : key
    next if skipped.include?(name)

    kept[name] = value
  end
  kept
end
.idempotency_ttl_seconds ⇒ Object
123 124 125 126 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 123
# Lifetime, in seconds, given to idempotency cache entries.
#
# Takes the :idempotency_ttl_seconds responder setting, converts it with
# #to_i, and falls back to 600 when the result is zero or negative.
#
# @return [Integer] a positive number of seconds
def idempotency_ttl_seconds
  configured = responder_setting(:idempotency_ttl_seconds, default: 600).to_i
  return 600 unless configured.positive?

  configured
end
.mark_idempotency_success!(key) ⇒ Object
88 89 90 91 92 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 88
# Records an idempotency key as completed.
#
# Under the idempotency mutex, stores an entry with state :complete that
# expires idempotency_ttl_seconds from now, replacing any existing entry.
#
# @param key [#to_s] idempotency key; stored under its String form
# @return [Hash] the entry that was stored
def mark_idempotency_success!(key)
  @idempotency_mutex.synchronize do
    completed = { state: :complete, expires_at: Time.now.to_i + idempotency_ttl_seconds }
    @idempotency_keys[key.to_s] = completed
  end
end
.normalize_hash(hash) ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 146
# Copies key/value pairs into a new Hash, converting every key that responds
# to #to_sym into a Symbol.
#
# @param hash [Object] source pairs; anything that does not respond to #each
#   produces an empty Hash
# @return [Hash] new Hash; other keys are kept as they are
def normalize_hash(hash)
  symbolized = {}
  return symbolized unless hash.respond_to?(:each)

  hash.each do |key, value|
    name = key.respond_to?(:to_sym) ? key.to_sym : key
    symbolized[name] = value
  end
  symbolized
end
.purge_idempotency_cache! ⇒ Object
104 105 106 107 108 109 110 111 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 104
# Drops every idempotency entry whose :expires_at is at or before the current
# time. The whole sweep runs while holding the idempotency mutex.
#
# @return [Object] the value returned by the cache's #each_pair
def purge_idempotency_cache!
  @idempotency_mutex.synchronize do
    cutoff = Time.now.to_i
    @idempotency_keys.each_pair do |cache_key, record|
      next if record[:expires_at] > cutoff

      @idempotency_keys.delete(cache_key)
    end
  end
end
.release_idempotency!(key) ⇒ Object
94 95 96 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 94
# Forgets an idempotency key, so the same key can be reserved again.
#
# @param key [#to_s] idempotency key; looked up by its String form
# @return [Object, nil] whatever the cache's #delete returns for that key
def release_idempotency!(key)
  @idempotency_mutex.synchronize do
    @idempotency_keys.delete(key.to_s)
  end
end
.release_replay!(claims) ⇒ Object
98 99 100 101 102 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 98
# Hands a token's :jti back to TokenValidator.release_replay!.
#
# Does nothing unless `claims` is a Hash with a truthy :jti.
#
# @param claims [Hash, Object] claims from identity validation, or any
#   non-Hash value when there are none
# @return [Object, nil] TokenValidator.release_replay!'s result, or nil when
#   skipped
def release_replay!(claims)
  jti = claims[:jti] if claims.is_a?(Hash)
  TokenValidator.release_replay!(jti) if jti
end
.reserve_idempotency_key!(key) ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 113
# Claims an idempotency key for an in-flight request.
#
# Under the idempotency mutex: fails when an unexpired entry already exists
# for the key, otherwise stores an :inflight entry that expires
# idempotency_ttl_seconds from now. An expired entry is overwritten.
#
# @param key [#to_s] idempotency key
# @return [Hash] the :inflight entry that was stored
# @raise [PolicyError] when the key is already held by an unexpired entry
def reserve_idempotency_key!(key)
  # Same String normalisation that mark_idempotency_success! and
  # release_idempotency! apply, so all three address the same cache slot.
  cache_key = key.to_s

  @idempotency_mutex.synchronize do
    now = Time.now.to_i
    existing = @idempotency_keys[cache_key]
    raise PolicyError, 'duplicate fleet idempotency key' if existing && existing[:expires_at] > now

    @idempotency_keys[cache_key] = { state: :inflight, expires_at: now + idempotency_ttl_seconds }
  end
end
.reset_idempotency_cache! ⇒ Object
83 84 85 86 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 83
# Starts over with an empty idempotency cache.
#
# Replaces both the key store (a new Concurrent::Map) and the mutex that
# guards it, discarding whatever entries were held before.
#
# @return [Mutex] the newly created mutex (the last assignment)
def reset_idempotency_cache!
  @idempotency_keys = Concurrent::Map.new
  @idempotency_mutex = Mutex.new
end
.responder_setting(key, default:) ⇒ Object
128 129 130 131 132 133 134 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 128
# Looks up a responder setting at (:fleet, :responder, key).
#
# A non-nil configured value is returned as is (including false). When nothing
# is configured, :require_auth defers to auth_required? and every other key
# gets `default`.
#
# @param key [Symbol] setting name
# @param default [Object] fallback for unset keys other than :require_auth
# @return [Object] the resolved setting
def responder_setting(key, default:)
  configured = Settings.value(:fleet, :responder, key, default: nil)
  return configured unless configured.nil?

  key == :require_auth ? auth_required? : default
end
.validate_idempotency!(envelope) ⇒ Object
53 54 55 56 57 58 59 60 61 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 53
# Enforces the idempotency requirement for a request and reserves its key.
#
# Skipped entirely (returns nil) when the :require_idempotency responder
# setting is falsy; the setting defaults to true.
#
# @param envelope [Object] fleet request envelope carrying :idempotency_key
# @return [String, nil] the reserved key as a String, or nil when skipped
# @raise [PolicyError] when the key is missing or empty; reserve_idempotency_key!
#   may also raise
def validate_idempotency!(envelope)
  return nil unless responder_setting(:require_idempotency, default: true)

  idempotency_key = envelope_value(envelope, :idempotency_key).to_s
  raise PolicyError, 'fleet idempotency_key is required' if idempotency_key.empty?

  reserve_idempotency_key!(idempotency_key)
  idempotency_key
end
.validate_identity!(envelope) ⇒ Object
41 42 43 44 45 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 41
# Checks the caller's identity when the responder requires authentication.
#
# With the :require_auth responder setting falsy, no check is made. Otherwise
# the envelope's :signed_token is handed to TokenValidator.validate! together
# with the envelope.
#
# @param envelope [Object] fleet request envelope carrying :signed_token
# @return [true, Object] true when auth is off, otherwise the result of
#   TokenValidator.validate!
def validate_identity!(envelope)
  if responder_setting(:require_auth, default: true)
    signed_token = envelope_value(envelope, :signed_token)
    TokenValidator.validate!(token: signed_token, envelope: envelope)
  else
    true
  end
end
.validate_policy!(_envelope) ⇒ Object
47 48 49 50 51 |
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 47
# Gate for responder policy enforcement.
#
# Passes when the :require_policy responder setting is falsy (its default).
# When the setting is truthy this always fails; the envelope is not inspected.
#
# @param _envelope [Object] unused
# @return [true] when policy enforcement is not required
# @raise [PolicyError] when policy enforcement is required
def validate_policy!(_envelope)
  policy_required = responder_setting(:require_policy, default: false)
  raise PolicyError, 'fleet responder policy enforcement unavailable' if policy_required

  true
end