Class: Riffer::Runner::Fibers

Inherits:
Riffer::Runner show all
Includes:
Helpers::Dependencies
Defined in:
lib/riffer/runner/fibers.rb

Overview

Processes items concurrently using fibers via the async gem.

All items run as fibers simultaneously by default. When max_concurrency is set, an Async::Semaphore limits how many fibers execute at once.

If multiple fibers raise, only the first exception is re-raised after all fibers finish; subsequent errors are discarded.

runner = Riffer::Runner::Fibers.new
runner.map(items) { |item| expensive_operation(item) }

Instance Method Summary collapse

Methods included from Helpers::Dependencies

#depends_on

Constructor Details

#initialize(max_concurrency: nil) ⇒ Fibers

max_concurrency

maximum number of fibers to run simultaneously.

When +nil+, all fibers run without limit.

– : (?max_concurrency: Integer?) -> void



24
25
26
27
28
# File 'lib/riffer/runner/fibers.rb', line 24

def initialize(max_concurrency: nil)
  depends_on "async"
  depends_on "async/semaphore" if max_concurrency
  @max_concurrency = max_concurrency
end

Instance Method Details

#map(items, context:, &block) ⇒ Object

– : (Array, context: Hash[Symbol, untyped]?) { (untyped) -> untyped } -> Array



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/riffer/runner/fibers.rb', line 32

def map(items, context:, &block)
  return [] if items.empty?

  results = Array.new(items.size)
  errors = Array.new(items.size)

  Async do
    barrier = Async::Barrier.new
    parent = if @max_concurrency
      Async::Semaphore.new(@max_concurrency, parent: barrier)
    else
      barrier
    end

    items.each_with_index do |item, index|
      parent.async do
        results[index] = block.call(item)
      rescue => e
        errors[index] = e
      end
    end

    barrier.wait
  end

  first_error = errors.compact.first
  raise first_error if first_error

  results
end