Class: LlmCostTracker::StreamCollector

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/stream_collector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, pricing_mode: nil, metadata: {}) ⇒ StreamCollector

Returns a new instance of StreamCollector.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/llm_cost_tracker/stream_collector.rb', line 13

# Builds a collector for one streamed response.
#
# @param provider [#to_s] provider identifier; stored as a String
# @param model [Object] model identifier, stored as given
# @param latency_ms [Object, nil] caller-supplied latency, stored as given
# @param provider_response_id [Object, nil] provider-side response id, stored as given
# @param pricing_mode [Object, nil] pricing mode, stored as given
# @param metadata [Hash, nil] extra metadata; deep-copied, with nil treated as an empty Hash
# @return [StreamCollector]
def initialize(provider:, model:, latency_ms: nil, provider_response_id: nil, pricing_mode: nil, metadata: {})
  @provider = provider.to_s
  @model = model
  @latency_ms = latency_ms
  @provider_response_id = provider_response_id
  @pricing_mode = pricing_mode
  # Deep copy so later mutation of the caller's hash cannot leak into the
  # collector; `|| {}` tolerates an explicit `metadata: nil`.
  @metadata = ValueHelpers.deep_dup(metadata || {})
  @events = []
  @captured_bytes = 0
  @overflowed = false
  @explicit_usage = nil
  # Monotonic clock reading taken at construction, unaffected by wall-clock changes.
  @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @finished = false
  # Lock used by the accessors and mutators to guard the state above.
  @monitor = Monitor.new
end

Instance Attribute Details

#provider ⇒ Object (readonly)

Returns the value of attribute provider.



11
12
13
# File 'lib/llm_cost_tracker/stream_collector.rb', line 11

# Reader for the provider attribute.
#
# @return [Object] the current value of @provider
def provider = @provider

Instance Method Details

#event(data, type: nil) ⇒ Object Also known as: chunk



49
50
51
52
53
54
55
# File 'lib/llm_cost_tracker/stream_collector.rb', line 49

# Feeds one stream event into the collector.
#
# While holding the monitor, runs ensure_open! and then hands non-nil data to
# capture_event; nil data is ignored after the open check.
#
# @param data [Object, nil] event payload; nil is skipped
# @param type [Object, nil] forwarded to capture_event as `type:`
# @return [self] so calls can be chained
def event(data, type: nil)
  @monitor.synchronize do
    ensure_open!
    next if data.nil?

    capture_event(data, type: type)
  end

  self
end

#finish!(errored: false) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/llm_cost_tracker/stream_collector.rb', line 71

# Closes the collector and reports the collected usage through Tracker.record.
#
# The first call flips @finished and takes a snapshot of the collector state
# while holding the monitor; parsing the snapshot and recording it happen after
# the lock is released. Any later call returns nil without recording again.
#
# @param errored [Boolean] passed into the metadata expression of the record
#   call (that expression is truncated in this listing, see the note below)
# @return [Object, nil] nil when already finished; otherwise whatever
#   Tracker.record returns
def finish!(errored: false)
  snapshot = @monitor.synchronize do
    # `return` inside the block exits finish! itself, so a repeated call is a no-op.
    return if @finished

    @finished = true
    # Copies (shallow for events, deep for usage and metadata) so the work
    # below does not read live collector state outside the lock.
    {
      events: @events.dup,
      overflowed: @overflowed,
      explicit_usage: ValueHelpers.deep_dup(@explicit_usage),
      model: @model,
      latency_ms: @latency_ms,
      provider_response_id: @provider_response_id,
      pricing_mode: @pricing_mode,
      metadata: ValueHelpers.deep_dup(@metadata)
    }
  end

  parsed = build_parsed_usage(snapshot)
  Tracker.record(
    provider: parsed.provider,
    model: parsed.model,
    input_tokens: parsed.input_tokens,
    output_tokens: parsed.output_tokens,
    # A caller-supplied latency wins; otherwise elapsed_ms is used
    # (presumably measured from @started_at; confirm in the helper).
    latency_ms: snapshot[:latency_ms] || elapsed_ms,
    stream: true,
    usage_source: parsed.usage_source,
    # Prefer the id found while parsing, falling back to the one set on the collector.
    provider_response_id: parsed.provider_response_id || snapshot[:provider_response_id],
    pricing_mode: snapshot[:pricing_mode],
    # NOTE(review): the next line is truncated in this listing. The callee in
    # front of `(errored)` and the method after `parsed.` are missing, so it
    # does not parse as written. Restore both from
    # lib/llm_cost_tracker/stream_collector.rb before relying on this block.
    metadata: (errored).merge(snapshot[:metadata]).merge(parsed.)
  )
end

#metadata ⇒ Object



31
# File 'lib/llm_cost_tracker/stream_collector.rb', line 31

# Returns a deep copy of the collector's metadata, taken while holding the
# monitor, so callers cannot mutate the collector's own hash.
#
# @return [Object] copy of @metadata produced by ValueHelpers.deep_dup
def metadata = @monitor.synchronize { ValueHelpers.deep_dup(@metadata) }

#model ⇒ Object



29
# File 'lib/llm_cost_tracker/stream_collector.rb', line 29

# Reads the current model while holding the monitor.
#
# @return [Object] the value of @model
def model
  @monitor.synchronize { @model }
end

#model=(value) ⇒ Object



35
36
37
38
39
40
# File 'lib/llm_cost_tracker/stream_collector.rb', line 35

# Replaces the model while holding the monitor.
#
# ensure_open! runs first, inside the lock, so if it raises the write is
# skipped (presumably it rejects writes once the collector is finished;
# confirm in that helper).
#
# @param value [Object] new model, stored as given
# @return [Object] the assigned value
def model=(value)
  @monitor.synchronize { ensure_open!.then { @model = value } }
end

#provider_response_id ⇒ Object



33
# File 'lib/llm_cost_tracker/stream_collector.rb', line 33

# Reads the current provider response id while holding the monitor.
#
# @return [Object] the value of @provider_response_id
def provider_response_id
  @monitor.synchronize { @provider_response_id }
end

#provider_response_id=(value) ⇒ Object



42
43
44
45
46
47
# File 'lib/llm_cost_tracker/stream_collector.rb', line 42

# Replaces the provider response id while holding the monitor.
#
# ensure_open! runs first, inside the lock, so if it raises the write is
# skipped (presumably it rejects writes once the collector is finished;
# confirm in that helper).
#
# @param value [Object] new response id, stored as given
# @return [Object] the assigned value
def provider_response_id=(value)
  @monitor.synchronize { ensure_open!.then { @provider_response_id = value } }
end

#usage(input_tokens:, output_tokens:, **extra) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/llm_cost_tracker/stream_collector.rb', line 58

# Stores caller-supplied token usage on the collector.
#
# While holding the monitor, runs ensure_open!, coerces both counts with
# `to_i`, merges them over the extra keywords and keeps a deep copy of the
# result in @explicit_usage, replacing any earlier value.
#
# @param input_tokens [#to_i] input token count
# @param output_tokens [#to_i] output token count
# @param extra [Hash] any further keyword arguments, stored alongside the counts
# @return [self] so calls can be chained
def usage(input_tokens:, output_tokens:, **extra)
  @monitor.synchronize do
    ensure_open!

    token_counts = { input_tokens: input_tokens.to_i, output_tokens: output_tokens.to_i }
    @explicit_usage = ValueHelpers.deep_dup(extra.merge(token_counts))
  end

  self
end