Module: Legion::Extensions::Llm::Fleet::TokenValidator

Extended by:
Logging::Helper
Includes:
Logging::Helper
Defined in:
lib/legion/extensions/llm/fleet/token_validator.rb

Overview

Verifies responder-side fleet JWTs and prevents replay on provider nodes.

Constant Summary collapse

SCALAR_CLAIMS =
%i[
  request_id correlation_id idempotency_key operation provider provider_instance
  model reply_to timeout_seconds expires_at
].freeze
HASHABLE_CLAIMS =
%i[message_context params caller trace_context].freeze
MAX_REPLAY_ENTRIES =
100_000

Class Method Summary collapse

Class Method Details

.accepted_issuer?(value) ⇒ Boolean

Returns:

  • (Boolean)


162
163
164
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 162

def accepted_issuer?(value)
  accepted_issuers.map(&:to_s).include?(value.to_s)
end

.accepted_issuersObject



166
167
168
169
170
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 166

def accepted_issuers
  issuers = Settings.value(:fleet, :auth, :accepted_issuers, default: [issuer])
  issuers = [issuer] if Array(issuers).empty?
  Array(issuers)
end

.active_replay?(entry, now) ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 149

def active_replay?(entry, now)
  entry && entry[:expires_at] > now
end

.algorithmObject



184
185
186
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 184

def algorithm
  Settings.value(:fleet, :auth, :algorithm, default: 'HS256')
end

.audienceObject



180
181
182
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 180

def audience
  Settings.value(:fleet, :auth, :audience, default: 'lex-llm-fleet-worker')
end

.canonical_value(value) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 215

def canonical_value(value)
  case value
  when Hash
    value.each_with_object({}) do |(key, child), result|
      result[key.to_s] = canonical_value(child)
    end.sort.to_h
  when Array
    value.map { |child| canonical_value(child) }
  when Symbol
    value.to_s
  else
    value
  end
end

.clock_skew_secondsObject



172
173
174
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 172

def clock_skew_seconds
  Settings.value(:fleet, :auth, :max_clock_skew_seconds, default: 30).to_i
end

.content_hash(value) ⇒ Object



96
97
98
99
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 96

def content_hash(value)
  require 'digest'
  Digest::SHA256.hexdigest(canonical_value(value).to_s)
end

.ensure_not_replayed!(jti) ⇒ Object



125
126
127
128
129
130
131
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 125

def ensure_not_replayed!(jti)
  @replay_mutex.synchronize do
    now = Time.now.to_i
    purge_replay_cache_locked!(now)
    raise TokenError, 'fleet token replay detected' if active_replay?(@seen_jtis[jti.to_s], now)
  end
end

.evict_oldest_replay_entries!Object



144
145
146
147
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 144

def evict_oldest_replay_entries!
  sorted = @seen_jtis.each_pair.sort_by { |_jti, entry| entry[:expires_at] }
  sorted.first(@seen_jtis.size - MAX_REPLAY_ENTRIES).each_key { |jti| @seen_jtis.delete(jti) }
end

.issuerObject



176
177
178
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 176

def issuer
  Settings.value(:fleet, :auth, :issuer, default: 'legion-llm')
end

.jwt_moduleObject

Raises:



201
202
203
204
205
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 201

def jwt_module
  return ::Legion::Crypt::JWT if defined?(::Legion::Crypt::JWT) && ::Legion::Crypt::JWT.respond_to?(:verify)

  raise TokenError, 'Legion::Crypt::JWT.verify unavailable'
end

.mark_replay!(jti) ⇒ Object



112
113
114
115
116
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 112

def mark_replay!(jti)
  @replay_mutex.synchronize do
    @seen_jtis[jti.to_s] = replay_entry(:complete)
  end
end

.purge_replay_cache!Object



133
134
135
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 133

def purge_replay_cache!
  @replay_mutex.synchronize { purge_replay_cache_locked!(Time.now.to_i) }
end

.purge_replay_cache_locked!(now) ⇒ Object



139
140
141
142
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 139

def purge_replay_cache_locked!(now)
  @seen_jtis.each_pair { |jti, entry| @seen_jtis.delete(jti) unless active_replay?(entry, now) }
  evict_oldest_replay_entries! if @seen_jtis.size > MAX_REPLAY_ENTRIES
end

.release_replay!(jti) ⇒ Object



118
119
120
121
122
123
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 118

def release_replay!(jti)
  @replay_mutex.synchronize do
    entry = @seen_jtis[jti.to_s]
    @seen_jtis.delete(jti.to_s) if entry.nil? || entry[:state] == :inflight
  end
end

.replay_entry(state, now = Time.now.to_i) ⇒ Object



153
154
155
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 153

def replay_entry(state, now = Time.now.to_i)
  { state: state, expires_at: now + replay_ttl_seconds }
end

.replay_ttl_secondsObject



157
158
159
160
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 157

def replay_ttl_seconds
  ttl = Settings.value(:fleet, :auth, :replay_ttl_seconds, default: 600).to_i
  ttl.positive? ? ttl : 600
end

.reserve_replay!(jti) ⇒ Object



101
102
103
104
105
106
107
108
109
110
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 101

def reserve_replay!(jti)
  @replay_mutex.synchronize do
    now = Time.now.to_i
    purge_replay_cache_locked!(now)
    existing = @seen_jtis[jti.to_s]
    raise TokenError, 'fleet token replay detected' if active_replay?(existing, now)

    @seen_jtis[jti.to_s] = replay_entry(:inflight, now)
  end
end

.reset_replay_cache!Object



45
46
47
48
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 45

def reset_replay_cache!
  @seen_jtis = Concurrent::Map.new
  @replay_mutex = Mutex.new
end

.signing_keyObject



188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 188

def signing_key
  if defined?(::Legion::Crypt) && ::Legion::Crypt.respond_to?(:cluster_secret)
    return ::Legion::Crypt.cluster_secret
  end

  raise TokenError, 'no signing key available - Legion::Crypt not initialized'
rescue TokenError
  raise
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.token_validator.signing_key')
  raise TokenError, "no signing key available: #{e.message}"
end

.symbolize_keys(hash) ⇒ Object



207
208
209
210
211
212
213
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 207

def symbolize_keys(hash)
  return {} unless hash.respond_to?(:each)

  hash.each_with_object({}) do |(key, value), result|
    result[key.respond_to?(:to_sym) ? key.to_sym : key] = value
  end
end

.validate!(token:, envelope:, record_replay: true) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 23

def validate!(token:, envelope:, record_replay: true)
  raise TokenError, 'fleet token is required' if token.to_s.empty?

  claims = symbolize_keys(jwt_module.verify(
                            token,
                            verification_key: signing_key,
                            issuer: issuer,
                            algorithm: algorithm,
                            verify_issuer: true
                          ))
  validate_registered_claims!(claims)
  validate_request_expiry!(claims)
  validate_envelope_claims!(claims, symbolize_keys(envelope || {}))
  record_replay ? reserve_replay!(claims[:jti]) : ensure_not_replayed!(claims[:jti])
  claims
rescue TokenError
  raise
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.token_validator.validate')
  raise TokenError, "fleet token verification failed: #{e.message}"
end

.validate_envelope_claims!(claims, envelope) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 81

def validate_envelope_claims!(claims, envelope)
  SCALAR_CLAIMS.each do |key|
    expected = canonical_value(envelope[key])
    actual = canonical_value(claims[key])
    raise TokenError, "fleet token #{key} claim mismatch" unless actual == expected
  end

  HASHABLE_CLAIMS.each do |key|
    hash_key = :"#{key}_hash"
    expected_hash = content_hash(envelope[key])
    actual_hash = claims[hash_key] || content_hash(claims[key])
    raise TokenError, "fleet token #{key} hash mismatch" unless actual_hash == expected_hash
  end
end

.validate_registered_claims!(claims) ⇒ Object

Raises:



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 50

def validate_registered_claims!(claims)
  now = Time.now.to_i
  raise TokenError, 'fleet token issuer mismatch' unless accepted_issuer?(claims[:iss])
  raise TokenError, 'fleet token audience mismatch' unless claims[:aud].to_s == audience
  if claims[:exp].nil? || claims[:exp].to_i + clock_skew_seconds <= now
    raise TokenError,
          'fleet token expired'
  end
  if claims[:nbf].nil? || claims[:nbf].to_i - clock_skew_seconds > now
    raise TokenError,
          'fleet token not yet valid'
  end
  raise TokenError, 'fleet token missing jti' if claims[:jti].to_s.empty?
end

.validate_request_expiry!(claims) ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 65

def validate_request_expiry!(claims)
  expires_at = claims[:expires_at]
  raise TokenError, 'fleet request expires_at is required' if expires_at.to_s.empty?

  expires = Time.iso8601(expires_at.to_s)
  raise TokenError, 'fleet request expired' if expires + clock_skew_seconds <= Time.now.utc
rescue ArgumentError
  raise TokenError, 'fleet request expires_at is invalid'
end