Class: LlmCostTracker::StreamCollector
- Inherits:
-
Object
- Object
- LlmCostTracker::StreamCollector
- Defined in:
- lib/llm_cost_tracker/stream_collector.rb
Instance Attribute Summary collapse
-
#provider ⇒ Object
readonly
Returns the value of attribute provider.
Instance Method Summary collapse
- #event(data, type: nil) ⇒ Object (also: #chunk)
- #finish!(errored: false) ⇒ Object
-
#initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, metadata: {}) ⇒ StreamCollector
constructor
A new instance of StreamCollector.
- #metadata ⇒ Object
- #model ⇒ Object
- #model=(value) ⇒ Object
- #provider_response_id ⇒ Object
- #provider_response_id=(value) ⇒ Object
- #usage(input_tokens:, output_tokens:, **extra) ⇒ Object
Constructor Details
#initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, metadata: {}) ⇒ StreamCollector
Returns a new instance of StreamCollector.
11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 11 def initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, metadata: {}) @provider = provider.to_s @model = model @latency_ms = latency_ms @provider_response_id = provider_response_id @metadata = ValueHelpers.deep_dup( || {}) @events = [] @explicit_usage = nil @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) @finished = false @monitor = Monitor.new end |
Instance Attribute Details
#provider ⇒ Object (readonly)
Returns the value of attribute provider.
9 10 11 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 9 def provider @provider end |
Instance Method Details
#event(data, type: nil) ⇒ Object Also known as: chunk
44 45 46 47 48 49 50 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 44 def event(data, type: nil) @monitor.synchronize do ensure_open! @events << { event: type, data: ValueHelpers.deep_dup(data) } unless data.nil? end self end |
#finish!(errored: false) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 66 def finish!(errored: false) snapshot = @monitor.synchronize do return if @finished @finished = true { events: ValueHelpers.deep_dup(@events), explicit_usage: ValueHelpers.deep_dup(@explicit_usage), model: @model, latency_ms: @latency_ms, provider_response_id: @provider_response_id, metadata: ValueHelpers.deep_dup(@metadata) } end parsed = build_parsed_usage(snapshot) Tracker.record( provider: parsed.provider, model: parsed.model, input_tokens: parsed.input_tokens, output_tokens: parsed.output_tokens, latency_ms: snapshot[:latency_ms] || elapsed_ms, stream: true, usage_source: parsed.usage_source, provider_response_id: parsed.provider_response_id || snapshot[:provider_response_id], metadata: (errored).merge(snapshot[:metadata]).merge(parsed.) ) end |
#metadata ⇒ Object
26 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 26 def = @monitor.synchronize { ValueHelpers.deep_dup(@metadata) } |
#model ⇒ Object
24 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 24 def model = @monitor.synchronize { @model } |
#model=(value) ⇒ Object
30 31 32 33 34 35 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 30 def model=(value) @monitor.synchronize do ensure_open! @model = value end end |
#provider_response_id ⇒ Object
28 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 28 def provider_response_id = @monitor.synchronize { @provider_response_id } |
#provider_response_id=(value) ⇒ Object
37 38 39 40 41 42 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 37 def provider_response_id=(value) @monitor.synchronize do ensure_open! @provider_response_id = value end end |
#usage(input_tokens:, output_tokens:, **extra) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/llm_cost_tracker/stream_collector.rb', line 53 def usage(input_tokens:, output_tokens:, **extra) @monitor.synchronize do ensure_open! @explicit_usage = ValueHelpers.deep_dup( extra.merge( input_tokens: input_tokens.to_i, output_tokens: output_tokens.to_i ) ) end self end |