Class: LLM::Stream::Queue

Inherits:
Object
Defined in:
lib/llm/stream/queue.rb

Overview

A small queue for collecting streamed tool work. Values can be immediate Function::Return objects or concurrent handles returned by Function#spawn. Calling #wait resolves queued work and returns an array of Function::Return values.
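A minimal, self-contained sketch of that contract, using stand-in types (a Struct in place of Function::Return and a Thread in place of a spawned handle) rather than the gem's real classes:

```ruby
# Stand-in for LLM::Function::Return (the real class lives in the llm gem).
Return = Struct.new(:value)

# Toy queue mirroring the documented contract: it accepts immediate
# Return values or concurrent handles, and #wait resolves everything
# into an array of Return values.
class ToyQueue
  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self
  end

  def wait
    # Drain the queue, keeping immediate returns as-is and joining
    # thread-backed handles into Return values.
    returns, tasks = @items.shift(@items.length).partition { Return === _1 }
    returns + tasks.map { Return.new(_1.value) }
  end
end

queue = ToyQueue.new
queue << Return.new(1)                # immediate result
queue << Thread.new { sleep 0.01; 2 } # concurrent handle, resolved on #wait
queue.wait.map(&:value)               # => [1, 2]
```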

Instance Method Summary

Constructor Details

#initialize(stream) ⇒ LLM::Stream::Queue

Parameters:

  • stream — the stream whose tool work this queue collects



# File 'lib/llm/stream/queue.rb', line 13

def initialize(stream)
  @stream = stream
  @items = []
end

Instance Method Details

#<<(item) ⇒ LLM::Stream::Queue

Enqueue a function return or spawned task.

Parameters:

  • item — a function return or a spawned task

Returns:

  • (LLM::Stream::Queue) — self


# File 'lib/llm/stream/queue.rb', line 22

def <<(item)
  @items << item
  self
end
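Because `#<<` returns the queue itself, enqueues chain naturally; a small stand-in class illustrating the pattern:

```ruby
# Minimal stand-in showing why returning self from #<< enables chaining.
class ChainQueue
  attr_reader :items

  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self # returning self is what makes q << a << b possible
  end
end

q = ChainQueue.new
q << :a << :b << :c
q.items # => [:a, :b, :c]
```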

#empty? ⇒ Boolean

Returns true when the queue is empty.

Returns:

  • (Boolean)


# File 'lib/llm/stream/queue.rb', line 30

def empty?
  @items.empty?
end

#interrupt! ⇒ nil Also known as: cancel!

Interrupts every queued item.

Returns:

  • (nil)


# File 'lib/llm/stream/queue.rb', line 36

def interrupt!
  @items.each(&:interrupt!)
  nil
end
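The body forwards interrupt! to every queued item, so each item is expected to respond to it; a hypothetical handle (not a class from the gem) makes that expectation concrete:

```ruby
# Hypothetical handle responding to interrupt!, as the queue expects of items.
class ToyHandle
  def initialize
    @interrupted = false
  end

  def interrupt!
    @interrupted = true
    nil
  end

  def interrupted?
    @interrupted
  end
end

items = [ToyHandle.new, ToyHandle.new]
items.each(&:interrupt!)   # mirrors the queue's @items.each(&:interrupt!)
items.all?(&:interrupted?) # => true
```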

#wait ⇒ Array<LLM::Function::Return> Also known as: value

Waits for queued work to finish and returns function results.

Each queued item is resolved according to its actual task type, so callers do not need to supply a waiting strategy.

Returns:

  • (Array<LLM::Function::Return>)


# File 'lib/llm/stream/queue.rb', line 49

def wait
  returns, tasks = @items.shift(@items.length).partition { LLM::Function::Return === _1 }
  results = wait_tasks(tasks)
  returns.concat fire_hooks(tasks, results)
end
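The first line of the body drains @items in one shot (Array#shift with a count removes and returns that many leading elements) and splits the drained items by class via ===; the same idiom in isolation, with plain types standing in for Function::Return:

```ruby
# Array#shift(n) removes and returns the first n elements, so
# shift(items.length) empties the array while handing back its contents.
items = [1, "a", 2, "b"]
drained = items.shift(items.length)
items # => [] (the original array is now empty)

# partition with Class#=== splits by type, as #wait does
# with LLM::Function::Return.
nums, strs = drained.partition { Integer === _1 }
nums # => [1, 2]
strs # => ["a", "b"]
```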