Class: Braintrust::Internal::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/braintrust/internal/thread_pool.rb

Overview

Note:

Thread limits are per-call, not global. If your application calls ThreadPool methods from multiple threads concurrently (e.g., web workers, background jobs), each call spawns its own worker threads. Plan your parallelism settings accordingly to avoid excessive thread creation.

Reusable thread pool for concurrent task execution. Uses the strategy pattern to define result handling behavior.

Examples:

Iterate without collecting results (Eval use case)

ThreadPool.each(items, parallelism: 4) do |item|
  process(item)
end

Collect results in order

results = ThreadPool.collect(items, parallelism: 4) do |item|
  transform(item)
end

Defined Under Namespace

Classes: Collect, Each

Constant Summary collapse

DEFAULT_PARALLELISM =
3
MAX_PARALLELISM =
50
STRATEGIES =
{
  each: Each,
  collect: Collect
}.freeze

Class Method Summary collapse

Class Method Details

.collect(items, parallelism: DEFAULT_PARALLELISM) {|item| ... } ⇒ Array

Execute block for each item concurrently, collecting results in order.

Parameters:

  • items (Array, Enumerable)

    Items to process

  • parallelism (Integer) (defaults to: DEFAULT_PARALLELISM)

    Number of worker threads (default: 3)

Yields:

  • (item)

    Block to execute for each item

Returns:

  • (Array)

    Results in same order as input items



113
114
115
# File 'lib/braintrust/internal/thread_pool.rb', line 113

def self.collect(items, parallelism: DEFAULT_PARALLELISM, &block)
  run(items, parallelism: parallelism, strategy: :collect, &block)
end

.each(items, parallelism: DEFAULT_PARALLELISM) {|item| ... } ⇒ nil

Execute block for each item concurrently, discarding results.

Parameters:

  • items (Array, Enumerable)

    Items to process

  • parallelism (Integer) (defaults to: DEFAULT_PARALLELISM)

    Number of worker threads (default: 3)

Yields:

  • (item)

    Block to execute for each item

Returns:

  • (nil)


104
105
106
# File 'lib/braintrust/internal/thread_pool.rb', line 104

def self.each(items, parallelism: DEFAULT_PARALLELISM, &block)
  run(items, parallelism: parallelism, strategy: :each, &block)
end

.run(items, strategy:, parallelism: DEFAULT_PARALLELISM) {|item| ... } ⇒ Object?

Execute block for each item concurrently using the specified strategy. Prefer using .each or .collect convenience methods instead.

Parameters:

  • items (Array, Enumerable)

    Items to process

  • strategy (Symbol, #prepare)

    Strategy for result handling (required)

  • parallelism (Integer) (defaults to: DEFAULT_PARALLELISM)

    Number of worker threads (default: 3)

Yields:

  • (item)

    Block to execute for each item

Returns:

  • (Object, nil)

    Strategy-dependent result



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/braintrust/internal/thread_pool.rb', line 124

def self.run(items, strategy:, parallelism: DEFAULT_PARALLELISM, &block)
  validate_parallelism!(parallelism)

  executor = strategy_instance(strategy)
  all_items = items.to_a

  return executor.sequential_run(all_items, &block) if parallelism == 1
  return executor.empty_result if all_items.empty?

  executor.prepare(all_items)
  executor.enqueue_sentinel(parallelism)

  threads = parallelism.times.map do
    Thread.new { executor.work_loop(&block) }
  end

  threads.each(&:join)
  executor.result
end