Module: Legion::Extensions::Llm::Fleet::TokenValidator
- Extended by:
- Logging::Helper
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/extensions/llm/fleet/token_validator.rb
Overview
Verifies responder-side fleet JWTs and prevents replay on provider nodes.
Constant Summary
- SCALAR_CLAIMS =
%i[ request_id correlation_id idempotency_key operation provider provider_instance model reply_to timeout_seconds expires_at ].freeze
- HASHABLE_CLAIMS =
%i[message_context params caller trace_context].freeze
- MAX_REPLAY_ENTRIES =
100_000
Class Method Summary
- .accepted_issuer?(value) ⇒ Boolean
- .accepted_issuers ⇒ Object
- .active_replay?(entry, now) ⇒ Boolean
- .algorithm ⇒ Object
- .audience ⇒ Object
- .canonical_value(value) ⇒ Object
- .clock_skew_seconds ⇒ Object
- .content_hash(value) ⇒ Object
- .ensure_not_replayed!(jti) ⇒ Object
- .evict_oldest_replay_entries! ⇒ Object
- .issuer ⇒ Object
- .jwt_module ⇒ Object
- .mark_replay!(jti) ⇒ Object
- .purge_replay_cache! ⇒ Object
- .purge_replay_cache_locked!(now) ⇒ Object
- .release_replay!(jti) ⇒ Object
- .replay_entry(state, now = Time.now.to_i) ⇒ Object
- .replay_ttl_seconds ⇒ Object
- .reserve_replay!(jti) ⇒ Object
- .reset_replay_cache! ⇒ Object
- .signing_key ⇒ Object
- .symbolize_keys(hash) ⇒ Object
- .validate!(token:, envelope:, record_replay: true) ⇒ Object
- .validate_envelope_claims!(claims, envelope) ⇒ Object
- .validate_registered_claims!(claims) ⇒ Object
- .validate_request_expiry!(claims) ⇒ Object
Class Method Details
.accepted_issuer?(value) ⇒ Boolean
162 163 164 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 162
#
# Reports whether +value+ equals one of the accepted issuers. Both sides are
# compared by their string form.
#
# @param value [#to_s] issuer claim to check
# @return [Boolean]
def accepted_issuer?(value)
  candidate = value.to_s
  accepted_issuers.any? { |allowed| allowed.to_s == candidate }
end
.accepted_issuers ⇒ Object
166 167 168 169 170 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 166
#
# Returns the list of issuers this node accepts, read from the
# :fleet / :auth / :accepted_issuers setting. When the setting yields an empty
# list, the single value of #issuer is used instead.
#
# @return [Array]
def accepted_issuers
  configured = Array(Settings.value(:fleet, :auth, :accepted_issuers, default: [issuer]))
  configured.empty? ? [issuer] : configured
end
.active_replay?(entry, now) ⇒ Boolean
149 150 151 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 149
#
# Reports whether a replay-cache entry is still within its lifetime.
#
# @param entry [Hash, nil] cache entry carrying an :expires_at value, or nil
#   when nothing is cached for the token id
# @param now [Integer] current time, compared against entry[:expires_at]
# @return [Boolean] false when +entry+ is missing or has expired
def active_replay?(entry, now)
  # Return a real false (not nil) for a missing entry so the predicate
  # honours its Boolean contract.
  return false unless entry

  entry[:expires_at] > now
end
.algorithm ⇒ Object
184 185 186 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 184
#
# Returns the :fleet / :auth / :algorithm setting, passing 'HS256' as the
# default to the settings lookup.
def algorithm
  fallback = 'HS256'
  Settings.value(:fleet, :auth, :algorithm, default: fallback)
end
.audience ⇒ Object
180 181 182 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 180
#
# Returns the :fleet / :auth / :audience setting, passing
# 'lex-llm-fleet-worker' as the default to the settings lookup.
def audience
  fallback = 'lex-llm-fleet-worker'
  Settings.value(:fleet, :auth, :audience, default: fallback)
end
.canonical_value(value) ⇒ Object
215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 215
#
# Recursively normalises a value so that equivalent structures compare equal:
# hash keys become strings and are sorted, symbols become strings, arrays are
# normalised element by element, and anything else is returned unchanged.
#
# @param value [Object]
# @return [Object] the normalised value
def canonical_value(value)
  return value.to_s if value.is_a?(Symbol)
  return value.map { |item| canonical_value(item) } if value.is_a?(Array)
  return value unless value.is_a?(Hash)

  stringified = value.map { |key, child| [key.to_s, canonical_value(child)] }.to_h
  stringified.sort.to_h
end
.clock_skew_seconds ⇒ Object
172 173 174 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 172
#
# Returns the :fleet / :auth / :max_clock_skew_seconds setting converted with
# #to_i; 30 is passed as the default to the settings lookup.
#
# @return [Integer]
def clock_skew_seconds
  configured = Settings.value(:fleet, :auth, :max_clock_skew_seconds, default: 30)
  configured.to_i
end
.content_hash(value) ⇒ Object
96 97 98 99 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 96
#
# Returns the SHA-256 hex digest of the string form (#to_s) of the
# canonicalised +value+.
#
# @param value [Object]
# @return [String] hex digest
def content_hash(value)
  require 'digest'
  serialized = canonical_value(value).to_s
  Digest::SHA256.hexdigest(serialized)
end
.ensure_not_replayed!(jti) ⇒ Object
125 126 127 128 129 130 131 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 125
#
# Checks, without recording anything, that +jti+ has no live entry in the
# replay cache. Expired entries are purged first. Runs under @replay_mutex.
#
# @param jti [#to_s] token id
# @raise [TokenError] when a live entry exists for +jti+
def ensure_not_replayed!(jti)
  @replay_mutex.synchronize do
    current = Time.now.to_i
    purge_replay_cache_locked!(current)
    next unless active_replay?(@seen_jtis[jti.to_s], current)

    raise TokenError, 'fleet token replay detected'
  end
end
.evict_oldest_replay_entries! ⇒ Object
144 145 146 147 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 144
#
# Trims the replay cache back down to MAX_REPLAY_ENTRIES by deleting the
# entries with the smallest :expires_at values. Does nothing when the cache
# is at or below the cap.
#
# The previous implementation called `each_key` on the Array returned by
# `sort_by(...).first(n)`, which raises NoMethodError, and `first` with a
# negative count raises ArgumentError when the cache is below the cap.
#
# NOTE(review): callers in this file invoke this while holding @replay_mutex;
# this method does not take the lock itself.
def evict_oldest_replay_entries!
  overflow = @seen_jtis.size - MAX_REPLAY_ENTRIES
  return unless overflow.positive?

  # min_by(n) picks the n soonest-to-expire pairs without sorting the whole cache.
  soonest = @seen_jtis.each_pair.min_by(overflow) { |_jti, entry| entry[:expires_at] }
  soonest.each { |jti, _entry| @seen_jtis.delete(jti) }
end
.issuer ⇒ Object
176 177 178 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 176
#
# Returns the :fleet / :auth / :issuer setting, passing 'legion-llm' as the
# default to the settings lookup.
def issuer
  fallback = 'legion-llm'
  Settings.value(:fleet, :auth, :issuer, default: fallback)
end
.jwt_module ⇒ Object
201 202 203 204 205 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 201
#
# Returns ::Legion::Crypt::JWT when that constant is defined and responds to
# :verify.
#
# @raise [TokenError] when the constant is missing or lacks :verify
def jwt_module
  available = defined?(::Legion::Crypt::JWT) && ::Legion::Crypt::JWT.respond_to?(:verify)
  raise TokenError, 'Legion::Crypt::JWT.verify unavailable' unless available

  ::Legion::Crypt::JWT
end
.mark_replay!(jti) ⇒ Object
112 113 114 115 116 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 112
#
# Stores a :complete replay entry for +jti+ (overwriting any existing entry)
# while holding @replay_mutex.
#
# @param jti [#to_s] token id
def mark_replay!(jti)
  @replay_mutex.synchronize { @seen_jtis[jti.to_s] = replay_entry(:complete) }
end
.purge_replay_cache! ⇒ Object
133 134 135 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 133
#
# Locking wrapper: takes @replay_mutex and purges the replay cache as of the
# current time.
def purge_replay_cache!
  @replay_mutex.synchronize do
    purge_replay_cache_locked!(Time.now.to_i)
  end
end
.purge_replay_cache_locked!(now) ⇒ Object
139 140 141 142 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 139
#
# Removes every replay entry that is no longer active at +now+, then evicts
# the oldest entries if the cache still exceeds MAX_REPLAY_ENTRIES.
# Does not lock; the callers in this file invoke it inside @replay_mutex.
#
# NOTE(review): entries are deleted while the map is being iterated; this
# presumably relies on @seen_jtis being the Concurrent::Map created by
# reset_replay_cache! — confirm it tolerates deletion during each_pair.
#
# @param now [Integer] current time used for the expiry comparison
def purge_replay_cache_locked!(now)
  @seen_jtis.each_pair do |jti, entry|
    next if active_replay?(entry, now)

    @seen_jtis.delete(jti)
  end
  return unless @seen_jtis.size > MAX_REPLAY_ENTRIES

  evict_oldest_replay_entries!
end
.release_replay!(jti) ⇒ Object
118 119 120 121 122 123 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 118
#
# Drops the replay entry for +jti+ if it is still :inflight. Entries in any
# other state are left in place. Runs under @replay_mutex.
#
# @param jti [#to_s] token id
def release_replay!(jti)
  @replay_mutex.synchronize do
    reservation = @seen_jtis[jti.to_s]
    next unless reservation.nil? || reservation[:state] == :inflight

    @seen_jtis.delete(jti.to_s)
  end
end
.replay_entry(state, now = Time.now.to_i) ⇒ Object
153 154 155 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 153
#
# Builds a replay-cache entry that expires replay_ttl_seconds after +now+.
#
# @param state [Symbol] entry state (this file uses :inflight and :complete)
# @param now [Integer] base time; defaults to the current epoch seconds
# @return [Hash] with :state and :expires_at keys
def replay_entry(state, now = Time.now.to_i)
  deadline = now + replay_ttl_seconds
  { state: state, expires_at: deadline }
end
.replay_ttl_seconds ⇒ Object
157 158 159 160 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 157
#
# Returns the :fleet / :auth / :replay_ttl_seconds setting as an Integer,
# falling back to 600 when the configured value is not positive.
#
# @return [Integer]
def replay_ttl_seconds
  configured = Settings.value(:fleet, :auth, :replay_ttl_seconds, default: 600).to_i
  return 600 unless configured.positive?

  configured
end
.reserve_replay!(jti) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 101
#
# Claims +jti+ in the replay cache: purges expired entries, rejects the token
# if a live entry already exists, and otherwise stores an :inflight entry.
# The check and the write happen inside one @replay_mutex section.
#
# @param jti [#to_s] token id
# @raise [TokenError] when a live entry exists for +jti+
def reserve_replay!(jti)
  @replay_mutex.synchronize do
    current = Time.now.to_i
    purge_replay_cache_locked!(current)

    already_seen = active_replay?(@seen_jtis[jti.to_s], current)
    raise TokenError, 'fleet token replay detected' if already_seen

    @seen_jtis[jti.to_s] = replay_entry(:inflight, current)
  end
end
.reset_replay_cache! ⇒ Object
45 46 47 48 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 45
#
# Replaces the replay cache with an empty Concurrent::Map and installs a new
# Mutex to guard it.
#
# NOTE(review): the mutex itself is swapped, so a thread still holding the
# previous one is no longer excluded — confirm this is only called at boot or
# from tests.
def reset_replay_cache!
  empty_cache = Concurrent::Map.new
  @seen_jtis = empty_cache
  @replay_mutex = Mutex.new
end
.signing_key ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 188
#
# Returns the key used to verify fleet tokens, taken from
# ::Legion::Crypt.cluster_secret.
#
# @return [Object] whatever ::Legion::Crypt.cluster_secret returns
# @raise [TokenError] when ::Legion::Crypt is unavailable, or when fetching
#   the secret raises (the underlying error message is included)
def signing_key
  if defined?(::Legion::Crypt) && ::Legion::Crypt.respond_to?(:cluster_secret)
    return ::Legion::Crypt.cluster_secret
  end

  raise TokenError, 'no signing key available - Legion::Crypt not initialized'
rescue TokenError
  # Already the right error type; pass it through without logging or wrapping.
  raise
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.token_validator.signing_key')
  # The listing showed a truncated `e.` here; interpolate the error message.
  raise TokenError, "no signing key available: #{e.message}"
end
.symbolize_keys(hash) ⇒ Object
207 208 209 210 211 212 213 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 207
#
# Returns a new Hash whose keys are converted with #to_sym where the key
# supports it; other keys are kept as they are. Values are not touched.
#
# @param hash [#each, Object] source hash; anything without #each yields {}
# @return [Hash]
def symbolize_keys(hash)
  symbolized = {}
  return symbolized unless hash.respond_to?(:each)

  hash.each do |key, value|
    normalized = key.respond_to?(:to_sym) ? key.to_sym : key
    symbolized[normalized] = value
  end
  symbolized
end
.validate!(token:, envelope:, record_replay: true) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 23
#
# Verifies a fleet JWT and checks it against the request envelope.
#
# Steps, in order: signature/issuer verification through jwt_module.verify,
# registered-claim checks, request expiry check, envelope/claim comparison,
# then replay protection.
#
# NOTE(review): only the single #issuer value is handed to jwt_module.verify
# with verify_issuer: true, while accepted_issuer? later allows a list —
# confirm tokens from the other accepted issuers are meant to pass.
#
# @param token [#to_s] encoded JWT
# @param envelope [Hash, nil] request envelope the claims must match
# @param record_replay [Boolean] when true, reserve the jti in the replay
#   cache; when false, only check that it has not been seen
# @return [Hash] the verified claims with symbolised keys
# @raise [TokenError] on any verification failure; non-TokenError failures
#   are reported through handle_exception and wrapped with their message
def validate!(token:, envelope:, record_replay: true)
  raise TokenError, 'fleet token is required' if token.to_s.empty?

  claims = symbolize_keys(jwt_module.verify(
                            token,
                            verification_key: signing_key, issuer: issuer,
                            algorithm: algorithm, verify_issuer: true
                          ))
  validate_registered_claims!(claims)
  validate_request_expiry!(claims)
  validate_envelope_claims!(claims, symbolize_keys(envelope || {}))
  # Replay handling runs last so a token that fails validation never occupies a cache slot.
  record_replay ? reserve_replay!(claims[:jti]) : ensure_not_replayed!(claims[:jti])
  claims
rescue TokenError
  raise
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.token_validator.validate')
  # The listing showed a truncated `e.` here; interpolate the error message.
  raise TokenError, "fleet token verification failed: #{e.message}"
end
.validate_envelope_claims!(claims, envelope) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 81
#
# Ensures the token claims describe the same request as the envelope.
# Each SCALAR_CLAIMS key must match after canonicalisation. For each
# HASHABLE_CLAIMS key, the envelope value's content hash must equal the
# claim's "<key>_hash" value, or the hash of the claim's own value when no
# such hash claim is present.
#
# @param claims [Hash] symbol-keyed token claims
# @param envelope [Hash] symbol-keyed request envelope
# @raise [TokenError] on the first mismatch
def validate_envelope_claims!(claims, envelope)
  SCALAR_CLAIMS.each do |name|
    wanted = canonical_value(envelope[name])
    presented = canonical_value(claims[name])
    next if presented == wanted

    raise TokenError, "fleet token #{name} claim mismatch"
  end

  HASHABLE_CLAIMS.each do |name|
    wanted_digest = content_hash(envelope[name])
    presented_digest = claims[:"#{name}_hash"] || content_hash(claims[name])
    next if presented_digest == wanted_digest

    raise TokenError, "fleet token #{name} hash mismatch"
  end
end
.validate_registered_claims!(claims) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 50
#
# Checks the registered JWT claims: iss must be an accepted issuer, aud must
# equal #audience, exp and nbf must both be present and within the allowed
# clock skew of the current time, and jti must be non-empty.
#
# @param claims [Hash] symbol-keyed token claims
# @raise [TokenError] on the first failing check
def validate_registered_claims!(claims)
  current = Time.now.to_i
  skew = -> { clock_skew_seconds }

  raise TokenError, 'fleet token issuer mismatch' unless accepted_issuer?(claims[:iss])
  raise TokenError, 'fleet token audience mismatch' unless claims[:aud].to_s == audience

  expiry = claims[:exp]
  raise TokenError, 'fleet token expired' if expiry.nil? || expiry.to_i + skew.call <= current

  not_before = claims[:nbf]
  raise TokenError, 'fleet token not yet valid' if not_before.nil? || not_before.to_i - skew.call > current

  raise TokenError, 'fleet token missing jti' if claims[:jti].to_s.empty?
end
.validate_request_expiry!(claims) ⇒ Object
65 66 67 68 69 70 71 72 73 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 65
#
# Checks the request-level :expires_at claim: it must be present, parse as an
# ISO 8601 timestamp, and (after adding the clock-skew allowance) still lie in
# the future.
#
# @param claims [Hash] symbol-keyed token claims
# @raise [TokenError] when the claim is missing, unparsable, or in the past
def validate_request_expiry!(claims)
  raw = claims[:expires_at]
  raise TokenError, 'fleet request expires_at is required' if raw.to_s.empty?

  deadline = Time.iso8601(raw.to_s) + clock_skew_seconds
  raise TokenError, 'fleet request expired' if deadline <= Time.now.utc
rescue ArgumentError
  # An ArgumentError raised in the body (e.g. by Time.iso8601) is reported as an invalid timestamp.
  raise TokenError, 'fleet request expires_at is invalid'
end