Module: Legion::Extensions::Llm::Fleet::WorkerExecution

Extended by:
Logging::Helper
Includes:
Logging::Helper
Defined in:
lib/legion/extensions/llm/fleet/worker_execution.rb

Overview

Applies responder-side policy and dispatches a fleet request to a local lex-llm provider.

Defined Under Namespace

Classes: PolicyError

Class Method Summary

Class Method Details

.auth_required? ⇒ Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 150

# Whether inbound fleet requests must present a signed token.
#
# Reads fleet.auth.require_signed_token (default true). Only a value equal to
# `false` turns the requirement off; nil or any other value keeps it on.
#
# @return [Boolean]
def auth_required?
  configured = Settings.value(:fleet, :auth, :require_signed_token, default: true)
  configured != false
end

.call(envelope:, provider:) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 24

# Validate an inbound fleet envelope and run it against the local provider.
#
# The steps run in order: identity (signed token), responder policy,
# idempotency-key reservation, then provider dispatch.
#
# On success:
# - the idempotency key is marked complete;
# - the token's jti, when present, is passed to TokenValidator.mark_replay!.
#
# On any failure:
# - the idempotency reservation is released;
# - the jti is passed to release_replay!.
#
# @param envelope [Hash] fleet request envelope (symbol or string keys)
# @param provider [Object] provider instance, or a callable that builds one from the envelope
# @return [Object] whatever the dispatched provider operation returns
# @raise [PolicyError] when a TokenError is raised anywhere in the pipeline (translated)
# @raise [StandardError] any other failure is re-raised unchanged after cleanup
def call(envelope:, provider:)
  # Pre-declared so the rescue clauses can tell how far the pipeline got.
  claims = nil
  idempotency_key = nil
  claims = validate_identity!(envelope)
  validate_policy!(envelope)
  idempotency_key = validate_idempotency!(envelope)
  response = dispatch_local_provider!(envelope: envelope, provider: provider)
  mark_idempotency_success!(idempotency_key) if idempotency_key
  # Same guard as release_replay!: never hand a nil jti to the replay tracker.
  TokenValidator.mark_replay!(claims[:jti]) if claims.is_a?(Hash) && claims[:jti]
  response
rescue TokenError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.worker_execution.identity')
  release_idempotency!(idempotency_key) if idempotency_key
  release_replay!(claims)
  raise PolicyError, e.message
rescue StandardError => e
  handle_exception(e, level: :warn, handled: false, operation: 'llm.fleet.worker_execution.call')
  release_idempotency!(idempotency_key) if idempotency_key
  release_replay!(claims)
  raise
end

.dispatch_local_provider!(envelope:, provider:) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 68

# Dispatch a validated fleet envelope to the local provider.
#
# Legacy `:options` entries inside params are merged in; keys already present
# in params take precedence.
#
# @param envelope [Hash] envelope carrying :operation, :params and :model (symbol or string keys)
# @param provider [Object] provider instance, or a callable (responds to #call but not #chat)
#   that is invoked with the envelope to obtain the provider
# @return [Object] the provider's result
# @raise [PolicyError] when the operation is missing or is not chat/stream/embed/count_tokens
def dispatch_local_provider!(envelope:, provider:)
  provider = provider.call(envelope) if provider.respond_to?(:call) && !provider.respond_to?(:chat)
  # Compare as a String: a missing operation (nil has no #to_sym) now reaches the
  # PolicyError branch, and remote-supplied text is never interned as a Symbol.
  operation = envelope_value(envelope, :operation).to_s
  params = unpack_legacy_options(normalize_hash(envelope_value(envelope, :params) || {}))
  model = envelope_value(envelope, :model)

  case operation
  when 'chat'
    provider.chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'stream'
    provider.stream_chat(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  when 'embed'
    provider.embed(text: params[:text], model: model, **except(params, :text))
  when 'count_tokens'
    provider.count_tokens(messages: params.fetch(:messages, []), model: model, **except(params, :messages))
  else
    raise PolicyError, "unsupported fleet operation: #{operation}"
  end
end

.envelope_value(envelope, key) ⇒ Object



154
155
156
157
158
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 154

# Read a field from an envelope that may use symbol or string keys.
#
# The symbol key is tried first; the string form is consulted only when the
# symbol lookup yields nil.
#
# @param envelope [Object] anything responding to #key? and #[] (typically a Hash)
# @param key [Symbol] field name
# @return [Object, nil] the value, or nil when absent or the envelope is not hash-like
def envelope_value(envelope, key)
  return nil unless envelope.respond_to?(:key?)

  value = envelope[key]
  # Fall back only on nil: `||` would also discard a legitimate `false`.
  value.nil? ? envelope[key.to_s] : value
end

.except(hash, *keys) ⇒ Object



168
169
170
171
172
173
174
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 168

# Copy +hash+ with its keys symbolized, leaving out any of +keys+.
#
# Keys that cannot be symbolized (e.g. Integers) are kept unchanged.
#
# @param hash [Hash] source hash (not mutated)
# @param keys [Array<Symbol, String>] keys to drop, compared after symbolization
# @return [Hash] a new hash without the excluded keys
def except(hash, *keys)
  dropped = keys.map(&:to_sym)
  hash.each_with_object({}) do |(raw_key, value), kept|
    key = raw_key.respond_to?(:to_sym) ? raw_key.to_sym : raw_key
    next if dropped.include?(key)

    kept[key] = value
  end
end

.idempotency_ttl_seconds ⇒ Object



137
138
139
140
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 137

# Lifetime, in seconds, of an idempotency-cache entry.
#
# Reads the responder setting :idempotency_ttl_seconds and coerces it with #to_i.
# Falls back to 600 when the setting is unset or is not a positive integer.
#
# @return [Integer]
def idempotency_ttl_seconds
  configured = responder_setting(:idempotency_ttl_seconds, default: 600).to_i
  return 600 unless configured.positive?

  configured
end

.mark_idempotency_success!(key) ⇒ Object



102
103
104
105
106
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 102

# Record that the request owning +key+ completed successfully.
#
# Overwrites the cache entry (stored under key.to_s) with a :complete state
# whose expiry is now plus idempotency_ttl_seconds.
#
# @param key [#to_s] idempotency key
# @return [Hash] the stored entry
def mark_idempotency_success!(key)
  @idempotency_mutex.synchronize do
    completed = { state: :complete, expires_at: Time.now.to_i + idempotency_ttl_seconds }
    @idempotency_keys[key.to_s] = completed
  end
end

.normalize_hash(hash) ⇒ Object



160
161
162
163
164
165
166
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 160

# Shallow-copy +hash+ with its keys converted to Symbols where possible.
#
# Returns an empty Hash for inputs that do not respond to #each (e.g. nil).
#
# @param hash [Hash, Object]
# @return [Hash]
def normalize_hash(hash)
  return {} unless hash.respond_to?(:each)

  hash.each_with_object({}) do |(raw_key, value), symbolized|
    symbol_key = raw_key.respond_to?(:to_sym) ? raw_key.to_sym : raw_key
    symbolized[symbol_key] = value
  end
end

.purge_idempotency_cache! ⇒ Object



118
119
120
121
122
123
124
125
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 118

# Drop every idempotency-cache entry whose expiry is at or before now.
#
# Expired keys are gathered first and deleted afterwards, so the map is not
# modified while it is being walked.
#
# @return [Object] the idempotency cache itself
def purge_idempotency_cache!
  @idempotency_mutex.synchronize do
    cutoff = Time.now.to_i
    stale_keys = []
    @idempotency_keys.each_pair { |key, entry| stale_keys << key if entry[:expires_at] <= cutoff }
    stale_keys.each { |key| @idempotency_keys.delete(key) }
    @idempotency_keys
  end
end

.release_idempotency!(key) ⇒ Object



108
109
110
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 108

# Remove the cache entry for +key+ (looked up as key.to_s) so it can be reused.
#
# @param key [#to_s] idempotency key
# @return [Hash, nil] the removed entry, or nil when none existed
def release_idempotency!(key)
  @idempotency_mutex.synchronize do
    @idempotency_keys.delete(key.to_s)
  end
end

.release_replay!(claims) ⇒ Object



112
113
114
115
116
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 112

# Hand the token's jti back to TokenValidator.release_replay!.
#
# This is a no-op when +claims+ is not a Hash or carries no truthy :jti.
#
# @param claims [Hash, Object, nil] result of identity validation
# @return [Object, nil] TokenValidator's result, or nil when nothing was released
def release_replay!(claims)
  jti = claims[:jti] if claims.is_a?(Hash)
  TokenValidator.release_replay!(jti) if jti
end

.reserve_idempotency_key!(key) ⇒ Object



127
128
129
130
131
132
133
134
135
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 127

# Reserve +key+ as in-flight, rejecting it if an unexpired entry already exists.
#
# @param key [#to_s] idempotency key; normalized with #to_s
# @return [Hash] the new in-flight entry
# @raise [PolicyError] when the key is already reserved or completed and not yet expired
def reserve_idempotency_key!(key)
  # Normalize like mark_idempotency_success!/release_idempotency! so all three
  # address the same cache entry regardless of the key's original type.
  key = key.to_s
  @idempotency_mutex.synchronize do
    now = Time.now.to_i
    existing = @idempotency_keys[key]
    # Any unexpired entry (in-flight or completed) blocks reuse of the key.
    raise PolicyError, 'duplicate fleet idempotency key' if existing && existing[:expires_at] > now

    @idempotency_keys[key] = { state: :inflight, expires_at: now + idempotency_ttl_seconds }
  end
end

.reset_idempotency_cache! ⇒ Object



97
98
99
100
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 97

# Replace the idempotency cache with an empty Concurrent::Map and a new Mutex.
#
# @return [Mutex] the freshly created mutex
def reset_idempotency_cache!
  empty_cache = Concurrent::Map.new
  @idempotency_keys = empty_cache
  @idempotency_mutex = Mutex.new
end

.responder_setting(key, default:) ⇒ Object



142
143
144
145
146
147
148
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 142

# Look up fleet.responder.<key>, falling back when it is unset (nil).
#
# For :require_auth the fallback is auth_required? rather than +default+.
#
# @param key [Symbol] responder setting name
# @param default [Object] value used when the setting is nil
# @return [Object]
def responder_setting(key, default:)
  configured = Settings.value(:fleet, :responder, key, default: nil)
  return configured unless configured.nil?

  key == :require_auth ? auth_required? : default
end

.unpack_legacy_options(params) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 89

# Fold a legacy nested :options Hash into the top-level params.
#
# The :options key is always removed. When its value is a Hash, each of its
# (symbolized) entries is copied only if params does not already have that key.
# This method mutates and returns +params+.
#
# @param params [Hash] symbol-keyed params
# @return [Hash] the same params object
def unpack_legacy_options(params)
  legacy = params.delete(:options)
  return params unless legacy.is_a?(Hash)

  normalize_hash(legacy).each do |name, value|
    next if params.key?(name)

    params[name] = value
  end
  params
end

.validate_idempotency!(envelope) ⇒ Object

Raises:

  • (PolicyError) — when the idempotency key is missing or is already reserved



58
59
60
61
62
63
64
65
66
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 58

# Enforce and reserve the envelope's idempotency key, when required.
#
# @param envelope [Hash] fleet request envelope
# @return [String, nil] the reserved key, or nil when idempotency is not required
# @raise [PolicyError] when the key is missing or empty, or when reserving it fails
def validate_idempotency!(envelope)
  return nil unless responder_setting(:require_idempotency, default: true)

  key = envelope_value(envelope, :idempotency_key).to_s
  raise PolicyError, 'fleet idempotency_key is required' if key.empty?

  reserve_idempotency_key!(key)
  key
end

.validate_identity!(envelope) ⇒ Object



46
47
48
49
50
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 46

# Validate the envelope's signed token, when authentication is required.
#
# @param envelope [Hash] fleet request envelope
# @return [Object] TokenValidator.validate!'s result, or true when auth is not required
def validate_identity!(envelope)
  if responder_setting(:require_auth, default: true)
    signed_token = envelope_value(envelope, :signed_token)
    TokenValidator.validate!(token: signed_token, envelope: envelope)
  else
    true
  end
end

.validate_policy!(_envelope) ⇒ Object

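Raises:

  • (PolicyError) — when the responder setting require_policy is enabled, because policy enforcement is not available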



52
53
54
55
56
# File 'lib/legion/extensions/llm/fleet/worker_execution.rb', line 52

# Guard for responder-side policy enforcement, which is not implemented here.
#
# @param _envelope [Hash] unused
# @return [true] when the responder setting :require_policy is falsy (default false)
# @raise [PolicyError] when :require_policy is enabled
def validate_policy!(_envelope)
  policy_required = responder_setting(:require_policy, default: false)
  raise PolicyError, 'fleet responder policy enforcement unavailable' if policy_required

  true
end