Module: Legion::LLM::Metering

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/metering.rb,
lib/legion/llm/metering/tokens.rb,
lib/legion/llm/metering/tracker.rb,
lib/legion/llm/metering/estimator.rb

Defined Under Namespace

Modules: Pricing, Recorder, Tokens

Constant Summary

# Default spool directory, expanded against $HOME once when this file loads.
# Frozen so the shared constant cannot be mutated in place (SPOOL_FILE below
# was already frozen; this makes the pair consistent).
SPOOL_DIR =
File.expand_path('~/.legionio/data/spool/metering').freeze
# Default spool file; spool_file_path returns a configured path instead when
# spool settings provide a non-blank :path.
SPOOL_FILE =
File.join(SPOOL_DIR, 'events.jsonl').freeze
# Guards reads and writes of the spool file. This is an in-process lock only;
# it does not coordinate separate processes sharing the same file.
SPOOL_MUTEX =
Mutex.new

Class Method Summary

Class Method Details

.attributed_event(event) ⇒ Object



47
48
49
50
51
52
# File 'lib/legion/llm/metering.rb', line 47

# Returns a copy of +event+ with publisher attribution filled in.
# Non-Hash input is replaced by an empty Hash. Truthy :identity and :caller
# values already present are kept; missing or falsy ones are taken from
# Legion::LLM::PublisherIdentity. The caller's Hash is never mutated.
def attributed_event(event)
  attributed = event.is_a?(Hash) ? event.dup : {}
  attributed[:identity] = Legion::LLM::PublisherIdentity.current unless attributed[:identity]
  attributed[:caller] = Legion::LLM::PublisherIdentity.caller_hash unless attributed[:caller]
  attributed
end

.const_missing(name) ⇒ Object

Backward compatibility: resolves the legacy constants Legion::LLM::Metering::Exchange and Legion::LLM::Metering::Event.



285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/legion/llm/metering.rb', line 285

# Backward-compatibility shim for the legacy constants
# Legion::LLM::Metering::Exchange and Legion::LLM::Metering::Event.
# On first reference it requires the relocated transport file and returns
# the class from its new location. Every other missing constant falls
# through to the default lookup via super.
def self.const_missing(name)
  if name == :Exchange
    require_relative 'transport/exchanges/metering'
    Transport::Exchanges::Metering
  elsif name == :Event
    require_relative 'transport/messages/metering_event'
    Transport::Messages::MeteringEvent
  else
    super
  end
end

.decrypt_spool_line(line) ⇒ Object



214
215
216
217
218
219
220
221
# File 'lib/legion/llm/metering.rb', line 214

# Best-effort reversal of spool encryption for a single line.
# The line is returned unchanged in three cases:
# * Legion::Crypt.decrypt is unavailable;
# * the line already looks like plain JSON (starts with '{');
# * decryption raises a StandardError.
# The caller decides how to handle a line that remains unreadable.
def decrypt_spool_line(line)
  crypto_available = defined?(Legion::Crypt) && Legion::Crypt.respond_to?(:decrypt)
  if crypto_available && !line.start_with?('{')
    Legion::Crypt.decrypt(line)
  else
    line
  end
rescue StandardError
  line
end

.emit(event) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/legion/llm/metering.rb', line 29

# Publishes a metering event, or spools it to disk when transport is not
# connected. The event is first enriched by attributed_event.
# Returns :published or :spooled. If any StandardError is raised, it is
# passed to handle_exception and :dropped is returned.
def emit(event)
  payload = attributed_event(event)
  publisher = transport_connected? ? metering_event_class : nil

  unless publisher
    spool_event(payload)
    log.info("[llm][metering] spooled provider=#{payload[:provider]} model=#{payload[:model_id]} reason=transport_unavailable")
    return :spooled
  end

  publisher.new(**payload).publish
  log.info("[llm][metering] published provider=#{payload[:provider]} model=#{payload[:model_id]}")
  :published
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.emit')
  :dropped
end

.encrypt_spool? ⇒ Boolean

Returns:

  • (Boolean)


206
207
208
209
210
211
212
# File 'lib/legion/llm/metering.rb', line 206

# Whether spooled events should be encrypted. Encryption requires all of:
# * Legion::Crypt is loaded;
# * Legion::Crypt responds to :encrypt;
# * the llm.compliance.encrypt_spool setting is exactly +true+.
# Returns nil when Legion::Crypt is not defined (falsy), and false if any
# StandardError is raised.
def encrypt_spool?
  return nil unless defined?(Legion::Crypt)
  return false unless Legion::Crypt.respond_to?(:encrypt)

  Legion::Settings.dig(:llm, :compliance, :encrypt_spool) == true
rescue StandardError
  false
end

.enforce_max_events ⇒ Object



249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/legion/llm/metering.rb', line 249

# Trims the spool file so that one more event can be appended without
# exceeding spool_settings[:max_events] (default 10_000). When the file
# already holds at least that many lines, only the newest (max - 1) lines are
# kept. This method does no locking itself; spool_event calls it inside
# SPOOL_MUTEX.synchronize.
def enforce_max_events
  path = spool_file_path
  return unless File.exist?(path)

  limit = spool_settings[:max_events] || 10_000
  existing = File.readlines(path, chomp: true)
  return unless existing.size >= limit

  # Lines are appended at the end, so the oldest events are at the top;
  # keep the tail.
  kept = existing.last(limit - 1)
  File.write(path, kept.sum('') { |entry| "#{entry}\n" })
  log.debug("[llm][metering] enforce_max_events trimmed=#{existing.size - kept.size} max=#{limit}")
end

.ensure_spool_dir ⇒ Object



263
264
265
# File 'lib/legion/llm/metering.rb', line 263

# Creates the spool directory, including any missing parent directories.
def ensure_spool_dir
  spool_dir_path.then { |dir| FileUtils.mkdir_p(dir) }
end

.extract_dispatch_path(response) ⇒ Object



336
337
338
339
340
# File 'lib/legion/llm/metering.rb', line 336

# Dispatch path for a response Hash. Lookup order:
# 1. :dispatch_path;
# 2. :tier;
# 3. the :tier inside :routing.
# Returns nil for non-Hash responses.
def extract_dispatch_path(response)
  if response.is_a?(Hash)
    response[:dispatch_path] ||
      response[:tier] ||
      response.dig(:routing, :tier)
  end
end

.extract_error_category(response) ⇒ Object



312
313
314
315
316
# File 'lib/legion/llm/metering.rb', line 312

# Error category for a response Hash. Lookup order:
# 1. :error_category;
# 2. error[:category], when :error is itself a Hash.
# A non-Hash :error (e.g. a plain message String) is ignored rather than
# passed to Hash#dig, which raises TypeError on a String. install_hook
# evaluates this outside any rescue, so that error would escape the hook.
# Returns nil for non-Hash responses.
def extract_error_category(response)
  return nil unless response.is_a?(Hash)

  error = response[:error]
  response[:error_category] || (error[:category] if error.is_a?(Hash))
end

.extract_error_code(response) ⇒ Object



318
319
320
321
322
# File 'lib/legion/llm/metering.rb', line 318

# Error code for a response Hash. Lookup order:
# 1. :error_code;
# 2. error[:code], when :error is itself a Hash.
# A non-Hash :error (e.g. a plain message String) is ignored rather than
# passed to Hash#dig, which raises TypeError on a String. install_hook
# evaluates this outside any rescue, so that error would escape the hook.
# Returns nil for non-Hash responses.
def extract_error_code(response)
  return nil unless response.is_a?(Hash)

  error = response[:error]
  response[:error_code] || (error[:code] if error.is_a?(Hash))
end

.extract_error_message(response) ⇒ Object



324
325
326
327
328
# File 'lib/legion/llm/metering.rb', line 324

# Error message for a response Hash. Lookup order:
# 1. :error_message;
# 2. error[:message], when :error is itself a Hash.
# A non-Hash :error (e.g. a plain String) is ignored rather than passed to
# Hash#dig, which raises TypeError on a String. install_hook evaluates this
# outside any rescue, so that error would escape the hook.
# Returns nil for non-Hash responses.
def extract_error_message(response)
  return nil unless response.is_a?(Hash)

  error = response[:error]
  response[:error_message] || (error[:message] if error.is_a?(Hash))
end

.extract_finish_reason(response) ⇒ Object

— Extractor helpers for after_chat hook —



300
301
302
303
304
# File 'lib/legion/llm/metering.rb', line 300

# Finish reason for a response Hash. Lookup order:
# 1. :finish_reason;
# 2. stop[:reason], when :stop is a Hash;
# 3. :finish_reason of the first :choices entry, when :choices is an Array
#    whose first element is a Hash.
# Non-Hash intermediates (e.g. `stop: "end_turn"`) are skipped rather than
# passed to Hash#dig, which raises TypeError on them. install_hook evaluates
# this outside any rescue. Returns nil for non-Hash responses.
def extract_finish_reason(response)
  return nil unless response.is_a?(Hash)

  stop = response[:stop]
  choices = response[:choices]
  first_choice = choices.first if choices.is_a?(Array)

  response[:finish_reason] ||
    (stop[:reason] if stop.is_a?(Hash)) ||
    (first_choice[:finish_reason] if first_choice.is_a?(Hash))
end

.extract_hash_value(hash, key) ⇒ Object



178
179
180
181
182
183
184
185
# File 'lib/legion/llm/metering.rb', line 178

# Reads +key+ from a Hash-like object, preferring the String form of the key
# over the given form (usually a Symbol). Returns nil when +hash+ does not
# respond to key?, or when neither form is present.
def extract_hash_value(hash, key)
  return nil unless hash.respond_to?(:key?)

  [key.to_s, key].each do |candidate|
    return hash[candidate] if hash.key?(candidate)
  end
  nil
end

.extract_latency_ms(response) ⇒ Object



348
349
350
351
352
# File 'lib/legion/llm/metering.rb', line 348

# Latency in whole milliseconds, taken from :latency_ms or, failing that,
# from the :latency_ms inside :timing. Missing values become 0.
# Returns nil for non-Hash responses.
def extract_latency_ms(response)
  return nil unless response.is_a?(Hash)

  raw = response[:latency_ms] || response.dig(:timing, :latency_ms)
  raw ? raw.to_i : 0
end

.extract_model(response) ⇒ Object



172
173
174
175
176
# File 'lib/legion/llm/metering.rb', line 172

# Model name for a response Hash. Lookup order:
# 1. :model inside :meta;
# 2. top-level :model.
# Both lookups accept String or Symbol keys (via extract_hash_value).
# Returns nil for non-Hash responses.
def extract_model(response)
  return nil unless response.is_a?(Hash)

  meta = extract_hash_value(response, :meta)
  extract_hash_value(meta, :model) || extract_hash_value(response, :model)
end

.extract_provider(response) ⇒ Object



166
167
168
169
170
# File 'lib/legion/llm/metering.rb', line 166

# Provider name for a response Hash. Lookup order:
# 1. :provider inside :meta;
# 2. top-level :provider.
# Both lookups accept String or Symbol keys (via extract_hash_value).
# Returns nil for non-Hash responses.
def extract_provider(response)
  return nil unless response.is_a?(Hash)

  meta = extract_hash_value(response, :meta)
  extract_hash_value(meta, :provider) || extract_hash_value(response, :provider)
end

.extract_provider_instance(response) ⇒ Object



330
331
332
333
334
# File 'lib/legion/llm/metering.rb', line 330

# Provider instance for a response Hash. Lookup order:
# 1. :provider_instance;
# 2. the :provider_instance inside :routing;
# 3. :instance.
# Returns nil for non-Hash responses.
def extract_provider_instance(response)
  if response.is_a?(Hash)
    response[:provider_instance] ||
      response.dig(:routing, :provider_instance) ||
      response[:instance]
  end
end

.extract_route_attempts(response) ⇒ Object



342
343
344
345
346
# File 'lib/legion/llm/metering.rb', line 342

# Number of routing attempts as an Integer; 0 when :route_attempts is absent.
# Returns nil for non-Hash responses.
def extract_route_attempts(response)
  return nil unless response.is_a?(Hash)

  attempts = response[:route_attempts]
  attempts ? attempts.to_i : 0
end

.extract_status(response) ⇒ Object



360
361
362
363
364
# File 'lib/legion/llm/metering.rb', line 360

# 'failure' when the response is a Hash with a truthy :error, else 'success'
# (including for non-Hash responses).
def extract_status(response)
  failed = response.is_a?(Hash) && response[:error]
  failed ? 'failure' : 'success'
end

.extract_tier(response) ⇒ Object



306
307
308
309
310
# File 'lib/legion/llm/metering.rb', line 306

# Routing tier for a response Hash: :tier, else the :tier inside :routing.
# Returns nil for non-Hash responses.
def extract_tier(response)
  if response.is_a?(Hash)
    response[:tier] || response.dig(:routing, :tier)
  end
end

.extract_usage(response) ⇒ Object



156
157
158
159
160
161
162
163
164
# File 'lib/legion/llm/metering.rb', line 156

# Token usage as { input_tokens:, output_tokens: }.
# Reads the :usage Hash (String or Symbol keys). Accepted names:
# * input_tokens, falling back to prompt_tokens;
# * output_tokens, falling back to completion_tokens.
# Missing counts are 0. Non-Hash responses yield zero counts.
def extract_usage(response)
  return { input_tokens: 0, output_tokens: 0 } unless response.is_a?(Hash)

  usage = extract_hash_value(response, :usage) || {}
  pick = lambda do |primary, fallback|
    extract_hash_value(usage, primary) || extract_hash_value(usage, fallback) || 0
  end

  {
    input_tokens:  pick.call(:input_tokens, :prompt_tokens),
    output_tokens: pick.call(:output_tokens, :completion_tokens)
  }
end

.extract_wall_clock_ms(response) ⇒ Object



354
355
356
357
358
# File 'lib/legion/llm/metering.rb', line 354

# Wall-clock time in whole milliseconds, taken from :wall_clock_ms or, failing
# that, from the :wall_clock_ms inside :timing. Missing values become 0.
# Returns nil for non-Hash responses.
def extract_wall_clock_ms(response)
  return nil unless response.is_a?(Hash)

  raw = response[:wall_clock_ms] || response.dig(:timing, :wall_clock_ms)
  raw ? raw.to_i : 0
end

.flush_spool ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/legion/llm/metering.rb', line 54

# Publishes every spooled event once transport is available and returns how
# many were published.
#
# The spool is drained under SPOOL_MUTEX: all lines are read and the file is
# rewritten in one critical section, so events appended concurrently by
# spool_event are never lost between the read and the truncate.
#
# Robustness:
# * A line that cannot be decrypted or parsed is written back to the spool
#   (and reported in a warning) instead of aborting the whole drain, so one
#   bad line cannot block every future flush. Parsed values that are not a
#   Hash cannot be published as keyword arguments and are discarded.
# * If publishing fails part-way, the raw lines of the events not yet
#   published are appended back to the spool before the error is reported.
#   They are therefore retried on the next flush instead of being dropped.
#
# Returns:
# * 0 when there is nothing to flush or transport is unavailable;
# * the number published before a publish failure, if one occurs;
# * 0 on any other StandardError.
def flush_spool
  return 0 unless File.exist?(spool_file_path)

  event_class = metering_event_class
  unless event_class && transport_connected?
    log.debug('[llm][metering] flush_spool skipped reason=transport_unavailable')
    return 0
  end

  # Each entry is [raw_line, event_hash]. The raw (possibly encrypted) line is
  # kept so unpublished events can be re-spooled exactly as they were stored.
  entries = SPOOL_MUTEX.synchronize do
    path = spool_file_path
    return 0 unless File.exist?(path)

    readable = []
    unreadable = []
    File.readlines(path, chomp: true).each do |line|
      next if line.strip.empty?

      begin
        event_data = Legion::JSON.load(decrypt_spool_line(line))
        readable << [line, event_data] if event_data.is_a?(Hash)
      rescue StandardError
        unreadable << line
      end
    end
    File.write(path, unreadable.map { |l| "#{l}\n" }.join)
    unless unreadable.empty?
      log.warn("[llm][metering] flush_spool unreadable=#{unreadable.size} kept_in_spool=true")
    end
    readable
  end

  return 0 if entries.empty?

  batch_sleep = spool_settings[:flush_batch_sleep] || 0.0
  flushed = 0

  begin
    entries.each_with_index do |(_raw, event_data), index|
      event_class.new(**event_data).publish
      flushed += 1
      sleep(batch_sleep) if batch_sleep.positive? && index < entries.size - 1
    end
  rescue StandardError => e
    # These events were already removed from the spool above; put the
    # unpublished tail back so a transport failure does not drop them.
    requeue = entries.drop(flushed).map(&:first)
    SPOOL_MUTEX.synchronize do
      File.open(spool_file_path, 'a') { |f| requeue.each { |l| f.puts(l) } }
    end
    handle_exception(e, level: :warn, operation: 'llm.metering.flush_spool')
    log.info("[llm][metering] flush_spool flushed=#{flushed} requeued=#{requeue.size}")
    return flushed
  end

  log.info("[llm][metering] flush_spool flushed=#{flushed}")
  flushed
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.flush_spool')
  0
end

.install_hook ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/legion/llm/metering.rb', line 98

# Registers an after_chat hook that meters chat responses.
#
# For each response the hook:
# 1. extracts token usage, and skips the response when both the input and
#    output counts are zero;
# 2. resolves the model (the response's value first, the hook's +model:+
#    argument as fallback, stringified) and the provider;
# 3. records usage via Metering::Recorder.record;
# 4. emits an 'llm_completion' event via emit.
# The hook block always evaluates to nil.
#
# NOTE(review): extract_usage returns only :input_tokens and :output_tokens,
# so thinking_tokens below is always 0. Confirm whether thinking-token usage
# should be extracted.
def install_hook
  Legion::LLM::Hooks.after_chat do |response:, model:, caller: nil, **|
    tokens       = extract_usage(response)
    input_count  = tokens[:input_tokens]
    output_count = tokens[:output_tokens]
    next if input_count.zero? && output_count.zero?

    model_name    = (extract_model(response) || model).to_s
    provider_name = extract_provider(response)

    Metering::Recorder.record(
      model:         model_name,
      input_tokens:  input_count,
      output_tokens: output_count,
      provider:      provider_name
    )

    response_ref = (response.provider_response_id if response.respond_to?(:provider_response_id))

    emit(
      provider:              provider_name,
      model_id:              model_name,
      request_type:          'chat',
      tier:                  extract_tier(response),
      input_tokens:          input_count,
      output_tokens:         output_count,
      thinking_tokens:       tokens[:thinking_tokens] || 0,
      total_tokens:          input_count + output_count,
      finish_reason:         extract_finish_reason(response),
      error_category:        extract_error_category(response),
      error_code:            extract_error_code(response),
      error_message:         extract_error_message(response),
      provider_instance:     extract_provider_instance(response),
      dispatch_path:         extract_dispatch_path(response),
      route_attempts:        extract_route_attempts(response),
      provider_response_ref: response_ref,
      latency_ms:            extract_latency_ms(response),
      wall_clock_ms:         extract_wall_clock_ms(response),
      caller:                caller,
      event_type:            'llm_completion',
      status:                extract_status(response)
    )
    nil
  end
end

.load_transport ⇒ Object



20
21
22
23
24
25
# File 'lib/legion/llm/metering.rb', line 20

# Requires the metering transport exchange and message definitions, but only
# when Legion::Transport::Message is already defined. Otherwise this is a
# no-op that returns nil.
def self.load_transport
  if defined?(Legion::Transport::Message)
    require_relative 'transport/exchanges/metering'
    require_relative 'transport/messages/metering_event'
  end
end

.metering_event_class ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/legion/llm/metering.rb', line 144

# Resolves the class used to publish metering events. Lookup order:
# 1. Legion::LLM::Transport::Messages::MeteringEvent, if already loaded;
# 2. the same constant after calling load_transport;
# 3. the legacy Legion::LLM::Metering::Event alias.
# NameError/LoadError are reported via handle_exception and yield nil.
def metering_event_class
  current = -> { Legion::LLM::Transport::Messages::MeteringEvent if defined?(Legion::LLM::Transport::Messages::MeteringEvent) }

  current.call || begin
    load_transport
    current.call
  end || Legion::LLM::Metering::Event
rescue NameError, LoadError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.metering.event_class')
  nil
end

.read_spool ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/legion/llm/metering.rb', line 223

# Returns the events currently in the spool as parsed values, without
# removing them from the file.
#
# Each line goes through decrypt_spool_line before parsing, matching
# flush_spool, so a spool written with encryption enabled can be read back.
# Blank lines are skipped. A line that still cannot be parsed is skipped on
# its own, so it does not hide the other events. Parsed nil/false values are
# dropped. Returns [] if the file is missing, or on any other StandardError
# (reported via handle_exception).
def read_spool
  SPOOL_MUTEX.synchronize do
    path = spool_file_path
    return [] unless File.exist?(path)

    File.readlines(path, chomp: true).filter_map do |line|
      next if line.strip.empty?

      begin
        Legion::JSON.load(decrypt_spool_line(line))
      rescue StandardError => e
        log.debug("[llm][metering] read_spool skipped unreadable line error=#{e.class}")
        nil
      end
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.read_spool')
  []
end

.spool_dir_path ⇒ Object



280
281
282
# File 'lib/legion/llm/metering.rb', line 280

# Directory that contains the resolved spool file.
def spool_dir_path
  spool_file_path.then { |file| File.dirname(file) }
end

.spool_event(event) ⇒ Object

— Spool internals (private) —



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/legion/llm/metering.rb', line 189

# Appends one event to the spool file as a single line.
# Under SPOOL_MUTEX it:
# 1. makes sure the spool directory exists;
# 2. trims old events via enforce_max_events;
# 3. serializes the event with Legion::JSON.dump;
# 4. encrypts the line when encrypt_spool? is truthy;
# 5. appends the line to the file.
# Any StandardError is reported via handle_exception and not re-raised.
def spool_event(event)
  SPOOL_MUTEX.synchronize do
    ensure_spool_dir
    enforce_max_events
    serialized = Legion::JSON.dump(event)
    serialized = Legion::Crypt.encrypt(serialized) if encrypt_spool?
    File.open(spool_file_path, 'a') { |file| file.puts(serialized) }
  end
  log.debug("[llm][metering] spool_event written provider=#{event[:provider]} model=#{event[:model_id]}")
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.spool_event')
end

.spool_file_path ⇒ Object

Resolves the spool file path at call time, honouring an operator-configured path (e.g. in containerised deployments where $HOME is not writable). Falls back to the SPOOL_FILE constant, which is computed once when the file is loaded.



275
276
277
278
# File 'lib/legion/llm/metering.rb', line 275

# Spool file path: the configured spool_settings[:path] (stringified) when it
# is present and not blank, otherwise the SPOOL_FILE default.
def spool_file_path
  override = spool_settings[:path]
  return SPOOL_FILE unless override

  text = override.to_s
  text.strip.empty? ? SPOOL_FILE : text
end

.spool_settings ⇒ Object



267
268
269
270
# File 'lib/legion/llm/metering.rb', line 267

# The llm.metering.spool settings Hash, or an empty Hash when the setting is
# missing or not a Hash.
def spool_settings
  Legion::Settings.dig(:llm, :metering, :spool).then do |value|
    value.is_a?(Hash) ? value : {}
  end
end

.transport_connected? ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
# File 'lib/legion/llm/metering.rb', line 140

# True only when the transport.connected setting is exactly +true+.
def transport_connected?
  connected_flag = Legion::Settings.dig(:transport, :connected)
  connected_flag == true
end

.truncate_spool ⇒ Object



240
241
242
243
244
245
246
247
# File 'lib/legion/llm/metering.rb', line 240

# Empties the spool file in place under SPOOL_MUTEX. It never creates the
# file when it is missing. Any StandardError is reported via
# handle_exception and not re-raised.
def truncate_spool
  SPOOL_MUTEX.synchronize do
    target = spool_file_path
    next unless File.exist?(target)

    File.write(target, '')
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.truncate_spool')
end