Module: Legion::LLM::Metering
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/metering.rb,
lib/legion/llm/metering/tokens.rb,
lib/legion/llm/metering/tracker.rb,
lib/legion/llm/metering/estimator.rb
Defined Under Namespace
Modules: Pricing, Recorder, Tokens
Constant Summary
- DEFAULT_SPOOL_MAX = 10_000
- DEFAULT_SPOOL_FLUSH_BATCH_SLEEP = 0.0
Class Method Summary
- .attributed_event(event) ⇒ Object
- .const_missing(name) ⇒ Object
Backward-compat: resolves the legacy constants Legion::LLM::Metering::Exchange and Legion::LLM::Metering::Event.
- .emit(event) ⇒ Object
- .extract_hash_value(hash, key) ⇒ Object
- .extract_model(response) ⇒ Object
- .extract_provider(response) ⇒ Object
- .extract_usage(response) ⇒ Object
- .flush_spool ⇒ Object
- .install_hook ⇒ Object
- .load_transport ⇒ Object
- .metering_event_class ⇒ Object
- .spool_available? ⇒ Boolean
- .spool_event(event) ⇒ Object
- .spool_flush_batch_sleep ⇒ Object
- .spool_max_events ⇒ Object
- .transport_connected? ⇒ Boolean
Class Method Details
.attributed_event(event) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/legion/llm/metering.rb', line 52
# Returns a shallow copy of +event+ stamped with publisher attribution.
#
# A Hash event is duplicated, so the caller's hash is never mutated; any
# other value is replaced by an empty Hash. :identity is always overwritten
# with PublisherIdentity.current. :caller is only filled in from
# PublisherIdentity.caller_hash when the event has no truthy :caller yet.
#
# @param event [Hash, Object] the raw metering event
# @return [Hash] the attributed copy
def attributed_event(event)
  stamped = event.is_a?(Hash) ? event.dup : {}
  identity = Legion::LLM::PublisherIdentity
  stamped.tap do |copy|
    copy[:identity] = identity.current
    copy[:caller] ||= identity.caller_hash
  end
end
.const_missing(name) ⇒ Object
Backward-compat: resolves the legacy constants Legion::LLM::Metering::Exchange and Legion::LLM::Metering::Event.
174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/legion/llm/metering.rb', line 174
# Backward-compat hook for the legacy constants
# Legion::LLM::Metering::Exchange and Legion::LLM::Metering::Event.
#
# The matching transport file is required on demand, and the call returns
# Transport::Exchanges::Metering or Transport::Messages::MeteringEvent
# respectively. Any other missing constant is delegated to super.
#
# @param name [Symbol] the missing constant name
# @return [Module] the resolved transport constant
def self.const_missing(name)
  if name == :Exchange
    require_relative 'transport/exchanges/metering'
    Transport::Exchanges::Metering
  elsif name == :Event
    require_relative 'transport/messages/metering_event'
    Transport::Messages::MeteringEvent
  else
    super
  end
end
.emit(event) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/legion/llm/metering.rb', line 27
# Attributes +event+ and delivers it, reporting what happened to it.
#
# The event class is resolved only while the transport reports itself as
# connected. If a class is available, the event is published. Otherwise the
# event goes to the local spool when Legion::Data::Spool is loaded, and is
# dropped when no spool exists. Any StandardError is reported through
# handle_exception, and the event counts as dropped.
#
# @param event [Hash] raw metering fields (e.g. :provider, :model_id)
# @return [Symbol] :published, :spooled, or :dropped
def emit(event)
  event = attributed_event(event)
  # Avoid resolving (and possibly loading) the transport class while disconnected.
  event_class = transport_connected? ? metering_event_class : nil

  if event_class
    event_class.new(**event).publish
    log.info("[llm][metering] published provider=#{event[:provider]} model=#{event[:model_id]}")
    return :published
  end

  unless spool_available?
    log.warn("[llm][metering] dropped provider=#{event[:provider]} model=#{event[:model_id]}")
    return :dropped
  end

  outcome = spool_event(event)
  if outcome == :spooled
    log.info("[llm][metering] spooled provider=#{event[:provider]} model=#{event[:model_id]}")
  else
    log.warn("[llm][metering] dropped provider=#{event[:provider]} model=#{event[:model_id]} reason=spool_cap")
  end
  outcome
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.emit')
  :dropped
end
.extract_hash_value(hash, key) ⇒ Object
164 165 166 167 168 169 170 171 |
# File 'lib/legion/llm/metering.rb', line 164
# Reads +key+ from a hash-like object, accepting either spelling of the key.
#
# The String form of +key+ is checked first and wins when both the String
# and the Symbol forms are present. Returns nil when +hash+ does not respond
# to #key? or neither form is present.
#
# @param hash [Hash, Object] the container to read from
# @param key [Symbol, String] the key to look up
# @return [Object, nil] the stored value
def extract_hash_value(hash, key)
  return nil unless hash.respond_to?(:key?)

  [key.to_s, key].each do |candidate|
    return hash[candidate] if hash.key?(candidate)
  end
  nil
end
.extract_model(response) ⇒ Object
158 159 160 161 162 |
# File 'lib/legion/llm/metering.rb', line 158
# Model identifier reported by a chat response.
#
# The value under :meta => :model is preferred, then top-level :model. Both
# String and Symbol keys are accepted via extract_hash_value.
#
# @param response [Hash, Object] the chat response
# @return [Object, nil] the model, or nil for non-Hash responses / no model
def extract_model(response)
  return nil unless response.is_a?(Hash)

  meta = extract_hash_value(response, :meta)
  extract_hash_value(meta, :model) || extract_hash_value(response, :model)
end
.extract_provider(response) ⇒ Object
152 153 154 155 156 |
# File 'lib/legion/llm/metering.rb', line 152
# Provider name reported by a chat response.
#
# The value under :meta => :provider is preferred, then top-level :provider.
# Both String and Symbol keys are accepted via extract_hash_value.
#
# @param response [Hash, Object] the chat response
# @return [Object, nil] the provider, or nil for non-Hash responses / no provider
def extract_provider(response)
  return nil unless response.is_a?(Hash)

  meta = extract_hash_value(response, :meta)
  extract_hash_value(meta, :provider) || extract_hash_value(response, :provider)
end
.extract_usage(response) ⇒ Object
142 143 144 145 146 147 148 149 150 |
# File 'lib/legion/llm/metering.rb', line 142
# Normalizes token usage from a chat response into a two-key Hash.
#
# Reads the response's :usage entry (String or Symbol key). Input tokens fall
# back from :input_tokens to :prompt_tokens, and output tokens from
# :output_tokens to :completion_tokens. A count that is still missing
# becomes 0. Non-Hash responses report zero usage.
#
# @param response [Hash, Object] the chat response
# @return [Hash{Symbol=>Object}] :input_tokens and :output_tokens
def extract_usage(response)
  unless response.is_a?(Hash)
    return { input_tokens: 0, output_tokens: 0 }
  end

  usage = extract_hash_value(response, :usage) || {}
  input = extract_hash_value(usage, :input_tokens) || extract_hash_value(usage, :prompt_tokens) || 0
  output = extract_hash_value(usage, :output_tokens) || extract_hash_value(usage, :completion_tokens) || 0
  { input_tokens: input, output_tokens: output }
end
.flush_spool ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/legion/llm/metering.rb', line 59
# Replays spooled metering events through emit.
#
# This does nothing (returns 0) unless a spool is available and the transport
# is connected. Each spooled event is passed to emit, optionally followed by
# a sleep of spool_flush_batch_sleep seconds when that value is positive.
# Errors are reported through handle_exception and yield 0.
#
# @return [Object] whatever Spool#flush returns (logged as the flushed count), or 0
def flush_spool
  unless spool_available? && transport_connected?
    return 0
  end

  queue = Legion::Data::Spool.for(Legion::LLM)
  pause = spool_flush_batch_sleep
  replayed = queue.flush(:metering) do |spooled|
    emit(spooled)
    # NOTE(review): kept as the block's last expression (sleep result or nil)
    # in case Spool#flush inspects the block's return value.
    sleep(pause) if pause.positive?
  end
  log.info("[llm][metering] spool_flushed count=#{replayed}")
  replayed
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.metering.flush_spool')
  0
end
.install_hook ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/legion/llm/metering.rb', line 75
# Registers an after_chat hook that meters every chat completion.
#
# For each response reporting non-zero token usage, the hook records the
# usage via Metering::Recorder and emits an 'llm_completion' metering event.
# A model/provider found in the response (under :meta first, then top-level)
# takes precedence over the model the hook was invoked with. Responses
# reporting zero input and zero output tokens are skipped entirely.
#
# The error flag is read with extract_hash_value, like every other response
# field in this module, so both :error and 'error' keys mark the event as a
# 'failure' (previously a String-keyed 'error' was reported as 'success').
def install_hook
  Legion::LLM::Hooks.after_chat do |response:, model:, caller: nil, **|
    usage = extract_usage(response)
    next if usage[:input_tokens].zero? && usage[:output_tokens].zero?

    resolved_model = (extract_model(response) || model).to_s
    resolved_provider = extract_provider(response)

    Metering::Recorder.record(
      model: resolved_model,
      input_tokens: usage[:input_tokens],
      output_tokens: usage[:output_tokens],
      provider: resolved_provider
    )

    emit(
      provider: resolved_provider,
      model_id: resolved_model,
      input_tokens: usage[:input_tokens],
      output_tokens: usage[:output_tokens],
      caller: caller,
      event_type: 'llm_completion',
      status: response.is_a?(Hash) && extract_hash_value(response, :error) ? 'failure' : 'success'
    )
    nil
  end
end
.load_transport ⇒ Object
18 19 20 21 22 23 |
# File 'lib/legion/llm/metering.rb', line 18
# Loads the metering exchange and message definitions.
#
# The files are only required when Legion::Transport::Message is already
# defined. Otherwise nothing is loaded and nil is returned.
def self.load_transport
  if defined?(Legion::Transport::Message)
    require_relative 'transport/exchanges/metering'
    require_relative 'transport/messages/metering_event'
  end
end
.metering_event_class ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/legion/llm/metering.rb', line 107
# Resolves the message class used to publish metering events.
#
# If Legion::LLM::Transport::Messages::MeteringEvent is not yet defined,
# load_transport is called once. If the class is still missing, the legacy
# Legion::LLM::Metering::Event constant is tried (see const_missing).
# NameError/LoadError are reported through handle_exception and yield nil.
#
# @return [Class, nil] the event class, or nil when it cannot be resolved
def metering_event_class
  unless defined?(Legion::LLM::Transport::Messages::MeteringEvent)
    load_transport
    return Legion::LLM::Metering::Event unless defined?(Legion::LLM::Transport::Messages::MeteringEvent)
  end

  Legion::LLM::Transport::Messages::MeteringEvent
rescue NameError, LoadError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.metering.event_class')
  nil
end
.spool_available? ⇒ Boolean
119 120 121 |
# File 'lib/legion/llm/metering.rb', line 119
# Whether the Legion::Data::Spool constant is defined.
#
# @return [Boolean]
def spool_available?
  defined?(Legion::Data::Spool) ? true : false
end
.spool_event(event) ⇒ Object
123 124 125 126 127 128 129 130 131 132 |
# File 'lib/legion/llm/metering.rb', line 123
# Writes +event+ to the :metering spool queue, respecting the size cap.
#
# When the spool exposes #count and already holds spool_max_events or more
# :metering entries, the event is not written, a spool_full warning is
# logged, and :dropped is returned. The count and the cap are each read once,
# so the logged values are exactly the ones the decision used (previously
# both were re-queried for the log line).
#
# @param event [Hash] the attributed metering event
# @return [Symbol] :spooled or :dropped
def spool_event(event)
  spool = Legion::Data::Spool.for(Legion::LLM)

  if spool.respond_to?(:count)
    current = spool.count(:metering).to_i
    limit = spool_max_events
    if current >= limit
      log.warn("[llm][metering] spool_full count=#{current} max=#{limit}")
      return :dropped
    end
  end

  spool.write(:metering, event)
  :spooled
end
.spool_flush_batch_sleep ⇒ Object
138 139 140 |
# File 'lib/legion/llm/metering.rb', line 138
# Seconds to pause after each replayed event in flush_spool.
#
# Read from settings metering.spool.flush_batch_sleep, falling back to
# DEFAULT_SPOOL_FLUSH_BATCH_SLEEP, and coerced with #to_f.
#
# @return [Float]
def spool_flush_batch_sleep
  configured = Legion::LLM::Settings.value(
    :metering, :spool, :flush_batch_sleep,
    default: DEFAULT_SPOOL_FLUSH_BATCH_SLEEP
  )
  configured.to_f
end
.spool_max_events ⇒ Object
134 135 136 |
# File 'lib/legion/llm/metering.rb', line 134
# Maximum number of :metering entries the spool may hold before spool_event
# starts dropping events.
#
# Read from settings metering.spool.max_events, falling back to
# DEFAULT_SPOOL_MAX, and coerced with #to_i.
# NOTE(review): a non-numeric setting coerces to 0, which makes spool_event
# drop every event when the spool supports #count.
#
# @return [Integer]
def spool_max_events
  configured = Legion::LLM::Settings.value(
    :metering, :spool, :max_events,
    default: DEFAULT_SPOOL_MAX
  )
  configured.to_i
end
.transport_connected? ⇒ Boolean
103 104 105 |
# File 'lib/legion/llm/metering.rb', line 103
# Whether the transport layer is connected, as reported by
# Legion::LLM::Settings.transport_connected?.
#
# @return [Boolean]
def transport_connected?
  settings = Legion::LLM::Settings
  settings.transport_connected?
end