Module: Legion::Extensions::Llm::Fleet::TokenValidator
- Defined in:
- lib/legion/extensions/llm/fleet/token_validator.rb
Overview
Verifies responder-side fleet JWTs and prevents replay on provider nodes.
Class Method Summary collapse
- .accepted_issuer?(value) ⇒ Boolean
- .accepted_issuers ⇒ Object
- .active_replay?(entry, now) ⇒ Boolean
- .algorithm ⇒ Object
- .audience ⇒ Object
- .canonical_value(value) ⇒ Object
- .clock_skew_seconds ⇒ Object
- .ensure_not_replayed!(jti) ⇒ Object
- .issuer ⇒ Object
- .jwt_module ⇒ Object
- .mark_replay!(jti) ⇒ Object
- .purge_replay_cache! ⇒ Object
- .purge_replay_cache_locked!(now) ⇒ Object
- .release_replay!(jti) ⇒ Object
- .replay_entry(state, now = Time.now.to_i) ⇒ Object
- .replay_ttl_seconds ⇒ Object
- .reserve_replay!(jti) ⇒ Object
- .reset_replay_cache! ⇒ Object
- .signing_key ⇒ Object
- .symbolize_keys(hash) ⇒ Object
- .validate!(token:, envelope:, record_replay: true) ⇒ Object
- .validate_envelope_claims!(claims, envelope) ⇒ Object
- .validate_registered_claims!(claims) ⇒ Object
- .validate_request_expiry!(claims) ⇒ Object
Class Method Details
.accepted_issuer?(value) ⇒ Boolean
135 136 137 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 135
# Tells whether +value+ names one of the accepted issuers.
# Both the candidate and each accepted issuer are compared as strings.
#
# @param value [Object] issuer claim to check
# @return [Boolean] true when the stringified value equals an accepted issuer
def accepted_issuer?(value)
  accepted_issuers.any? { |allowed| allowed.to_s == value.to_s }
end
.accepted_issuers ⇒ Object
139 140 141 142 143 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 139
# Lists the issuers this node accepts, read from the
# fleet/auth/accepted_issuers setting. Falls back to the single configured
# issuer when the setting is absent or resolves to an empty list.
#
# @return [Array] accepted issuer values
def accepted_issuers
  configured = Settings.value(:fleet, :auth, :accepted_issuers, default: [issuer])
  return Array(configured) unless Array(configured).empty?

  [issuer]
end
.active_replay?(entry, now) ⇒ Boolean
122 123 124 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 122
# Tells whether a replay-cache entry is still in force at time +now+.
#
# @param entry [Hash, nil] cache entry carrying an :expires_at value, or nil
#   when the jti has never been seen
# @param now [Integer] current time, in the same unit as :expires_at
# @return [Boolean] false for a missing entry or one whose :expires_at is not
#   later than +now+; true otherwise
def active_replay?(entry, now)
  # Return a real boolean for missing entries instead of leaking nil/false
  # from a short-circuited `&&`.
  return false unless entry

  entry[:expires_at] > now
end
.algorithm ⇒ Object
157 158 159 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 157
# Reads the JWT algorithm from the fleet/auth/algorithm setting.
#
# @return [Object] the configured algorithm, or 'HS256' when unset
def algorithm
  fallback = 'HS256'
  Settings.value(:fleet, :auth, :algorithm, default: fallback)
end
.audience ⇒ Object
153 154 155 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 153
# Reads the expected token audience from the fleet/auth/audience setting.
#
# @return [Object] the configured audience, or 'lex-llm-fleet-worker' when unset
def audience
  fallback = 'lex-llm-fleet-worker'
  Settings.value(:fleet, :auth, :audience, default: fallback)
end
.canonical_value(value) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 187
# Normalises a value so that structurally equal data compares equal no matter
# whether it was built with symbols or strings.
#
# - Hash: keys become strings, values are normalised recursively, and the
#   pairs are sorted.
# - Array: each element is normalised recursively.
# - Symbol: converted to a string.
# - Anything else: returned unchanged.
#
# @param value [Object] value to normalise
# @return [Object] the normalised value
def canonical_value(value)
  return value.to_s if value.is_a?(Symbol)
  return value.map { |child| canonical_value(child) } if value.is_a?(Array)
  return value unless value.is_a?(Hash)

  stringified = {}
  value.each { |key, child| stringified[key.to_s] = canonical_value(child) }
  stringified.sort.to_h
end
.clock_skew_seconds ⇒ Object
145 146 147 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 145
# Reads the tolerated clock skew from the fleet/auth/max_clock_skew_seconds
# setting and coerces it with #to_i.
#
# @return [Integer] skew in seconds; 30 when the setting is unset
def clock_skew_seconds
  configured = Settings.value(:fleet, :auth, :max_clock_skew_seconds, default: 30)
  configured.to_i
end
.ensure_not_replayed!(jti) ⇒ Object
106 107 108 109 110 111 112 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 106
# Checks, without recording anything, that +jti+ is not currently present in
# the replay cache. Expired entries are purged first. Runs under the replay
# mutex.
#
# @param jti [Object] token identifier; looked up by its string form
# @return [nil]
# @raise [TokenError] when an active entry exists for the jti
def ensure_not_replayed!(jti)
  @replay_mutex.synchronize do
    current = Time.now.to_i
    purge_replay_cache_locked!(current)
    seen = @seen_jtis[jti.to_s]
    next unless active_replay?(seen, current)

    raise TokenError, 'fleet token replay detected'
  end
end
.issuer ⇒ Object
149 150 151 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 149
# Reads the primary token issuer from the fleet/auth/issuer setting.
#
# @return [Object] the configured issuer, or 'legion-llm' when unset
def issuer
  fallback = 'legion-llm'
  Settings.value(:fleet, :auth, :issuer, default: fallback)
end
.jwt_module ⇒ Object
173 174 175 176 177 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 173
# Resolves the JWT implementation used for verification.
#
# @return [Module] ::Legion::Crypt::JWT
# @raise [TokenError] when ::Legion::Crypt::JWT is not defined or does not
#   respond to :verify
def jwt_module
  usable = defined?(::Legion::Crypt::JWT) && ::Legion::Crypt::JWT.respond_to?(:verify)
  raise TokenError, 'Legion::Crypt::JWT.verify unavailable' unless usable

  ::Legion::Crypt::JWT
end
.mark_replay!(jti) ⇒ Object
93 94 95 96 97 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 93
# Stores a :complete replay entry for +jti+, overwriting any entry already
# present. Runs under the replay mutex.
#
# @param jti [Object] token identifier; stored under its string form
# @return [Hash] the entry that was stored
def mark_replay!(jti)
  @replay_mutex.synchronize do
    completed = replay_entry(:complete)
    @seen_jtis[jti.to_s] = completed
  end
end
.purge_replay_cache! ⇒ Object
114 115 116 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 114
# Purges inactive replay-cache entries as of the current time, taking the
# replay mutex around the purge.
#
# @return [Object] whatever purge_replay_cache_locked! returns
def purge_replay_cache!
  @replay_mutex.synchronize do
    purge_replay_cache_locked!(Time.now.to_i)
  end
end
.purge_replay_cache_locked!(now) ⇒ Object
118 119 120 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 118
# Deletes every replay-cache entry that is no longer active at +now+.
# Takes no lock itself; the callers visible in this module invoke it from
# inside @replay_mutex.synchronize.
#
# @param now [Integer] time against which entries are judged
# @return [Object] the result of iterating @seen_jtis with each_pair
def purge_replay_cache_locked!(now)
  @seen_jtis.each_pair do |jti, entry|
    next if active_replay?(entry, now)

    @seen_jtis.delete(jti)
  end
end
.release_replay!(jti) ⇒ Object
99 100 101 102 103 104 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 99
# Drops the reservation for +jti+ when it is still :inflight (or absent).
# Entries in any other state are left in place. Runs under the replay mutex.
#
# @param jti [Object] token identifier; looked up by its string form
# @return [Object, nil] the value returned by the cache's delete, or nil when
#   nothing was deleted
def release_replay!(jti)
  @replay_mutex.synchronize do
    entry = @seen_jtis[jti.to_s]
    releasable = entry.nil? || entry[:state] == :inflight
    @seen_jtis.delete(jti.to_s) if releasable
  end
end
.replay_entry(state, now = Time.now.to_i) ⇒ Object
126 127 128 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 126
# Builds a replay-cache entry that expires replay_ttl_seconds after +now+.
#
# @param state [Object] state marker stored under :state (this module uses
#   :inflight and :complete)
# @param now [Integer] base time for the expiry; defaults to the current time
# @return [Hash] hash with :state and :expires_at keys
def replay_entry(state, now = Time.now.to_i)
  expiry = now + replay_ttl_seconds
  { state: state, expires_at: expiry }
end
.replay_ttl_seconds ⇒ Object
130 131 132 133 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 130
# Reads how long a jti stays in the replay cache from the
# fleet/auth/replay_ttl_seconds setting, coerced with #to_i.
#
# @return [Integer] the configured TTL, or 600 when it is unset or not positive
def replay_ttl_seconds
  configured = Settings.value(:fleet, :auth, :replay_ttl_seconds, default: 600).to_i
  return 600 unless configured.positive?

  configured
end
.reserve_replay!(jti) ⇒ Object
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 82
# Claims +jti+ by writing an :inflight entry to the replay cache, after
# purging expired entries. The check and the write happen inside a single
# @replay_mutex.synchronize block.
#
# @param jti [Object] token identifier; stored under its string form
# @return [Hash] the :inflight entry that was stored
# @raise [TokenError] when an active entry already exists for the jti
def reserve_replay!(jti)
  @replay_mutex.synchronize do
    now = Time.now.to_i
    purge_replay_cache_locked!(now)

    already_seen = active_replay?(@seen_jtis[jti.to_s], now)
    raise TokenError, 'fleet token replay detected' if already_seen

    @seen_jtis[jti.to_s] = replay_entry(:inflight, now)
  end
end
.reset_replay_cache! ⇒ Object
41 42 43 44 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 41
# Discards all replay state: installs an empty Concurrent::Map as the jti
# cache and a new Mutex to guard it.
#
# @return [Mutex] the newly created mutex (value of the last assignment)
def reset_replay_cache!
  @seen_jtis = Concurrent::Map.new
  @replay_mutex = Mutex.new
end
.signing_key ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 161
# Fetches the key used to verify fleet tokens from
# ::Legion::Crypt.cluster_secret.
#
# @return [Object] whatever ::Legion::Crypt.cluster_secret returns
# @raise [TokenError] when ::Legion::Crypt is undefined, does not respond to
#   :cluster_secret, or raises a StandardError while producing the secret
def signing_key
  if defined?(::Legion::Crypt) && ::Legion::Crypt.respond_to?(:cluster_secret)
    return ::Legion::Crypt.cluster_secret
  end

  raise TokenError, 'no signing key available - Legion::Crypt not initialized'
rescue TokenError
  # Already the right error type; pass it through untouched.
  raise
rescue StandardError => e
  # Wrap any other failure, keeping the underlying message for diagnosis.
  raise TokenError, "no signing key available: #{e.message}"
end
.symbolize_keys(hash) ⇒ Object
179 180 181 182 183 184 185 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 179
# Returns a shallow copy of +hash+ whose keys are converted with #to_sym
# where the key supports it. Keys without #to_sym are kept as they are and
# nested values are not touched.
#
# @param hash [#each, Object] key/value collection; anything that does not
#   respond to :each yields an empty hash
# @return [Hash] new hash with converted keys
def symbolize_keys(hash)
  return {} unless hash.respond_to?(:each)

  symbolized = {}
  hash.each do |key, value|
    name = key.respond_to?(:to_sym) ? key.to_sym : key
    symbolized[name] = value
  end
  symbolized
end
.validate!(token:, envelope:, record_replay: true) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 20
# Verifies a fleet token and checks it against the request envelope.
#
# Steps, in order: verify the token with the JWT module, check the registered
# claims, check the request expiry, compare the envelope-bound claims, then
# run the replay check for the token's jti.
#
# @param token [#to_s] encoded token; must not be blank
# @param envelope [Hash, nil] request envelope the claims must match; nil is
#   treated as an empty envelope
# @param record_replay [Boolean] when truthy the jti is reserved in the replay
#   cache; otherwise the cache is only consulted
# @return [Hash] the verified claims with symbolized keys
# @raise [TokenError] on any validation failure; other StandardErrors raised
#   along the way are wrapped in a TokenError carrying their message
def validate!(token:, envelope:, record_replay: true)
  raise TokenError, 'fleet token is required' if token.to_s.empty?

  claims = symbolize_keys(
    jwt_module.verify(
      token,
      verification_key: signing_key,
      issuer: issuer,
      algorithm: algorithm,
      # The issuer is checked by validate_registered_claims! against the
      # accepted issuer list instead.
      verify_issuer: false
    )
  )
  validate_registered_claims!(claims)
  validate_request_expiry!(claims)
  validate_envelope_claims!(claims, symbolize_keys(envelope || {}))
  record_replay ? reserve_replay!(claims[:jti]) : ensure_not_replayed!(claims[:jti])
  claims
rescue TokenError
  raise
rescue StandardError => e
  raise TokenError, "fleet token verification failed: #{e.message}"
end
.validate_envelope_claims!(claims, envelope) ⇒ Object
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 71
# Confirms that every envelope-bound field has the same canonical value in
# the token claims as in the request envelope. Fields are checked in the
# order listed and the first difference aborts validation.
#
# @param claims [Hash] token claims keyed by symbol
# @param envelope [Hash] request envelope keyed by symbol
# @return [Array<Symbol>] the list of checked keys
# @raise [TokenError] naming the first field whose values differ
def validate_envelope_claims!(claims, envelope)
  bound_fields = %i[
    request_id correlation_id idempotency_key operation provider provider_instance model
    reply_to message_context params caller trace_context timeout_seconds expires_at
  ]

  bound_fields.each do |key|
    from_envelope = canonical_value(envelope[key])
    from_token = canonical_value(claims[key])
    next if from_token == from_envelope

    raise TokenError, "fleet token #{key} claim mismatch"
  end
end
.validate_registered_claims!(claims) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 46
# Checks the registered JWT claims: iss, aud, exp, nbf and jti.
# exp and nbf are both mandatory and are compared to the current time with
# clock_skew_seconds of tolerance.
#
# @param claims [Hash] token claims keyed by symbol
# @return [nil]
# @raise [TokenError] for an unaccepted issuer, an audience whose string form
#   differs from the configured one, a missing or elapsed exp, a missing or
#   future nbf, or a blank jti
def validate_registered_claims!(claims)
  now = Time.now.to_i
  raise TokenError, 'fleet token issuer mismatch' unless accepted_issuer?(claims[:iss])
  raise TokenError, 'fleet token audience mismatch' if claims[:aud].to_s != audience

  exp = claims[:exp]
  raise TokenError, 'fleet token expired' if exp.nil? || exp.to_i + clock_skew_seconds <= now

  nbf = claims[:nbf]
  raise TokenError, 'fleet token not yet valid' if nbf.nil? || nbf.to_i - clock_skew_seconds > now

  raise TokenError, 'fleet token missing jti' if claims[:jti].to_s.empty?
end
.validate_request_expiry!(claims) ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 61
# Checks the request-level :expires_at claim, which must be an ISO 8601
# timestamp that has not passed, allowing clock_skew_seconds of tolerance.
#
# @param claims [Hash] token claims keyed by symbol
# @return [nil]
# @raise [TokenError] when :expires_at is blank, cannot be parsed by
#   Time.iso8601, or lies in the past beyond the allowed skew
def validate_request_expiry!(claims)
  raw = claims[:expires_at]
  raise TokenError, 'fleet request expires_at is required' if raw.to_s.empty?

  deadline = Time.iso8601(raw.to_s) + clock_skew_seconds
  raise TokenError, 'fleet request expired' unless deadline > Time.now.utc
rescue ArgumentError
  # Time.iso8601 signals an unparseable timestamp with ArgumentError.
  raise TokenError, 'fleet request expires_at is invalid'
end