Module: Legion::LLM::Metering
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/metering.rb,
lib/legion/llm/metering/tokens.rb,
lib/legion/llm/metering/tracker.rb,
lib/legion/llm/metering/estimator.rb
Defined Under Namespace
Modules: Pricing, Recorder, Tokens
Constant Summary
collapse
- DEFAULT_SPOOL_MAX =
10_000
- DEFAULT_SPOOL_FLUSH_BATCH_SLEEP =
0.0
Class Method Summary
collapse
Class Method Details
.const_missing(name) ⇒ Object
Backward-compat: resolve old Legion::LLM::Metering::Exchange, ::Event
164
165
166
167
168
169
170
171
172
173
174
175
|
# File 'lib/legion/llm/metering.rb', line 164
def self.const_missing(name)
case name
when :Exchange
require_relative 'transport/exchanges/metering'
Transport::Exchanges::Metering
when :Event
require_relative 'transport/messages/metering_event'
Transport::Messages::MeteringEvent
else
super
end
end
|
.emit(event) ⇒ Object
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/legion/llm/metering.rb', line 26
def emit(event)
event_class = metering_event_class if transport_connected?
if event_class
event_class.new(**event).publish
log.info("[llm][metering] published provider=#{event[:provider]} model=#{event[:model_id]}")
:published
elsif spool_available?
result = spool_event(event)
if result == :spooled
log.info("[llm][metering] spooled provider=#{event[:provider]} model=#{event[:model_id]}")
else
log.warn("[llm][metering] dropped provider=#{event[:provider]} model=#{event[:model_id]} reason=spool_cap")
end
result
else
log.warn("[llm][metering] dropped provider=#{event[:provider]} model=#{event[:model_id]}")
:dropped
end
rescue StandardError => e
handle_exception(e, level: :warn, operation: 'llm.metering.emit')
:dropped
end
|
148
149
150
151
152
|
# File 'lib/legion/llm/metering.rb', line 148
def (response)
return nil unless response.is_a?(Hash)
settings_value(settings_value(response, :meta), :model) || settings_value(response, :model)
end
|
142
143
144
145
146
|
# File 'lib/legion/llm/metering.rb', line 142
def (response)
return nil unless response.is_a?(Hash)
settings_value(settings_value(response, :meta), :provider) || settings_value(response, :provider)
end
|
132
133
134
135
136
137
138
139
140
|
# File 'lib/legion/llm/metering.rb', line 132
def (response)
return { input_tokens: 0, output_tokens: 0 } unless response.is_a?(Hash)
usage = settings_value(response, :usage) || {}
{
input_tokens: settings_value(usage, :input_tokens) || settings_value(usage, :prompt_tokens) || 0,
output_tokens: settings_value(usage, :output_tokens) || settings_value(usage, :completion_tokens) || 0
}
end
|
.flush_spool ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/legion/llm/metering.rb', line 50
def flush_spool
return 0 unless spool_available? && transport_connected?
spool = Legion::Data::Spool.for(Legion::LLM)
throttle = spool_flush_batch_sleep
flushed = spool.flush(:metering) do |event|
emit(event)
sleep(throttle) if throttle.positive?
end
log.info("[llm][metering] spool_flushed count=#{flushed}")
flushed
rescue StandardError => e
handle_exception(e, level: :warn, operation: 'llm.metering.flush_spool')
0
end
|
.install_hook ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/legion/llm/metering.rb', line 66
def install_hook
Legion::LLM::Hooks.after_chat do |response:, model:, **|
usage = (response)
next if usage[:input_tokens].zero? && usage[:output_tokens].zero?
resolved_model = ((response) || model).to_s
resolved_provider = (response)
Metering::Recorder.record(
model: resolved_model,
input_tokens: usage[:input_tokens],
output_tokens: usage[:output_tokens],
provider: resolved_provider
)
emit(
provider: resolved_provider,
model_id: resolved_model,
input_tokens: usage[:input_tokens],
output_tokens: usage[:output_tokens],
event_type: 'llm_completion',
status: response.is_a?(Hash) && response[:error] ? 'failure' : 'success'
)
nil
end
end
|
.load_transport ⇒ Object
17
18
19
20
21
22
|
# File 'lib/legion/llm/metering.rb', line 17
def self.load_transport
return unless defined?(Legion::Transport::Message)
require_relative 'transport/exchanges/metering'
require_relative 'transport/messages/metering_event'
end
|
.metering_event_class ⇒ Object
.settings_value(hash, key) ⇒ Object
154
155
156
157
158
159
160
161
|
# File 'lib/legion/llm/metering.rb', line 154
def settings_value(hash, key)
return nil unless hash.respond_to?(:key?)
string_key = key.to_s
return hash[string_key] if hash.key?(string_key)
hash[key] if hash.key?(key)
end
|
.spool_available? ⇒ Boolean
109
110
111
|
# File 'lib/legion/llm/metering.rb', line 109
def spool_available?
!!defined?(Legion::Data::Spool)
end
|
.spool_event(event) ⇒ Object
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/legion/llm/metering.rb', line 113
def spool_event(event)
spool = Legion::Data::Spool.for(Legion::LLM)
if spool.respond_to?(:count) && spool.count(:metering).to_i >= spool_max_events
log.warn("[llm][metering] spool_full count=#{spool.count(:metering)} max=#{spool_max_events}")
return :dropped
end
spool.write(:metering, event)
:spooled
end
|
.spool_flush_batch_sleep ⇒ Object
.spool_max_events ⇒ Object
.transport_connected? ⇒ Boolean