Module: Legion::Extensions::Llm::Fleet::TokenValidator

Defined in:
lib/legion/extensions/llm/fleet/token_validator.rb

Overview

Verifies responder-side fleet JWTs and prevents replay on provider nodes.

Class Method Summary

Class Method Details

.accepted_issuer?(value) ⇒ Boolean

Returns:

  • (Boolean)


135
136
137
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 135

# Checks an issuer claim against the accepted issuers, comparing string forms.
#
# @param value [Object] issuer claim to check; compared via #to_s
# @return [Boolean] true when an accepted issuer has the same string form
def accepted_issuer?(value)
  allowed = accepted_issuers.map(&:to_s)
  candidate = value.to_s
  allowed.any? { |name| name == candidate }
end

.accepted_issuers ⇒ Object



139
140
141
142
143
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 139

# Lists the issuers this node accepts on fleet tokens.
#
# Reads the :fleet, :auth, :accepted_issuers setting (passing [issuer] as the
# default) and coerces it with Array(). An empty result falls back to [issuer].
#
# @return [Array] accepted issuer values; never empty
def accepted_issuers
  configured = Array(Settings.value(:fleet, :auth, :accepted_issuers, default: [issuer]))
  configured.empty? ? [issuer] : configured
end

.active_replay?(entry, now) ⇒ Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 122

# Reports whether a replay-cache entry is still within its lifetime.
#
# @param entry [Hash, nil] cache entry exposing :expires_at, or nil when the jti is unknown
# @param now [Integer] current time, compared against the entry's :expires_at
# @return [Boolean] true only when an entry is present and its :expires_at is later than now
def active_replay?(entry, now)
  # Return a real false (not nil) for a missing entry so the predicate
  # honours its documented Boolean return type.
  return false unless entry

  entry[:expires_at] > now
end

.algorithm ⇒ Object



157
158
159
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 157

# Signature algorithm handed to the JWT verifier.
#
# @return [Object] the :fleet, :auth, :algorithm setting; 'HS256' is passed as the default
def algorithm
  default_algorithm = 'HS256'
  Settings.value(:fleet, :auth, :algorithm, default: default_algorithm)
end

.audience ⇒ Object



153
154
155
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 153

# Audience value a fleet token's aud claim is compared against.
#
# @return [Object] the :fleet, :auth, :audience setting; 'lex-llm-fleet-worker' is passed as the default
def audience
  default_audience = 'lex-llm-fleet-worker'
  Settings.value(:fleet, :auth, :audience, default: default_audience)
end

.canonical_value(value) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 187

# Normalizes a value so that structurally equal data compares equal.
#
# Hashes get stringified keys, recursively normalized values, and entries
# ordered by key. Arrays are normalized element by element. Symbols become
# strings. Anything else is returned untouched.
#
# @param value [Object] value to normalize
# @return [Object] the normalized value
def canonical_value(value)
  return value.to_s if value.is_a?(Symbol)
  return value.map { |item| canonical_value(item) } if value.is_a?(Array)
  return value unless value.is_a?(Hash)

  pairs = value.map { |key, child| [key.to_s, canonical_value(child)] }
  # Keys are unique after #to_h, so sorting the pairs orders entries by key.
  pairs.to_h.sort.to_h
end

.clock_skew_seconds ⇒ Object



145
146
147
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 145

# Clock-skew tolerance applied to the time-based checks in this module.
#
# @return [Integer] the :fleet, :auth, :max_clock_skew_seconds setting via #to_i; 30 is passed as the default
def clock_skew_seconds
  configured = Settings.value(:fleet, :auth, :max_clock_skew_seconds, default: 30)
  configured.to_i
end

.ensure_not_replayed!(jti) ⇒ Object



106
107
108
109
110
111
112
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 106

# Rejects a jti that is already in the replay cache, without recording it.
#
# Purges expired entries first, all while holding the replay mutex.
#
# @param jti [Object] token identifier; looked up by its string form
# @return [nil]
# @raise [TokenError] when an unexpired entry exists for the jti
def ensure_not_replayed!(jti)
  @replay_mutex.synchronize do
    now = Time.now.to_i
    purge_replay_cache_locked!(now)
    seen = @seen_jtis[jti.to_s]
    raise TokenError, 'fleet token replay detected' if active_replay?(seen, now)
  end
end

.issuer ⇒ Object



149
150
151
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 149

# Issuer name used as the fallback accepted issuer and passed to the JWT verifier.
#
# @return [Object] the :fleet, :auth, :issuer setting; 'legion-llm' is passed as the default
def issuer
  default_issuer = 'legion-llm'
  Settings.value(:fleet, :auth, :issuer, default: default_issuer)
end

.jwt_module ⇒ Object

Raises:

  • (TokenError)

173
174
175
176
177
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 173

# Resolves the JWT implementation used for verification.
#
# @return [Module] ::Legion::Crypt::JWT when it is defined and responds to :verify
# @raise [TokenError] when ::Legion::Crypt::JWT is undefined or lacks :verify
def jwt_module
  usable = defined?(::Legion::Crypt::JWT) && ::Legion::Crypt::JWT.respond_to?(:verify)
  raise TokenError, 'Legion::Crypt::JWT.verify unavailable' unless usable

  ::Legion::Crypt::JWT
end

.mark_replay!(jti) ⇒ Object



93
94
95
96
97
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 93

# Records a jti as :complete in the replay cache with a fresh expiry.
#
# @param jti [Object] token identifier; stored under its string form
# @return [Hash] the entry that was stored
def mark_replay!(jti)
  @replay_mutex.synchronize { @seen_jtis[jti.to_s] = replay_entry(:complete) }
end

.purge_replay_cache! ⇒ Object



114
115
116
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 114

# Drops expired replay-cache entries while holding the replay mutex.
#
# @return [Object] whatever purge_replay_cache_locked! returns
def purge_replay_cache!
  @replay_mutex.synchronize do
    purge_replay_cache_locked!(Time.now.to_i)
  end
end

.purge_replay_cache_locked!(now) ⇒ Object



118
119
120
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 118

# Removes every replay-cache entry that is no longer active.
#
# Does not take the replay mutex itself; the callers visible in this module
# invoke it from inside @replay_mutex.synchronize.
#
# @param now [Integer] current time used to decide which entries are expired
# @return [Object] the result of @seen_jtis.each_pair
def purge_replay_cache_locked!(now)
  @seen_jtis.each_pair do |jti, entry|
    next if active_replay?(entry, now)

    @seen_jtis.delete(jti)
  end
end

.release_replay!(jti) ⇒ Object



99
100
101
102
103
104
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 99

# Releases a reservation for a jti so the token may be presented again.
#
# Only entries whose :state is :inflight (or missing entries) are deleted;
# entries in any other state are left in place.
#
# @param jti [Object] token identifier; looked up by its string form
# @return [Hash, nil] the deleted entry, or nil when nothing was removed
def release_replay!(jti)
  @replay_mutex.synchronize do
    key = jti.to_s
    entry = @seen_jtis[key]
    releasable = entry.nil? || entry[:state] == :inflight
    @seen_jtis.delete(key) if releasable
  end
end

.replay_entry(state, now = Time.now.to_i) ⇒ Object



126
127
128
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 126

# Builds a replay-cache entry that expires replay_ttl_seconds after now.
#
# @param state [Object] state tag stored under :state (this module uses :inflight and :complete)
# @param now [Integer] base time for the expiry; defaults to Time.now.to_i
# @return [Hash] hash with :state and :expires_at keys
def replay_entry(state, now = Time.now.to_i)
  expiry = now + replay_ttl_seconds
  { state: state, expires_at: expiry }
end

.replay_ttl_seconds ⇒ Object



130
131
132
133
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 130

# Lifetime, in seconds, given to replay-cache entries.
#
# @return [Integer] the :fleet, :auth, :replay_ttl_seconds setting via #to_i, or 600 when that is not positive
def replay_ttl_seconds
  configured = Settings.value(:fleet, :auth, :replay_ttl_seconds, default: 600).to_i
  return configured if configured.positive?

  600
end

.reserve_replay!(jti) ⇒ Object



82
83
84
85
86
87
88
89
90
91
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 82

# Reserves a jti as :inflight, rejecting it when an unexpired entry already exists.
#
# Purge, lookup and insert all happen inside one @replay_mutex.synchronize block.
#
# @param jti [Object] token identifier; stored under its string form
# @return [Hash] the newly stored :inflight entry
# @raise [TokenError] when an unexpired entry exists for the jti
def reserve_replay!(jti)
  @replay_mutex.synchronize do
    now = Time.now.to_i
    purge_replay_cache_locked!(now)

    key = jti.to_s
    raise TokenError, 'fleet token replay detected' if active_replay?(@seen_jtis[key], now)

    @seen_jtis[key] = replay_entry(:inflight, now)
  end
end

.reset_replay_cache! ⇒ Object



41
42
43
44
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 41

# Replaces the replay cache and its mutex with fresh, empty instances.
#
# NOTE(review): the swap is not guarded by the previous mutex; presumably this
# is only called at load time or from tests - confirm against callers.
#
# @return [Mutex] the newly created mutex
def reset_replay_cache!
  fresh_cache = Concurrent::Map.new
  fresh_lock = Mutex.new
  @seen_jtis = fresh_cache
  @replay_mutex = fresh_lock
end

.signing_key ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 161

# Fetches the key used to verify fleet token signatures.
#
# @return [Object] the value of ::Legion::Crypt.cluster_secret
# @raise [TokenError] when ::Legion::Crypt is undefined, lacks :cluster_secret,
#   or when fetching the secret raises a StandardError
def signing_key
  crypt_ready = defined?(::Legion::Crypt) && ::Legion::Crypt.respond_to?(:cluster_secret)
  raise TokenError, 'no signing key available - Legion::Crypt not initialized' unless crypt_ready

  ::Legion::Crypt.cluster_secret
rescue TokenError
  # Already the right error type; pass it through unwrapped.
  raise
rescue StandardError => e
  raise TokenError, "no signing key available: #{e.message}"
end

.symbolize_keys(hash) ⇒ Object



179
180
181
182
183
184
185
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 179

# Returns a shallow copy of a hash-like object with keys converted to symbols.
#
# Keys that do not respond to :to_sym are kept as they are. Values are not
# touched, so nested hashes keep their original keys.
#
# @param hash [#each] hash-like input; anything without #each yields {}
# @return [Hash] new hash with symbolized top-level keys
def symbolize_keys(hash)
  return {} unless hash.respond_to?(:each)

  hash.reduce({}) do |result, (key, value)|
    normalized = key.respond_to?(:to_sym) ? key.to_sym : key
    result[normalized] = value
    result
  end
end

.validate!(token:, envelope:, record_replay: true) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 20

# Verifies a fleet token and checks that it matches the request envelope.
#
# Steps, in order: signature verification through jwt_module, registered-claim
# checks, request-expiry check, envelope/claim comparison, then replay handling.
#
# @param token [Object] encoded fleet token; rejected when its string form is empty
# @param envelope [#each, nil] request envelope the claims must match; nil is treated as {}
# @param record_replay [Boolean] when truthy the jti is reserved in the replay
#   cache; otherwise the cache is only consulted
# @return [Hash] the token claims with symbolized keys
# @raise [TokenError] on any validation failure; other StandardErrors are wrapped
def validate!(token:, envelope:, record_replay: true)
  raise TokenError, 'fleet token is required' if token.to_s.empty?

  # The issuer is checked below against accepted_issuers, so the verifier is
  # called with verify_issuer: false.
  verified = jwt_module.verify(
    token,
    verification_key: signing_key,
    issuer: issuer,
    algorithm: algorithm,
    verify_issuer: false
  )
  claims = symbolize_keys(verified)

  validate_registered_claims!(claims)
  validate_request_expiry!(claims)
  validate_envelope_claims!(claims, symbolize_keys(envelope || {}))

  if record_replay
    reserve_replay!(claims[:jti])
  else
    ensure_not_replayed!(claims[:jti])
  end

  claims
rescue TokenError
  # Already the right error type; pass it through unwrapped.
  raise
rescue StandardError => e
  raise TokenError, "fleet token verification failed: #{e.message}"
end

.validate_envelope_claims!(claims, envelope) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 71

# Ensures each request field in the token matches the same field in the envelope.
#
# Both sides are passed through canonical_value before comparison.
#
# @param claims [Hash] token claims, read by symbol key
# @param envelope [Hash] request envelope, read by symbol key
# @return [Array<Symbol>] the list of compared keys
# @raise [TokenError] naming the first key whose values differ
def validate_envelope_claims!(claims, envelope)
  bound_keys = %i[
    request_id correlation_id idempotency_key operation provider provider_instance
    model reply_to message_context params caller trace_context timeout_seconds expires_at
  ]

  bound_keys.each do |key|
    from_envelope = canonical_value(envelope[key])
    from_token = canonical_value(claims[key])
    next if from_token == from_envelope

    raise TokenError, "fleet token #{key} claim mismatch"
  end
end

.validate_registered_claims!(claims) ⇒ Object

Raises:

  • (TokenError)

46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 46

# Checks the iss, aud, exp, nbf and jti claims of a verified token.
#
# exp and nbf are both required and are compared with clock_skew_seconds of
# tolerance on either side.
#
# @param claims [Hash] token claims, read by symbol key
# @return [nil]
# @raise [TokenError] when the issuer is not accepted, the audience differs,
#   exp is missing or past, nbf is missing or in the future, or jti is empty
def validate_registered_claims!(claims)
  now = Time.now.to_i

  raise TokenError, 'fleet token issuer mismatch' unless accepted_issuer?(claims[:iss])
  raise TokenError, 'fleet token audience mismatch' if claims[:aud].to_s != audience

  expiry = claims[:exp]
  raise TokenError, 'fleet token expired' if expiry.nil? || expiry.to_i + clock_skew_seconds <= now

  not_before = claims[:nbf]
  raise TokenError, 'fleet token not yet valid' if not_before.nil? || not_before.to_i - clock_skew_seconds > now

  raise TokenError, 'fleet token missing jti' if claims[:jti].to_s.empty?
end

.validate_request_expiry!(claims) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/legion/extensions/llm/fleet/token_validator.rb', line 61

# Checks the request-level expires_at claim, parsed with Time.iso8601.
#
# @param claims [Hash] token claims; :expires_at is read
# @return [nil]
# @raise [TokenError] when expires_at is blank, cannot be parsed, or the
#   deadline plus clock_skew_seconds is not later than the current time
def validate_request_expiry!(claims)
  raw_expiry = claims[:expires_at].to_s
  raise TokenError, 'fleet request expires_at is required' if raw_expiry.empty?

  deadline = Time.iso8601(raw_expiry) + clock_skew_seconds
  raise TokenError, 'fleet request expired' unless deadline > Time.now.utc
rescue ArgumentError
  # Any ArgumentError raised above, such as an iso8601 parse failure, lands here.
  raise TokenError, 'fleet request expires_at is invalid'
end