Class: LLM::Stream::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/llm/stream/queue.rb

Overview

A small queue for collecting streamed tool work. Values can be immediate Function::Return objects or concurrent handles returned by Function#spawn. Calling #wait(strategy) resolves queued work and returns an array of Function::Return values.

Instance Method Summary collapse

Constructor Details

#initialize(stream) ⇒ LLM::Stream::Queue

Parameters:



13
14
15
16
# File 'lib/llm/stream/queue.rb', line 13

def initialize(stream)
  @stream = stream
  @items = []
end

Instance Method Details

#<<(item) ⇒ LLM::Stream::Queue

Enqueue a function return or spawned task.

Parameters:

Returns:



22
23
24
25
# File 'lib/llm/stream/queue.rb', line 22

def <<(item)
  @items << item
  self
end

#empty?Boolean

Returns true when the queue is empty.

Returns:

  • (Boolean)


30
31
32
# File 'lib/llm/stream/queue.rb', line 30

def empty?
  @items.empty?
end

#wait(strategy) ⇒ Array<LLM::Function::Return> Also known as: value

Waits for queued work to finish and returns function results.

Parameters:

  • strategy (Symbol, Array<Symbol>)

    Controls concurrency strategy, or lists the possible concurrency strategies to wait on:

    • ‘:thread`: Use threads

    • ‘:task`: Use async tasks (requires async gem)

    • ‘:fiber`: Use raw fibers

    • ‘:ractor`: Use Ruby ractors (class-based tools only; MCP tools are not supported)

    • ‘[:thread, :ractor]`: Wait for any queued thread or ractor work, in the given order. This is useful when different tools were spawned with different concurrency strategies.

Returns:



47
48
49
50
51
# File 'lib/llm/stream/queue.rb', line 47

def wait(strategy)
  returns, tasks = @items.shift(@items.length).partition { LLM::Function::Return === _1 }
  results = wait_tasks(tasks, strategy)
  returns.concat fire_hooks(tasks, results)
end