Class: LlmCostTracker::StreamCollector

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/stream_collector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, metadata: {}) ⇒ StreamCollector

Returns a new instance of StreamCollector.



11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/llm_cost_tracker/stream_collector.rb', line 11

# Builds a collector for a single streamed provider response.
#
# @param provider [#to_s] provider identifier; stored as a String via #to_s
# @param model [Object] model identifier, stored as given
# @param latency_ms [Object, nil] caller-supplied latency, stored as given
# @param provider_response_id [Object, nil] provider-side response id, stored as given
# @param metadata [Hash, nil] extra metadata; nil is treated as an empty hash, and the
#   value is passed through ValueHelpers.deep_dup before being stored
def initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, metadata: {})
  @provider = provider.to_s
  @model = model
  @latency_ms = latency_ms
  @provider_response_id = provider_response_id
  # An explicit `metadata: nil` falls back to an empty hash.
  @metadata = ValueHelpers.deep_dup(metadata || {})
  @events = []
  @explicit_usage = nil
  # Monotonic clock reading taken at construction time.
  @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @finished = false
  @monitor = Monitor.new
end

Instance Attribute Details

#provider ⇒ Object (readonly)

Returns the value of attribute provider.



9
10
11
# File 'lib/llm_cost_tracker/stream_collector.rb', line 9

# Reader for the provider attribute.
#
# @return [Object] the current value of @provider
def provider = @provider

Instance Method Details

#event(data, type: nil) ⇒ Object Also known as: chunk



44
45
46
47
48
49
50
# File 'lib/llm_cost_tracker/stream_collector.rb', line 44

# Appends one stream event to the collector while holding the monitor.
#
# ensure_open! runs first on every call (even when +data+ is nil); a nil
# +data+ is then skipped without touching the event list.
#
# @param data [Object, nil] event payload; stored via ValueHelpers.deep_dup
# @param type [Object, nil] event type, stored under the :event key
# @return [self] the collector, so calls can be chained
def event(data, type: nil)
  @monitor.synchronize do
    ensure_open!
    next if data.nil?

    payload = ValueHelpers.deep_dup(data)
    @events << { event: type, data: payload }
  end
  self
end

#finish!(errored: false) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/llm_cost_tracker/stream_collector.rb', line 66

# Closes the collector and reports the collected stream through Tracker.record.
#
# The first call sets @finished under the monitor and snapshots the collector
# state; a later call returns nil without recording again.
#
# @param errored [Boolean] passed into the metadata expression at the end of the
#   method (that line is truncated in this listing, see the note there)
# @return [Object, nil] the result of Tracker.record, or nil when already finished
def finish!(errored: false)
  snapshot = @monitor.synchronize do
    # `return` leaves finish! itself, so a repeated call is a no-op.
    return if @finished

    @finished = true
    # State is copied while the monitor is held; everything below uses the snapshot.
    {
      events: ValueHelpers.deep_dup(@events),
      explicit_usage: ValueHelpers.deep_dup(@explicit_usage),
      model: @model,
      latency_ms: @latency_ms,
      provider_response_id: @provider_response_id,
      metadata: ValueHelpers.deep_dup(@metadata)
    }
  end

  # Parsing and recording happen after the synchronize block has ended.
  parsed = build_parsed_usage(snapshot)
  Tracker.record(
    provider: parsed.provider,
    model: parsed.model,
    input_tokens: parsed.input_tokens,
    output_tokens: parsed.output_tokens,
    # A caller-supplied latency wins; otherwise fall back to elapsed_ms.
    latency_ms: snapshot[:latency_ms] || elapsed_ms,
    stream: true,
    usage_source: parsed.usage_source,
    # The id from the parsed usage wins over the one stored on the collector.
    provider_response_id: parsed.provider_response_id || snapshot[:provider_response_id],
    # NOTE(review): the next line looks truncated in this rendering — the call before
    # `(errored)` and the method name after `parsed.` are missing. Restore both from
    # lib/llm_cost_tracker/stream_collector.rb before relying on this listing.
    metadata: (errored).merge(snapshot[:metadata]).merge(parsed.)
  )
end

#metadata ⇒ Object



26
# File 'lib/llm_cost_tracker/stream_collector.rb', line 26

# Returns the collector's metadata, read while holding the monitor.
#
# The stored value is passed through ValueHelpers.deep_dup, so the caller
# receives that helper's result rather than @metadata itself.
#
# @return [Object] ValueHelpers.deep_dup(@metadata)
def metadata = @monitor.synchronize { ValueHelpers.deep_dup(@metadata) }

#model ⇒ Object



24
# File 'lib/llm_cost_tracker/stream_collector.rb', line 24

# Reads the current model while holding the monitor.
#
# @return [Object] the current value of @model
def model
  @monitor.synchronize do
    @model
  end
end

#model=(value) ⇒ Object



30
31
32
33
34
35
# File 'lib/llm_cost_tracker/stream_collector.rb', line 30

# Replaces the model while holding the monitor.
#
# ensure_open! runs before the assignment, so anything it raises leaves
# @model untouched.
#
# @param new_model [Object] the model to store
# @return [Object] the stored value
def model=(new_model)
  @monitor.synchronize do
    ensure_open!
    @model = new_model
  end
end

#provider_response_id ⇒ Object



28
# File 'lib/llm_cost_tracker/stream_collector.rb', line 28

# Reads the provider response id while holding the monitor.
#
# @return [Object, nil] the current value of @provider_response_id
def provider_response_id
  @monitor.synchronize do
    @provider_response_id
  end
end

#provider_response_id=(value) ⇒ Object



37
38
39
40
41
42
# File 'lib/llm_cost_tracker/stream_collector.rb', line 37

# Replaces the provider response id while holding the monitor.
#
# ensure_open! runs before the assignment, so anything it raises leaves
# @provider_response_id untouched.
#
# @param new_id [Object] the response id to store
# @return [Object] the stored value
def provider_response_id=(new_id)
  @monitor.synchronize do
    ensure_open!
    @provider_response_id = new_id
  end
end

#usage(input_tokens:, output_tokens:, **extra) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/llm_cost_tracker/stream_collector.rb', line 53

# Stores explicitly reported token usage, replacing any earlier explicit usage.
#
# Both token counts are coerced with #to_i and merged after the extra keys;
# the resulting hash is stored via ValueHelpers.deep_dup while holding the monitor.
#
# @param input_tokens [#to_i] input token count
# @param output_tokens [#to_i] output token count
# @param extra [Hash] any additional keyword arguments, stored alongside the counts
# @return [self] the collector, so calls can be chained
def usage(input_tokens:, output_tokens:, **extra)
  @monitor.synchronize do
    ensure_open!
    # Coercion stays after ensure_open! so the order of possible failures is unchanged.
    token_counts = { input_tokens: input_tokens.to_i, output_tokens: output_tokens.to_i }
    @explicit_usage = ValueHelpers.deep_dup(extra.merge(token_counts))
  end
  self
end