Class: Riffer::Runner::Fibers

Inherits:
Riffer::Runner show all
Defined in:
lib/riffer/runner/fibers.rb

Overview

Processes items concurrently using fibers via the async gem. max_concurrency caps simultaneous fibers via an Async::Semaphore. If multiple fibers raise, only the first exception is re-raised after all finish.

Instance Method Summary collapse

Constructor Details

#initialize(max_concurrency: nil) ⇒ Fibers

– : (?max_concurrency: Integer?) -> void



13
14
15
16
17
# File 'lib/riffer/runner/fibers.rb', line 13

def initialize(max_concurrency: nil)
  depends_on "async"
  depends_on "async/semaphore" if max_concurrency
  @max_concurrency = max_concurrency
end

Instance Method Details

#map(items, context:, &block) ⇒ Object

– : (Array, context: Riffer::Agent::Context?) { (untyped) -> untyped } -> Array



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/riffer/runner/fibers.rb', line 21

def map(items, context:, &block)
  return [] if items.empty?

  results = Array.new(items.size)
  errors = Array.new(items.size)

  Async do
    barrier = Async::Barrier.new
    max = @max_concurrency
    parent = if max
      Async::Semaphore.new(max, parent: barrier)
    else
      barrier
    end

    items.each_with_index do |item, index|
      parent.async do
        results[index] = block.call(item)
      rescue => e
        errors[index] = e
      end
    end

    barrier.wait
  end

  first_error = errors.compact.first
  raise first_error if first_error

  results
end