Class: LlmCostTracker::Capture::StreamTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/llm_cost_tracker/capture/stream_tracker.rb

Instance Method Summary collapse

Constructor Details

#initialize(stream, collector, active, finish) ⇒ StreamTracker

Returns a new instance of StreamTracker.



11
12
13
14
15
16
17
18
19
# File 'lib/llm_cost_tracker/capture/stream_tracker.rb', line 11

def initialize(stream, collector, active, finish)
  @stream = stream
  @collector = collector
  @active = active
  @finish = finish || proc { |errored:| @collector.finish!(errored: errored) }
  @finished = false
  @capture_failed = false
  @mutex = Mutex.new
end

Instance Method Details

#wrapObject



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/llm_cost_tracker/capture/stream_tracker.rb', line 21

def wrap
  return @stream unless @stream

  iterator_wrapped = false
  if @stream.instance_variable_defined?(:@iterator)
    iterator = @stream.instance_variable_get(:@iterator)
    if iterator.respond_to?(:each)
      @stream.instance_variable_set(:@iterator, Enumerator.new do |yielder|
        each_from(iterator) { |event| yielder << event }
      end)
      iterator_wrapped = true
    end
  end
  wrap_each if !iterator_wrapped && @stream.respond_to?(:each)

  @stream
rescue StandardError => e
  Logging.warn("stream integration failed to install wrapper: #{e.class}: #{e.message}")
  @stream
end