Class: LlmCostTracker::Capture::StreamCollector
- Inherits:
-
Object
- Object
- LlmCostTracker::Capture::StreamCollector
- Defined in:
- lib/llm_cost_tracker/capture/stream_collector.rb
Instance Attribute Summary collapse
- #provider ⇒ Object (readonly): Returns the value of attribute provider.
Instance Method Summary collapse
- #event(data, type: nil) ⇒ Object
- #finish!(errored: false) ⇒ Object
- #initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, pricing_mode: nil, metadata: {}) ⇒ StreamCollector (constructor): A new instance of StreamCollector.
- #metadata ⇒ Object
- #model ⇒ Object
- #model=(value) ⇒ Object
- #provider_response_id ⇒ Object
- #provider_response_id=(value) ⇒ Object
- #usage(input_tokens:, output_tokens:, **extra) ⇒ Object
Constructor Details
#initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, pricing_mode: nil, metadata: {}) ⇒ StreamCollector
Returns a new instance of StreamCollector.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 13

# Sets up the collector's state for a single streamed response.
#
# @param provider [#to_s] provider identifier; stored as a String via +to_s+
# @param model [Object] model identifier, stored as given
# @param latency_ms [Numeric, nil] stored as given; when nil, #finish! falls
#   back to the time elapsed since construction
# @param provider_response_id [Object, nil] response id, stored as given
# @param pricing_mode [Object, nil] stored and handed to Tracker.record by #finish!
# @param metadata [Hash, nil] caller metadata; nil is treated as an empty Hash
def initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, pricing_mode: nil, metadata: {})
  @provider = provider.to_s
  @model = model
  @latency_ms = latency_ms
  @provider_response_id = provider_response_id
  @pricing_mode = pricing_mode
  # Keep a private copy so the collector does not share the caller's Hash.
  # NOTE(review): deep_dup is presumably ActiveSupport's — confirm it is loaded.
  @metadata = (metadata || {}).deep_dup
  @events = []
  @captured_bytes = 0
  @overflowed = false
  @explicit_usage = nil
  # Monotonic start time; #finish! uses it to derive latency when none was given.
  @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @finished = false
  @mutex = Mutex.new
end
Instance Attribute Details
#provider ⇒ Object (readonly)
Returns the value of attribute provider.
11 12 13 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 11

# Reader for the provider name.
#
# @return [String] the provider as stringified by the constructor
def provider
  @provider
end
Instance Method Details
#event(data, type: nil) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 55

# Feeds one streamed event into the collector.
#
# Runs under the collector's mutex. +ensure_open!+ is always invoked first;
# a nil +data+ is then ignored, anything else is passed to +capture_event+.
#
# @param data [Object, nil] event payload; nil is skipped
# @param type [Object, nil] forwarded to +capture_event+ as +type:+
# @return [self] so calls can be chained
def event(data, type: nil)
  @mutex.synchronize do
    ensure_open!
    next if data.nil?

    capture_event(data, type: type)
  end

  self
end
#finish!(errored: false) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 75

# Closes the collector and records the collected stream via Tracker.record.
#
# The first call marks the collector finished and takes a snapshot of its
# state while holding the mutex; the recording itself happens outside the
# lock. Any later call returns nil without recording again.
#
# @param errored [Boolean] when truthy, +stream_errored: true+ is placed in
#   the recorded metadata (a caller-supplied key of the same name wins,
#   because caller metadata is merged on top)
# @return [Object, nil] whatever Tracker.record returns, or nil if the
#   collector was already finished
def finish!(errored: false)
  snapshot = @mutex.synchronize do
    # Returns from finish! itself; the mutex is released by synchronize.
    return if @finished

    @finished = true
    {
      events: @events.dup,
      overflowed: @overflowed,
      explicit_usage: @explicit_usage,
      model: @model,
      latency_ms: @latency_ms,
      provider_response_id: @provider_response_id,
      pricing_mode: @pricing_mode,
      metadata: @metadata.deep_dup
    }
  end

  capture = build_usage_capture(snapshot)
  # Prefer the id found by the capture; otherwise use the one set on the collector.
  response_id = capture.provider_response_id || snapshot[:provider_response_id]
  capture = capture.with(provider_response_id: response_id)

  # An explicit latency wins; otherwise measure from construction, in milliseconds.
  latency = snapshot[:latency_ms] ||
            ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at) * 1000).round

  error_flag = errored ? { stream_errored: true } : {}

  Tracker.record(
    capture: capture,
    latency_ms: latency,
    pricing_mode: snapshot[:pricing_mode],
    metadata: error_flag.merge(snapshot[:metadata])
  )
end
#metadata ⇒ Object
33 34 35 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 33

# Returns a copy of the collector's metadata, read under the mutex.
#
# A deep copy is handed out, so mutating the result does not change the
# collector's own metadata.
# NOTE(review): deep_dup is presumably ActiveSupport's — confirm it is loaded.
#
# @return [Hash] deep copy of the stored metadata
def metadata
  @mutex.synchronize { @metadata.deep_dup }
end
#model ⇒ Object
29 30 31 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 29

# Reads the current model while holding the collector's mutex.
#
# @return [Object] the model given to the constructor or last set via #model=
def model
  @mutex.synchronize do
    @model
  end
end
#model=(value) ⇒ Object
41 42 43 44 45 46 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 41

# Replaces the model while holding the collector's mutex.
#
# +ensure_open!+ runs before the assignment, so nothing is changed if it raises.
#
# @param value [Object] the new model
# @return [Object] the assigned value
def model=(value)
  @mutex.synchronize do
    ensure_open!
    new_model = value
    @model = new_model
  end
end
#provider_response_id ⇒ Object
37 38 39 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 37

# Reads the current provider response id while holding the collector's mutex.
#
# @return [Object, nil] the id given to the constructor, or last set via
#   #provider_response_id= or #usage
def provider_response_id
  @mutex.synchronize do
    @provider_response_id
  end
end
#provider_response_id=(value) ⇒ Object
48 49 50 51 52 53 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 48

# Replaces the provider response id while holding the collector's mutex.
#
# +ensure_open!+ runs before the assignment, so nothing is changed if it raises.
#
# @param value [Object, nil] the new provider response id
# @return [Object, nil] the assigned value
def provider_response_id=(value)
  @mutex.synchronize do
    ensure_open!
    new_id = value
    @provider_response_id = new_id
  end
end
#usage(input_tokens:, output_tokens:, **extra) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 63

# Records explicitly supplied token usage for the stream.
#
# Runs under the collector's mutex, after +ensure_open!+. A truthy
# +:provider_response_id+ in +extra+ replaces the stored response id; the key
# is removed from +extra+ either way. The remaining extras, together with the
# integer-coerced token counts, are passed to TokenUsage.from_hash.
#
# @param input_tokens [#to_i] input token count, coerced with +to_i+
# @param output_tokens [#to_i] output token count, coerced with +to_i+
# @param extra [Hash] additional usage fields; the two token counts take
#   precedence over same-named keys
# @return [self] so calls can be chained
def usage(input_tokens:, output_tokens:, **extra)
  @mutex.synchronize do
    ensure_open!

    response_id = extra.delete(:provider_response_id)
    @provider_response_id = response_id if response_id

    token_counts = {
      input_tokens: input_tokens.to_i,
      output_tokens: output_tokens.to_i
    }
    @explicit_usage = TokenUsage.from_hash(extra.merge(token_counts))
  end

  self
end