Module: Legion::Extensions::Llm::Fleet::WorkerExecution

Defined in:
lib/legion/extensions/llm/fleet/worker_execution.rb

Overview

Applies responder-side policy and dispatches a fleet request to a local lex-llm provider.

Defined Under Namespace

Classes: PolicyError

Class Method Summary

Class Method Details

.auth_required? ⇒ Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 136

# Reports whether incoming fleet requests must carry a signed token.
#
# Reads the :fleet / :auth / :require_signed_token setting, passing
# default: true to Settings.value. Only an explicit `false` disables the
# requirement; every other value (including true, nil-replaced-by-default,
# or any truthy object) keeps it on.
#
# @return [Boolean]
def auth_required?
  flag = Settings.value(:fleet, :auth, :require_signed_token, default: true)
  return false if flag == false

  true
end

.call(envelope:, provider:) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 21

# Validates an incoming fleet envelope and, when every responder-side check
# passes, dispatches it to the local provider.
#
# Order of operations:
#   1. identity (signed token) validation;
#   2. responder policy validation;
#   3. idempotency-key reservation;
#   4. local provider dispatch;
#   5. success bookkeeping: the idempotency key is marked complete and the
#      token jti is marked as replayed.
#
# If anything fails before the provider has returned, the idempotency
# reservation and the token replay reservation are released so the request
# can be retried. Once the provider has produced a response, those
# reservations are kept even if the bookkeeping in step 5 raises. The request
# has already executed, and releasing them would let the same key/token run
# it a second time.
#
# @param envelope [Hash] fleet request envelope (symbol or string keys)
# @param provider [Object] a provider, or a callable that builds one from the envelope
# @return [Object] whatever the dispatched provider operation returned
# @raise [PolicyError] from the validation steps, or wrapping any TokenError (same message)
# @raise [StandardError] any other error is re-raised unchanged
def call(envelope:, provider:)
  # Pre-declared so the rescue clauses can inspect how far the request got.
  claims = nil
  idempotency_key = nil
  dispatched = false
  claims = validate_identity!(envelope)
  validate_policy!(envelope)
  idempotency_key = validate_idempotency!(envelope)
  response = dispatch_local_provider!(envelope: envelope, provider: provider)
  # From here on the request has executed; never roll its reservations back.
  dispatched = true
  mark_idempotency_success!(idempotency_key) if idempotency_key
  TokenValidator.mark_replay!(claims[:jti]) if claims.is_a?(Hash)
  response
rescue TokenError => e
  unless dispatched
    release_idempotency!(idempotency_key) if idempotency_key
    release_replay!(claims)
  end
  raise PolicyError, e.message
rescue StandardError
  unless dispatched
    release_idempotency!(idempotency_key) if idempotency_key
    release_replay!(claims)
  end
  raise
end

.dispatch_local_provider!(envelope:, provider:) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 63

# Routes a validated fleet envelope to the matching method on a local provider.
#
# If +provider+ is callable but does not itself respond to #chat, it is
# treated as a factory and invoked with the envelope to obtain the provider.
# The envelope's :operation selects the provider method:
#   chat         -> provider.chat(messages:, model:, **rest)
#   stream       -> provider.stream_chat(messages:, model:, **rest)
#   embed        -> provider.embed(text:, model:, **rest)
#   count_tokens -> provider.count_tokens(messages:, model:, **rest)
# The remaining params are forwarded as keyword arguments with symbolized keys.
#
# The operation is compared in string form. That accepts both Symbol and
# String values, and it means a missing or non-symbolizable operation reaches
# the PolicyError branch instead of failing on nil#to_sym.
#
# NOTE(review): a :model entry inside params is splatted after `model:` and
# may override the envelope model; confirm that is intended.
#
# @param envelope [Hash] fleet request envelope (symbol or string keys)
# @param provider [Object] a provider, or a callable returning one
# @return [Object] whatever the provider method returned
# @raise [PolicyError] when the operation is missing or unsupported
def dispatch_local_provider!(envelope:, provider:)
  provider = provider.call(envelope) if provider.respond_to?(:call) && !provider.respond_to?(:chat)
  operation = envelope_value(envelope, :operation).to_s
  params = normalize_hash(envelope_value(envelope, :params) || {})
  model = envelope_value(envelope, :model)

  case operation
  when 'chat'
    provider.chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'stream'
    provider.stream_chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'embed'
    provider.embed(text: params[:text], model: model, **except(params, :text))
  when 'count_tokens'
    provider.count_tokens(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  else
    raise PolicyError, "unsupported fleet operation: #{operation}"
  end
end

.envelope_value(envelope, key) ⇒ Object



140
141
142
143
144
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 140

# Reads +key+ from a fleet envelope, accepting either a Symbol or a String key.
#
# The Symbol lookup is tried first. When it yields nil or false, the String
# form of the key is consulted instead.
#
# @param envelope [Object] the envelope; anything not responding to #key? yields nil
# @param key [Symbol] the field name to look up
# @return [Object, nil] the stored value, or nil when absent
def envelope_value(envelope, key)
  if envelope.respond_to?(:key?)
    symbol_hit = envelope[key]
    symbol_hit || envelope[key.to_s]
  end
end

.except(hash, *keys) ⇒ Object



154
155
156
157
158
159
160
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 154

# Builds a copy of +hash+ with Symbol-normalized keys and the given keys removed.
#
# Keys responding to #to_sym (Symbols, Strings) are symbolized before the
# exclusion check, so except({ 'a' => 1 }, :a) drops that entry. Other keys
# are kept unchanged. If two keys normalize to the same Symbol, the later
# value wins.
#
# @param hash [Hash] source hash (not modified)
# @param keys [Array<#to_sym>] keys to drop
# @return [Hash] new hash without the excluded keys
def except(hash, *keys)
  dropped = keys.map(&:to_sym)
  normalized_pairs = hash.map do |key, value|
    [key.respond_to?(:to_sym) ? key.to_sym : key, value]
  end
  normalized_pairs.reject { |key, _value| dropped.include?(key) }.to_h
end

.idempotency_ttl_seconds ⇒ Object



123
124
125
126
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 123

# Lifetime, in seconds, of an idempotency cache entry.
#
# Reads the responder setting :idempotency_ttl_seconds (default 600) and
# coerces it with #to_i. Any non-positive result, such as from 0, a negative
# number, or a non-numeric string, falls back to 600.
#
# @return [Integer] a positive TTL in seconds
def idempotency_ttl_seconds
  fallback = 600
  seconds = responder_setting(:idempotency_ttl_seconds, default: fallback).to_i
  return fallback unless seconds.positive?

  seconds
end

.mark_idempotency_success!(key) ⇒ Object



88
89
90
91
92
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 88

# Records that the request for +key+ completed successfully.
#
# Under the idempotency mutex, the entry for key.to_s is replaced with a
# :complete record expiring idempotency_ttl_seconds from now.
#
# @param key [#to_s] the idempotency key
# @return [Hash] the stored entry
def mark_idempotency_success!(key)
  @idempotency_mutex.synchronize do
    completed_entry = { state: :complete, expires_at: Time.now.to_i + idempotency_ttl_seconds }
    @idempotency_keys[key.to_s] = completed_entry
  end
end

.normalize_hash(hash) ⇒ Object



146
147
148
149
150
151
152
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 146

# Returns a new Hash whose Symbol/String keys are converted to Symbols.
#
# Anything that does not respond to #each yields an empty Hash. Keys that do
# not respond to #to_sym are kept as-is. If two keys normalize to the same
# Symbol, the later value wins.
#
# @param hash [Object] a Hash (or enumerable of pairs), or anything else
# @return [Hash]
def normalize_hash(hash)
  if hash.respond_to?(:each)
    hash.map { |key, value| [key.respond_to?(:to_sym) ? key.to_sym : key, value] }.to_h
  else
    {}
  end
end

.purge_idempotency_cache! ⇒ Object



104
105
106
107
108
109
110
111
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 104

# Drops every idempotency cache entry whose expires_at is at or before now.
#
# Runs under the idempotency mutex. Entries that are still live (expires_at
# in the future) are left untouched.
#
# @return [Object] the return value of each_pair on the cache
def purge_idempotency_cache!
  @idempotency_mutex.synchronize do
    cutoff = Time.now.to_i
    @idempotency_keys.each_pair do |cache_key, record|
      next if record[:expires_at] > cutoff

      @idempotency_keys.delete(cache_key)
    end
  end
end

.release_idempotency!(key) ⇒ Object



94
95
96
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 94

# Removes the idempotency cache entry for key.to_s so the key can be reused.
#
# @param key [#to_s] the idempotency key
# @return [Object, nil] the removed entry, or nil if there was none
def release_idempotency!(key)
  cache_key = key.to_s
  @idempotency_mutex.synchronize do
    @idempotency_keys.delete(cache_key)
  end
end

.release_replay!(claims) ⇒ Object



98
99
100
101
102
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 98

# Releases the replay reservation for the token's jti, if there is one.
#
# Does nothing (returns nil) unless +claims+ is a Hash with a truthy :jti.
#
# @param claims [Hash, Object, nil] token claims from validate_identity!
# @return [Object, nil] TokenValidator.release_replay!'s result, or nil
def release_replay!(claims)
  jti = claims[:jti] if claims.is_a?(Hash)
  TokenValidator.release_replay!(jti) if jti
end

.reserve_idempotency_key!(key) ⇒ Object



113
114
115
116
117
118
119
120
121
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 113

# Reserves an idempotency key for an in-flight request, under the idempotency mutex.
#
# The key is stored as key.to_s, the same form mark_idempotency_success! and
# release_idempotency! use, so a Symbol and its String spelling refer to one
# cache entry. An entry whose expires_at is still in the future blocks the
# reservation. An expired entry is overwritten.
#
# @param key [#to_s] the idempotency key from the envelope
# @return [Hash] the new :inflight entry
# @raise [PolicyError] when an unexpired entry already exists for the key
def reserve_idempotency_key!(key)
  cache_key = key.to_s
  @idempotency_mutex.synchronize do
    now = Time.now.to_i
    existing = @idempotency_keys[cache_key]
    raise PolicyError, 'duplicate fleet idempotency key' if existing && existing[:expires_at] > now

    @idempotency_keys[cache_key] = { state: :inflight, expires_at: now + idempotency_ttl_seconds }
  end
end

.reset_idempotency_cache! ⇒ Object



83
84
85
86
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 83

# Replaces the idempotency cache with a fresh, empty Concurrent::Map and
# installs a new Mutex to guard it.
#
# @return [Mutex] the newly created mutex
def reset_idempotency_cache!
  fresh_cache = Concurrent::Map.new
  @idempotency_keys = fresh_cache
  @idempotency_mutex = Mutex.new
end

.responder_setting(key, default:) ⇒ Object



128
129
130
131
132
133
134
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 128

# Looks up a :fleet / :responder setting.
#
# A configured value (including false) is returned as-is. When the setting is
# nil, :require_auth falls back to auth_required?, and any other key falls
# back to +default+.
#
# @param key [Symbol] setting name under :fleet / :responder
# @param default [Object] value used when the setting is nil
# @return [Object]
def responder_setting(key, default:)
  configured = Settings.value(:fleet, :responder, key, default: nil)
  return configured unless configured.nil?

  key == :require_auth ? auth_required? : default
end

.validate_idempotency!(envelope) ⇒ Object

Raises:

  • (PolicyError)



53
54
55
56
57
58
59
60
61
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 53

# Enforces and reserves the envelope's idempotency key.
#
# Skipped (returns nil) when the :require_idempotency responder setting is
# falsy (default true). Otherwise the key is stringified, rejected if empty,
# and reserved via reserve_idempotency_key!.
#
# @param envelope [Hash] fleet request envelope
# @return [String, nil] the reserved key, or nil when idempotency is not required
# @raise [PolicyError] when the key is missing/blank, or already reserved
def validate_idempotency!(envelope)
  return unless responder_setting(:require_idempotency, default: true)

  normalized = envelope_value(envelope, :idempotency_key).to_s
  raise PolicyError, 'fleet idempotency_key is required' if normalized.empty?

  reserve_idempotency_key!(normalized)
  normalized
end

.validate_identity!(envelope) ⇒ Object



41
42
43
44
45
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 41

# Validates the envelope's signed token when responder auth is required.
#
# When the :require_auth responder setting is falsy, validation is skipped
# and true is returned. Otherwise the :signed_token field is handed, together
# with the full envelope, to TokenValidator.validate!, and its result is returned.
#
# @param envelope [Hash] fleet request envelope
# @return [Object] TokenValidator.validate!'s result, or true when auth is off
def validate_identity!(envelope)
  if responder_setting(:require_auth, default: true)
    token = envelope_value(envelope, :signed_token)
    TokenValidator.validate!(token: token, envelope: envelope)
  else
    true
  end
end

.validate_policy!(_envelope) ⇒ Object

Raises:

  • (PolicyError)



47
48
49
50
51
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 47

# Responder-side policy gate.
#
# No policy engine is wired in, so when the :require_policy responder setting
# is truthy (default false) every request is refused. Otherwise the check
# passes.
#
# @param _envelope [Hash] unused; kept for interface symmetry
# @return [true] when policy enforcement is not required
# @raise [PolicyError] when policy enforcement is required
def validate_policy!(_envelope)
  raise PolicyError, 'fleet responder policy enforcement unavailable' if responder_setting(:require_policy, default: false)

  true
end