Class: Riffer::Runner::Threaded
- Inherits:
-
Riffer::Runner
- Object
- Riffer::Runner
- Riffer::Runner::Threaded
- Defined in:
- lib/riffer/runner/threaded.rb
Overview
Processes items concurrently using a thread pool.
Maintains up to max_concurrency worker threads that pull items from a shared queue. When a worker finishes one item it immediately picks up the next, so a single slow item does not block other workers.
If multiple workers raise, only the first exception is re-raised after all workers finish; subsequent errors are discarded.
runner = Riffer::Runner::Threaded.new(max_concurrency: 3)
runner.map(items) { |item| expensive_operation(item) }
Constant Summary collapse
- DEFAULT_MAX_CONCURRENCY =
5
Instance Method Summary collapse
-
#initialize(max_concurrency: DEFAULT_MAX_CONCURRENCY) ⇒ Threaded
constructor
- max_concurrency
-
maximum number of threads to run simultaneously.
- #map(items, context:, &block) ⇒ Object
Constructor Details
#initialize(max_concurrency: DEFAULT_MAX_CONCURRENCY) ⇒ Threaded
- max_concurrency
-
maximum number of threads to run simultaneously.
– : (?max_concurrency: Integer) -> void
25 26 27 |
# File 'lib/riffer/runner/threaded.rb', line 25 def initialize(max_concurrency: DEFAULT_MAX_CONCURRENCY) @max_concurrency = max_concurrency end |
Instance Method Details
#map(items, context:, &block) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/riffer/runner/threaded.rb', line 31 def map(items, context:, &block) return [] if items.empty? results = Array.new(items.size) errors = Array.new(items.size) queue = Queue.new items.each_with_index { |item, i| queue << [item, i] } workers = [items.size, @max_concurrency].min.times.map do Thread.new do loop do pair = begin queue.pop(true) rescue ThreadError break end item, index = pair begin results[index] = block.call(item) rescue => e errors[index] = e end end end end workers.each(&:join) first_error = errors.compact.first raise first_error if first_error results end |