Class: LlmCostTracker::Capture::StreamTracker
- Inherits:
-
Object
- Object
- LlmCostTracker::Capture::StreamTracker
- Defined in:
- lib/llm_cost_tracker/capture/stream_tracker.rb
Instance Method Summary collapse
-
#initialize(stream:, collector:, active:, finish: nil) ⇒ StreamTracker
constructor
A new instance of StreamTracker.
- #wrap ⇒ Object
Constructor Details
#initialize(stream:, collector:, active:, finish: nil) ⇒ StreamTracker
Returns a new instance of StreamTracker.
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/llm_cost_tracker/capture/stream_tracker.rb', line 11 def initialize(stream:, collector:, active:, finish: nil) @stream = stream @collector = collector @active = active @finish = finish || proc { |errored| collector.finish!(errored: errored) } @finished_ref = [false] @attempted_ref = [false] @capture_failed = false @mutex = Mutex.new end |
Instance Method Details
#wrap ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/llm_cost_tracker/capture/stream_tracker.rb', line 22 def wrap return @stream unless @stream iterator_wrapped = false if @stream.instance_variable_defined?(:@iterator) iterator = @stream.instance_variable_get(:@iterator) if iterator.respond_to?(:each) @stream.instance_variable_set(:@iterator, Enumerator.new do |yielder| each_from(iterator) { |event| yielder << event } end) iterator_wrapped = true end end each_wrapped = false if !iterator_wrapped && @stream.respond_to?(:each) wrap_each each_wrapped = true end unless iterator_wrapped || each_wrapped Logging.warn( "stream integration found no wrappable iterator on #{@stream.class} " \ "(missing both `@iterator` ivar and `#each`); usage will not be captured" ) end register_orphan_finalizer @stream rescue StandardError => e Logging.warn("stream integration failed to install wrapper: #{e.class}: #{e.}") @stream end |