Class: Riffer::Runner::Fibers

Inherits:
Riffer::Runner
Includes:
Helpers::Dependencies
Defined in:
lib/riffer/runner/fibers.rb

Overview

Processes items concurrently using fibers via the async gem.

All items run as fibers simultaneously by default. When max_concurrency is set, an Async::Semaphore limits how many fibers execute at once.

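If multiple fibers raise, every fiber still runs to completion. The exception from the failing item with the lowest index is then re-raised (not necessarily the one raised earliest in time), and the remaining errors are discarded.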

runner = Riffer::Runner::Fibers.new
runner.map(items, context: nil) { |item| expensive_operation(item) }

Instance Method Summary

Methods included from Helpers::Dependencies

#depends_on

Constructor Details

#initialize(max_concurrency: nil) ⇒ Fibers

max_concurrency

maximum number of fibers to run simultaneously. When nil, all fibers run without limit.

– : (?max_concurrency: Integer?) -> void
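The effect of a concurrency cap can be sketched without the async gem. The example below is a stand-in under stated assumptions: it uses stdlib threads and a SizedQueue as a counting semaphore, whereas this class uses fibers and Async::Semaphore. The method name map_with_limit is hypothetical and not part of Riffer.

```ruby
# Hypothetical sketch of a concurrency cap (not Riffer's implementation).
# A SizedQueue acts as a counting semaphore: push blocks once
# max_concurrency slots are taken, pop releases a slot.
def map_with_limit(items, max_concurrency: nil)
  results = Array.new(items.size)
  slots = max_concurrency ? SizedQueue.new(max_concurrency) : nil

  threads = items.each_with_index.map do |item, index|
    Thread.new do
      slots&.push(true) # waits while max_concurrency workers are active
      begin
        results[index] = yield(item)
      ensure
        slots&.pop # free the slot for a waiting worker
      end
    end
  end
  threads.each(&:join)

  results
end

# Track how many workers are inside the block at the same time.
active = 0
peak = 0
lock = Mutex.new

squares = map_with_limit([1, 2, 3, 4, 5], max_concurrency: 2) do |n|
  lock.synchronize do
    active += 1
    peak = [peak, active].max
  end
  sleep(0.01)
  lock.synchronize { active -= 1 }
  n * n
end
```

With max_concurrency: 2, peak never exceeds 2 and squares keeps the input order. With max_concurrency: nil, no slot queue is created and every worker starts immediately, which mirrors the unlimited default described above.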



# File 'lib/riffer/runner/fibers.rb', line 26

def initialize(max_concurrency: nil)
  depends_on "async"
  depends_on "async/semaphore" if max_concurrency
  @max_concurrency = max_concurrency
end

Instance Method Details

#map(items, context:, &block) ⇒ Object

– : (Array, context: Riffer::Agent::Context?) { (untyped) -> untyped } -> Array



# File 'lib/riffer/runner/fibers.rb', line 34

def map(items, context:, &block)
  return [] if items.empty?

  results = Array.new(items.size)
  errors = Array.new(items.size)

  Async do
    barrier = Async::Barrier.new
    max = @max_concurrency
    parent = if max
      Async::Semaphore.new(max, parent: barrier)
    else
      barrier
    end

    items.each_with_index do |item, index|
      parent.async do
        results[index] = block.call(item)
      rescue => e
        errors[index] = e
      end
    end

    barrier.wait
  end

  first_error = errors.compact.first
  raise first_error if first_error

  results
end
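The error handling in map can be illustrated in isolation. The following is a minimal sketch, assuming stdlib threads as a stand-in for fibers so that it runs without any gems; the method name map_collecting_errors is hypothetical. It shows that the re-raised exception is chosen by item index, not by which failure happened first in time.

```ruby
# Hypothetical stand-in for the error collection in Fibers#map.
# Threads replace fibers here so the sketch needs no gems.
def map_collecting_errors(items)
  results = Array.new(items.size)
  errors = Array.new(items.size)

  threads = items.each_with_index.map do |item, index|
    Thread.new do
      results[index] = yield(item)
    rescue => e
      errors[index] = e # remember the failure, keep other workers running
    end
  end
  threads.each(&:join) # wait for every worker, even after a failure

  first_error = errors.compact.first # lowest index, not earliest in time
  raise first_error if first_error

  results
end

message = nil
begin
  map_collecting_errors([1, 2, 3]) do |n|
    sleep(0.05) if n == 2 # item 2 fails later in time than item 3
    raise ArgumentError, "item #{n}" if n >= 2
    n * 10
  end
rescue ArgumentError => e
  message = e.message # "item 2": the lowest-indexed failure wins
end
```

Callers that need every failure, not just one, would have to capture errors inside their own block, since the errors from higher-indexed items are dropped.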