Class: LlmCostTracker::StreamCollector

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/stream_collector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(provider:, model:, latency_ms: nil, metadata: {}) ⇒ StreamCollector

Returns a new instance of StreamCollector.



11
12
13
14
15
16
17
18
19
20
21
# File 'lib/llm_cost_tracker/stream_collector.rb', line 11

def initialize(provider:, model:, latency_ms: nil, metadata: {})
  @provider = provider.to_s
  @model = model
  @latency_ms = latency_ms
  @metadata = ValueHelpers.deep_dup( || {})
  @events = []
  @explicit_usage = nil
  @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @finished = false
  @monitor = Monitor.new
end

Instance Attribute Details

#providerObject (readonly)

Returns the value of attribute provider.



9
10
11
# File 'lib/llm_cost_tracker/stream_collector.rb', line 9

def provider
  @provider
end

Instance Method Details

#event(data, type: nil) ⇒ Object Also known as: chunk



38
39
40
41
42
43
44
# File 'lib/llm_cost_tracker/stream_collector.rb', line 38

def event(data, type: nil)
  @monitor.synchronize do
    ensure_open!
    @events << { event: type, data: ValueHelpers.deep_dup(data) } unless data.nil?
  end
  self
end

#finish!(errored: false) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/llm_cost_tracker/stream_collector.rb', line 60

def finish!(errored: false)
  snapshot = @monitor.synchronize do
    return if @finished

    @finished = true
    {
      events: ValueHelpers.deep_dup(@events),
      explicit_usage: ValueHelpers.deep_dup(@explicit_usage),
      model: @model,
      latency_ms: @latency_ms,
      metadata: ValueHelpers.deep_dup(@metadata)
    }
  end

  parsed = build_parsed_usage(snapshot)
  Tracker.record(
    provider: parsed.provider,
    model: parsed.model,
    input_tokens: parsed.input_tokens,
    output_tokens: parsed.output_tokens,
    latency_ms: snapshot[:latency_ms] || elapsed_ms,
    stream: true,
    usage_source: parsed.usage_source,
    metadata: (errored).merge(snapshot[:metadata]).merge(parsed.)
  )
end

#metadataObject



27
28
29
# File 'lib/llm_cost_tracker/stream_collector.rb', line 27

def 
  @monitor.synchronize { ValueHelpers.deep_dup(@metadata) }
end

#modelObject



23
24
25
# File 'lib/llm_cost_tracker/stream_collector.rb', line 23

def model
  @monitor.synchronize { @model }
end

#model=(value) ⇒ Object



31
32
33
34
35
36
# File 'lib/llm_cost_tracker/stream_collector.rb', line 31

def model=(value)
  @monitor.synchronize do
    ensure_open!
    @model = value
  end
end

#usage(input_tokens:, output_tokens:, **extra) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/llm_cost_tracker/stream_collector.rb', line 47

def usage(input_tokens:, output_tokens:, **extra)
  @monitor.synchronize do
    ensure_open!
    @explicit_usage = ValueHelpers.deep_dup(
      extra.merge(
        input_tokens: input_tokens.to_i,
        output_tokens: output_tokens.to_i
      )
    )
  end
  self
end