Module: Legion::LLM::Metering

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/metering.rb,
lib/legion/llm/metering/tokens.rb,
lib/legion/llm/metering/tracker.rb,
lib/legion/llm/metering/estimator.rb

Defined Under Namespace

Modules: Pricing, Recorder, Tokens

Constant Summary collapse

# Fallback for the metering.spool.max_events setting: the spooled-event count
# at which spool_event stops writing and reports :dropped.
DEFAULT_SPOOL_MAX = 10_000
# Fallback for the metering.spool.flush_batch_sleep setting: the value
# flush_spool hands to sleep after each replayed event (only when positive).
DEFAULT_SPOOL_FLUSH_BATCH_SLEEP = 0.0

Class Method Summary collapse

Class Method Details

.const_missing(name) ⇒ Object

Backward compatibility: resolves the legacy constants Legion::LLM::Metering::Exchange and Legion::LLM::Metering::Event.



164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/legion/llm/metering.rb', line 164

# Backward-compatibility shim for the legacy constant names Exchange and
# Event: the matching transport file is required on first reference and the
# replacement constant is returned. Every other name is passed to super.
#
# @param name [Symbol] name of the constant that could not be resolved
# @return [Object] Transport::Exchanges::Metering for :Exchange,
#   Transport::Messages::MeteringEvent for :Event
def self.const_missing(name)
  if name == :Exchange
    require_relative 'transport/exchanges/metering'
    return Transport::Exchanges::Metering
  end

  if name == :Event
    require_relative 'transport/messages/metering_event'
    return Transport::Messages::MeteringEvent
  end

  super
end

.emit(event) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/legion/llm/metering.rb', line 26

# Delivers one metering event, trying each route in order:
# publish when the transport is connected and an event class resolves,
# otherwise spool when a spool is available, otherwise drop.
#
# @param event [Hash] event attributes; splatted as keywords into the event
#   class, and :provider / :model_id are read for the log line
# @return [Symbol] :published, the result of spool_event (:spooled or
#   :dropped), or :dropped; any StandardError is reported through
#   handle_exception and also yields :dropped
def emit(event)
  publisher = transport_connected? ? metering_event_class : nil

  if publisher
    publisher.new(**event).publish
    log.info("[llm][metering] published provider=#{event[:provider]} model=#{event[:model_id]}")
    return :published
  end

  unless spool_available?
    log.warn("[llm][metering] dropped provider=#{event[:provider]} model=#{event[:model_id]}")
    return :dropped
  end

  outcome = spool_event(event)
  if outcome == :spooled
    log.info("[llm][metering] spooled provider=#{event[:provider]} model=#{event[:model_id]}")
  else
    # spool_event refused the write, so the event is lost.
    log.warn("[llm][metering] dropped provider=#{event[:provider]} model=#{event[:model_id]} reason=spool_cap")
  end
  outcome
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.emit')
  :dropped
end

.extract_model(response) ⇒ Object



148
149
150
151
152
# File 'lib/legion/llm/metering.rb', line 148

# Reads the model name out of a response hash, preferring the value nested
# under :meta over the top-level :model entry.
#
# @param response [Object] response to inspect; anything but a Hash yields nil
# @return [Object, nil] the model value, or nil when none is present
def extract_model(response)
  return unless response.is_a?(Hash)

  nested = settings_value(settings_value(response, :meta), :model)
  nested || settings_value(response, :model)
end

.extract_provider(response) ⇒ Object



142
143
144
145
146
# File 'lib/legion/llm/metering.rb', line 142

# Reads the provider name out of a response hash, preferring the value nested
# under :meta over the top-level :provider entry.
#
# @param response [Object] response to inspect; anything but a Hash yields nil
# @return [Object, nil] the provider value, or nil when none is present
def extract_provider(response)
  return unless response.is_a?(Hash)

  nested = settings_value(settings_value(response, :meta), :provider)
  nested || settings_value(response, :provider)
end

.extract_usage(response) ⇒ Object



132
133
134
135
136
137
138
139
140
# File 'lib/legion/llm/metering.rb', line 132

# Builds a token-count hash from the response's :usage entry. Either naming
# scheme is accepted: input_tokens/output_tokens first, then
# prompt_tokens/completion_tokens; a missing count becomes 0.
#
# @param response [Object] response to inspect; anything but a Hash yields
#   zero counts
# @return [Hash] hash with :input_tokens and :output_tokens
def extract_usage(response)
  return { input_tokens: 0, output_tokens: 0 } unless response.is_a?(Hash)

  usage = settings_value(response, :usage) || {}
  consumed = settings_value(usage, :input_tokens) || settings_value(usage, :prompt_tokens) || 0
  produced = settings_value(usage, :output_tokens) || settings_value(usage, :completion_tokens) || 0

  { input_tokens: consumed, output_tokens: produced }
end

.flush_spool ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/legion/llm/metering.rb', line 50

# Replays spooled metering events through emit. Nothing is attempted unless a
# spool is available and the transport is connected.
#
# @return [Object] whatever spool.flush reports as the flushed count, or 0
#   when the preconditions fail or a StandardError is raised (the error is
#   reported through handle_exception)
def flush_spool
  return 0 unless spool_available?
  return 0 unless transport_connected?

  queue = Legion::Data::Spool.for(Legion::LLM)
  pause = spool_flush_batch_sleep
  count = queue.flush(:metering) do |event|
    emit(event)
    # Optional throttle between replayed events; skipped when not positive.
    sleep(pause) if pause.positive?
  end
  log.info("[llm][metering] spool_flushed count=#{count}")
  count
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.flush_spool')
  0
end

.install_hook ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/legion/llm/metering.rb', line 66

# Registers an after_chat hook that meters every chat response carrying token
# usage: the counts are handed to Metering::Recorder and an 'llm_completion'
# event is emitted. Responses with zero input and zero output tokens are
# skipped.
#
# @return [Object] whatever Legion::LLM::Hooks.after_chat returns
def install_hook
  Legion::LLM::Hooks.after_chat do |response:, model:, **|
    usage = extract_usage(response)
    next if usage[:input_tokens].zero? && usage[:output_tokens].zero?

    # The model reported by the response wins over the hook's model argument.
    resolved_model    = (extract_model(response) || model).to_s
    resolved_provider = extract_provider(response)

    Metering::Recorder.record(
      model:         resolved_model,
      input_tokens:  usage[:input_tokens],
      output_tokens: usage[:output_tokens],
      provider:      resolved_provider
    )

    # Read the error through settings_value so a string-keyed 'error' entry is
    # detected too, the same way usage, model and provider are read.
    failed = response.is_a?(Hash) && settings_value(response, :error)

    emit(
      provider:      resolved_provider,
      model_id:      resolved_model,
      input_tokens:  usage[:input_tokens],
      output_tokens: usage[:output_tokens],
      event_type:    'llm_completion',
      status:        failed ? 'failure' : 'success'
    )
    nil
  end
end

.load_transport ⇒ Object



17
18
19
20
21
22
# File 'lib/legion/llm/metering.rb', line 17

# Requires the metering exchange and metering event message files, but only
# when Legion::Transport::Message is already defined.
#
# @return [Boolean, nil] the result of the last require_relative, or nil when
#   Legion::Transport::Message is not defined
def self.load_transport
  if defined?(Legion::Transport::Message)
    require_relative 'transport/exchanges/metering'
    require_relative 'transport/messages/metering_event'
  end
end

.metering_event_class ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/legion/llm/metering.rb', line 97

# Resolves the class used to publish metering events. The transport files are
# loaded via load_transport only when the event class is not defined yet; if
# it is still undefined afterwards, the legacy Legion::LLM::Metering::Event
# constant is referenced instead.
#
# @return [Object, nil] the event class, or nil when a NameError or LoadError
#   is raised (the error is reported through handle_exception)
def metering_event_class
  load_transport unless defined?(Legion::LLM::Transport::Messages::MeteringEvent)

  if defined?(Legion::LLM::Transport::Messages::MeteringEvent)
    Legion::LLM::Transport::Messages::MeteringEvent
  else
    Legion::LLM::Metering::Event
  end
rescue NameError, LoadError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.metering.event_class')
  nil
end

.settings_value(hash, key) ⇒ Object



154
155
156
157
158
159
160
161
# File 'lib/legion/llm/metering.rb', line 154

# Looks a key up in a hash-like object, trying the key's string form first
# and the key itself second.
#
# @param hash [Object] object to read from; anything that does not respond to
#   key? yields nil
# @param key [Object] key to look up (its to_s form is tried first)
# @return [Object, nil] the stored value, or nil when neither form is present
def settings_value(hash, key)
  return unless hash.respond_to?(:key?)

  [key.to_s, key].each do |candidate|
    return hash[candidate] if hash.key?(candidate)
  end
  nil
end

.spool_available? ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/legion/llm/metering.rb', line 109

# Reports whether the Legion::Data::Spool constant is defined.
#
# @return [Boolean] true when the constant is defined, false otherwise
def spool_available?
  defined?(Legion::Data::Spool) ? true : false
end

.spool_event(event) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/legion/llm/metering.rb', line 113

# Writes one event to the :metering spool unless the spool already holds
# spool_max_events or more. The cap is only enforced when the spool responds
# to count.
#
# @param event [Object] event payload passed to spool.write
# @return [Symbol] :spooled after a write, :dropped when the cap is reached
def spool_event(event)
  spool = Legion::Data::Spool.for(Legion::LLM)
  at_capacity = spool.respond_to?(:count) && spool.count(:metering).to_i >= spool_max_events

  unless at_capacity
    spool.write(:metering, event)
    return :spooled
  end

  log.warn("[llm][metering] spool_full count=#{spool.count(:metering)} max=#{spool_max_events}")
  :dropped
end

.spool_flush_batch_sleep ⇒ Object



128
129
130
# File 'lib/legion/llm/metering.rb', line 128

# Reads the metering.spool.flush_batch_sleep setting, falling back to
# DEFAULT_SPOOL_FLUSH_BATCH_SLEEP, and converts it with to_f.
#
# @return [Float] the value flush_spool passes to sleep between events
def spool_flush_batch_sleep
  configured = Legion::LLM::Settings.value(:metering, :spool, :flush_batch_sleep, default: DEFAULT_SPOOL_FLUSH_BATCH_SLEEP)
  configured.to_f
end

.spool_max_events ⇒ Object



124
125
126
# File 'lib/legion/llm/metering.rb', line 124

# Reads the metering.spool.max_events setting, falling back to
# DEFAULT_SPOOL_MAX, and converts it with to_i.
#
# @return [Integer] the spooled-event cap that spool_event compares against
def spool_max_events
  configured = Legion::LLM::Settings.value(:metering, :spool, :max_events, default: DEFAULT_SPOOL_MAX)
  configured.to_i
end

.transport_connected? ⇒ Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/legion/llm/metering.rb', line 93

# Delegates to Legion::LLM::Settings.transport_connected?.
#
# @return [Boolean] whatever the settings module reports
def transport_connected?
  settings = Legion::LLM::Settings
  settings.transport_connected?
end