Class: Riffer::Runner::Threaded
- Inherits:
-
Riffer::Runner
- Object
- Riffer::Runner
- Riffer::Runner::Threaded
- Defined in:
- lib/riffer/runner/threaded.rb
Overview
Processes items concurrently using a thread pool.
Maintains up to max_concurrency worker threads that pull items from a shared queue. When a worker finishes one item it immediately picks up the next, so a single slow item does not block other workers.
If multiple workers raise, only the first exception is re-raised after all workers finish; subsequent errors are discarded.
runner = Riffer::Runner::Threaded.new(max_concurrency: 3)
runner.map(items) { |item| expensive_operation(item) }
Constant Summary collapse
- DEFAULT_MAX_CONCURRENCY =
: Integer
5
Instance Method Summary collapse
-
#initialize(max_concurrency: DEFAULT_MAX_CONCURRENCY) ⇒ Threaded
constructor
- max_concurrency
-
maximum number of threads to run simultaneously.
- #map(items, context:, &block) ⇒ Object
Constructor Details
#initialize(max_concurrency: DEFAULT_MAX_CONCURRENCY) ⇒ Threaded
- max_concurrency
-
maximum number of threads to run simultaneously.
– : (?max_concurrency: Integer) -> void
23 24 25 |
# File 'lib/riffer/runner/threaded.rb', line 23 def initialize(max_concurrency: DEFAULT_MAX_CONCURRENCY) @max_concurrency = max_concurrency end |
Instance Method Details
#map(items, context:, &block) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/riffer/runner/threaded.rb', line 29 def map(items, context:, &block) return [] if items.empty? results = Array.new(items.size) errors = Array.new(items.size) queue = Queue.new items.each_with_index { |item, i| queue << [item, i] } workers = [items.size, @max_concurrency].min.times.map do Thread.new do loop do pair = begin queue.pop(true) rescue ThreadError break end item, index = pair begin results[index] = block.call(item) rescue => e errors[index] = e end end end end workers.each(&:join) first_error = errors.compact.first raise first_error if first_error results end |