Module: Legion::Extensions::Llm::Fleet::TokenValidator

Extended by:
Logging::Helper
Includes:
Logging::Helper
Defined in:
lib/legion/extensions/llm/fleet/token_validator.rb

Overview

Verifies responder-side fleet JWTs and prevents replay on provider nodes.

Class Method Summary collapse

Class Method Details

.accepted_issuer?(value) ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 139

def accepted_issuer?(value)
  accepted_issuers.map(&:to_s).include?(value.to_s)
end

.accepted_issuersObject



143
144
145
146
147
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 143

def accepted_issuers
  issuers = Settings.value(:fleet, :auth, :accepted_issuers, default: [issuer])
  issuers = [issuer] if Array(issuers).empty?
  Array(issuers)
end

.active_replay?(entry, now) ⇒ Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 126

def active_replay?(entry, now)
  entry && entry[:expires_at] > now
end

.algorithmObject



161
162
163
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 161

def algorithm
  Settings.value(:fleet, :auth, :algorithm, default: 'HS256')
end

.audienceObject



157
158
159
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 157

def audience
  Settings.value(:fleet, :auth, :audience, default: 'lex-llm-fleet-worker')
end

.canonical_value(value) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 192

def canonical_value(value)
  case value
  when Hash
    value.each_with_object({}) do |(key, child), result|
      result[key.to_s] = canonical_value(child)
    end.sort.to_h
  when Array
    value.map { |child| canonical_value(child) }
  when Symbol
    value.to_s
  else
    value
  end
end

.clock_skew_secondsObject



149
150
151
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 149

def clock_skew_seconds
  Settings.value(:fleet, :auth, :max_clock_skew_seconds, default: 30).to_i
end

.ensure_not_replayed!(jti) ⇒ Object



110
111
112
113
114
115
116
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 110

def ensure_not_replayed!(jti)
  @replay_mutex.synchronize do
    now = Time.now.to_i
    purge_replay_cache_locked!(now)
    raise TokenError, 'fleet token replay detected' if active_replay?(@seen_jtis[jti.to_s], now)
  end
end

.issuerObject



153
154
155
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 153

def issuer
  Settings.value(:fleet, :auth, :issuer, default: 'legion-llm')
end

.jwt_moduleObject

Raises:



178
179
180
181
182
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 178

def jwt_module
  return ::Legion::Crypt::JWT if defined?(::Legion::Crypt::JWT) && ::Legion::Crypt::JWT.respond_to?(:verify)

  raise TokenError, 'Legion::Crypt::JWT.verify unavailable'
end

.mark_replay!(jti) ⇒ Object



97
98
99
100
101
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 97

def mark_replay!(jti)
  @replay_mutex.synchronize do
    @seen_jtis[jti.to_s] = replay_entry(:complete)
  end
end

.purge_replay_cache!Object



118
119
120
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 118

def purge_replay_cache!
  @replay_mutex.synchronize { purge_replay_cache_locked!(Time.now.to_i) }
end

.purge_replay_cache_locked!(now) ⇒ Object



122
123
124
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 122

def purge_replay_cache_locked!(now)
  @seen_jtis.each_pair { |jti, entry| @seen_jtis.delete(jti) unless active_replay?(entry, now) }
end

.release_replay!(jti) ⇒ Object



103
104
105
106
107
108
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 103

def release_replay!(jti)
  @replay_mutex.synchronize do
    entry = @seen_jtis[jti.to_s]
    @seen_jtis.delete(jti.to_s) if entry.nil? || entry[:state] == :inflight
  end
end

.replay_entry(state, now = Time.now.to_i) ⇒ Object



130
131
132
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 130

def replay_entry(state, now = Time.now.to_i)
  { state: state, expires_at: now + replay_ttl_seconds }
end

.replay_ttl_secondsObject



134
135
136
137
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 134

def replay_ttl_seconds
  ttl = Settings.value(:fleet, :auth, :replay_ttl_seconds, default: 600).to_i
  ttl.positive? ? ttl : 600
end

.reserve_replay!(jti) ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 86

def reserve_replay!(jti)
  @replay_mutex.synchronize do
    now = Time.now.to_i
    purge_replay_cache_locked!(now)
    existing = @seen_jtis[jti.to_s]
    raise TokenError, 'fleet token replay detected' if active_replay?(existing, now)

    @seen_jtis[jti.to_s] = replay_entry(:inflight, now)
  end
end

.reset_replay_cache!Object



45
46
47
48
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 45

def reset_replay_cache!
  @seen_jtis = Concurrent::Map.new
  @replay_mutex = Mutex.new
end

.signing_keyObject



165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 165

def signing_key
  if defined?(::Legion::Crypt) && ::Legion::Crypt.respond_to?(:cluster_secret)
    return ::Legion::Crypt.cluster_secret
  end

  raise TokenError, 'no signing key available - Legion::Crypt not initialized'
rescue TokenError
  raise
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.token_validator.signing_key')
  raise TokenError, "no signing key available: #{e.message}"
end

.symbolize_keys(hash) ⇒ Object



184
185
186
187
188
189
190
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 184

def symbolize_keys(hash)
  return {} unless hash.respond_to?(:each)

  hash.each_with_object({}) do |(key, value), result|
    result[key.respond_to?(:to_sym) ? key.to_sym : key] = value
  end
end

.validate!(token:, envelope:, record_replay: true) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 23

def validate!(token:, envelope:, record_replay: true)
  raise TokenError, 'fleet token is required' if token.to_s.empty?

  claims = symbolize_keys(jwt_module.verify(
                            token,
                            verification_key: signing_key,
                            issuer: issuer,
                            algorithm: algorithm,
                            verify_issuer: false
                          ))
  validate_registered_claims!(claims)
  validate_request_expiry!(claims)
  validate_envelope_claims!(claims, symbolize_keys(envelope || {}))
  record_replay ? reserve_replay!(claims[:jti]) : ensure_not_replayed!(claims[:jti])
  claims
rescue TokenError
  raise
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.token_validator.validate')
  raise TokenError, "fleet token verification failed: #{e.message}"
end

.validate_envelope_claims!(claims, envelope) ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 75

def validate_envelope_claims!(claims, envelope)
  %i[
    request_id correlation_id idempotency_key operation provider provider_instance
    model reply_to message_context params caller trace_context timeout_seconds expires_at
  ].each do |key|
    expected = canonical_value(envelope[key])
    actual = canonical_value(claims[key])
    raise TokenError, "fleet token #{key} claim mismatch" unless actual == expected
  end
end

.validate_registered_claims!(claims) ⇒ Object

Raises:



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 50

def validate_registered_claims!(claims)
  now = Time.now.to_i
  raise TokenError, 'fleet token issuer mismatch' unless accepted_issuer?(claims[:iss])
  raise TokenError, 'fleet token audience mismatch' unless claims[:aud].to_s == audience
  if claims[:exp].nil? || claims[:exp].to_i + clock_skew_seconds <= now
    raise TokenError,
          'fleet token expired'
  end
  if claims[:nbf].nil? || claims[:nbf].to_i - clock_skew_seconds > now
    raise TokenError,
          'fleet token not yet valid'
  end
  raise TokenError, 'fleet token missing jti' if claims[:jti].to_s.empty?
end

.validate_request_expiry!(claims) ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 65

def validate_request_expiry!(claims)
  expires_at = claims[:expires_at]
  raise TokenError, 'fleet request expires_at is required' if expires_at.to_s.empty?

  expires = Time.iso8601(expires_at.to_s)
  raise TokenError, 'fleet request expired' if expires + clock_skew_seconds <= Time.now.utc
rescue ArgumentError
  raise TokenError, 'fleet request expires_at is invalid'
end