Class: LlmCostTracker::Capture::StreamCollector

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/capture/stream_collector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, pricing_mode: nil, metadata: {}) ⇒ StreamCollector

Returns a new instance of StreamCollector.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 13

# Sets up a collector for a single streamed LLM call.
#
# @param provider [#to_s] provider identifier; stored as a String
# @param model [Object] model identifier, stored as given
# @param latency_ms [Numeric, nil] explicit latency; when nil, #finish! derives
#   it from the monotonic clock reading taken here
# @param provider_response_id [Object, nil] provider-side response id, if already known
# @param pricing_mode [Object, nil] passed through to Tracker.record by #finish!
# @param metadata [Hash, nil] caller metadata; nil is treated as an empty Hash
def initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, pricing_mode: nil, metadata: {})
  @provider = provider.to_s
  @model = model
  @latency_ms = latency_ms
  @provider_response_id = provider_response_id
  @pricing_mode = pricing_mode
  # Deep-copy so later mutation of the caller's hash does not leak into the collector.
  # NOTE(review): deep_dup is not core Ruby -- presumably supplied by ActiveSupport; confirm.
  @metadata = (metadata || {}).deep_dup
  @events = []
  @captured_bytes = 0
  @overflowed = false
  @explicit_usage = nil
  # Monotonic clock: used only for measuring elapsed time, unaffected by wall-clock changes.
  @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @finished = false
  @mutex = Mutex.new
end

Instance Attribute Details

#provider ⇒ Object (readonly)

Returns the value of attribute provider.



11
12
13
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 11

# Returns the provider identifier. The constructor stores it via `to_s`,
# so this is a String. Read without taking the mutex.
def provider
  @provider
end

Instance Method Details

#event(data, type: nil) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 55

# Feeds one streamed event into the collector.
#
# Runs under the collector's mutex. `ensure_open!` is called first on every
# invocation (presumably it rejects use after #finish! -- confirm against its
# definition); a nil payload is then skipped without being captured.
#
# @param data [Object, nil] event payload; nil is ignored
# @param type [Object, nil] optional event type forwarded to capture_event
# @return [self] to allow chaining
def event(data, type: nil)
  @mutex.synchronize do
    ensure_open!
    next if data.nil?

    capture_event(data, type: type)
  end

  self
end

#finish!(errored: false) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 75

# Closes the collector and reports the captured usage through Tracker.record.
#
# The first call flips the finished flag and copies the collector state while
# holding the mutex; everything after that works on the copy, outside the lock.
# Any later call returns nil without recording again.
#
# @param errored [Boolean] when true, `stream_errored: true` is added to the
#   recorded metadata (a same-named key in the collector metadata wins the merge)
# @return [Object, nil] the result of Tracker.record, or nil if already finished
def finish!(errored: false)
  state = @mutex.synchronize do
    return if @finished

    @finished = true
    {
      events: @events.dup,
      overflowed: @overflowed,
      explicit_usage: @explicit_usage,
      model: @model,
      latency_ms: @latency_ms,
      provider_response_id: @provider_response_id,
      pricing_mode: @pricing_mode,
      metadata: @metadata.deep_dup
    }
  end

  usage_capture = build_usage_capture(state)
  # Prefer the id found by the capture; fall back to the one set on the collector.
  response_id = usage_capture.provider_response_id || state[:provider_response_id]
  usage_capture = usage_capture.with(provider_response_id: response_id)

  # An explicit latency wins; otherwise measure from construction time to now.
  elapsed_ms = state[:latency_ms]
  elapsed_ms ||= ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at) * 1000).round

  error_flag = errored ? { stream_errored: true } : {}

  Tracker.record(
    capture: usage_capture,
    latency_ms: elapsed_ms,
    pricing_mode: state[:pricing_mode],
    metadata: error_flag.merge(state[:metadata])
  )
end

#metadata ⇒ Object



33
34
35
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 33

# Returns a deep copy of the collector's metadata, taken under the mutex,
# so callers cannot mutate the collector's internal hash through the result.
#
# NOTE(review): deep_dup is not core Ruby -- presumably supplied by ActiveSupport; confirm.
#
# @return [Hash] an independent copy of the stored metadata
def metadata
  @mutex.synchronize { @metadata.deep_dup }
end

#model ⇒ Object



29
30
31
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 29

# Returns the current model identifier, read while holding the collector's
# mutex so it is consistent with concurrent writes made through #model=.
def model
  @mutex.synchronize { @model }
end

#model=(value) ⇒ Object



41
42
43
44
45
46
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 41

# Replaces the model identifier while holding the collector's mutex.
#
# `ensure_open!` runs before the assignment; presumably it raises once the
# collector has been finished -- confirm against its definition.
#
# @param value [Object] the new model identifier, stored as given
def model=(value)
  @mutex.synchronize do
    ensure_open!
    @model = value
  end
end

#provider_response_id ⇒ Object



37
38
39
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 37

# Returns the provider response id currently held by the collector, read
# while holding the mutex. It may be set by the constructor, by
# #provider_response_id=, or by #usage.
def provider_response_id
  @mutex.synchronize { @provider_response_id }
end

#provider_response_id=(value) ⇒ Object



48
49
50
51
52
53
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 48

# Replaces the provider response id while holding the collector's mutex.
#
# `ensure_open!` runs before the assignment; presumably it raises once the
# collector has been finished -- confirm against its definition.
#
# @param value [Object, nil] the new provider response id, stored as given
def provider_response_id=(value)
  @mutex.synchronize do
    ensure_open!
    @provider_response_id = value
  end
end

#usage(input_tokens:, output_tokens:, **extra) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
# File 'lib/llm_cost_tracker/capture/stream_collector.rb', line 63

# Records token usage supplied directly by the caller.
#
# Runs under the collector's mutex, after `ensure_open!` (presumably a guard
# against use after #finish! -- confirm against its definition). A truthy
# `:provider_response_id` among the extra options replaces the stored id and is
# removed from the options before they reach TokenUsage.from_hash.
#
# @param input_tokens [#to_i] input token count, coerced with to_i
# @param output_tokens [#to_i] output token count, coerced with to_i
# @param extra [Hash] further fields passed on to TokenUsage.from_hash
# @return [self] to allow chaining
def usage(input_tokens:, output_tokens:, **extra)
  @mutex.synchronize do
    ensure_open!

    reported_id = extra.delete(:provider_response_id)
    @provider_response_id = reported_id if reported_id

    token_counts = { input_tokens: input_tokens.to_i, output_tokens: output_tokens.to_i }
    @explicit_usage = TokenUsage.from_hash(extra.merge(token_counts))
  end

  self
end