Module: BetterAuth::Async

Defined in:
lib/better_auth/async.rb

Class Method Summary collapse

Class Method Details

.map_concurrent(items, concurrency:, &mapper) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/better_auth/async.rb', line 7

# Maps +items+ through +mapper+ using a bounded set of worker threads and
# returns the results in input order.
#
# Each worker repeatedly claims the next unprocessed index under a mutex and
# stores the mapper's return value at that index. As soon as any mapper call
# raises, no further items are handed out, the remaining workers are killed,
# and the exception is re-raised in the calling thread.
#
# @param items [#to_a] the collection to map
# @param concurrency [Object] requested number of workers; passed through
#   +normalized_concurrency+ together with the item count
# @yieldparam item [Object] the element being mapped
# @yieldparam index [Integer] the element's position in +items+
# @return [Array] mapper results, positionally aligned with +items+
# @raise [Exception] the exception raised by a mapper call
def map_concurrent(items, concurrency:, &mapper)
  list = items.to_a
  return [] if list.empty?

  width = normalized_concurrency(concurrency, list.length)
  results = Array.new(list.length)
  next_index = 0
  first_error = nil
  mutex = Mutex.new
  status = Queue.new

  workers = Array.new(width) do
    Thread.new do
      loop do
        # `break` leaves the synchronize block with nil, which signals that
        # there is nothing left to claim (or that a sibling already failed).
        index = mutex.synchronize do
          break if first_error || next_index >= list.length

          current = next_index
          next_index += 1
          current
        end
        break unless index

        begin
          results[index] = mapper.call(list[index], index)
        rescue Exception => error # rubocop:disable Lint/RescueException
          # Capture every exception class, not only StandardError: anything
          # left unrescued would just end this thread and leave a nil hole in
          # the results. The exception is re-raised in the calling thread.
          mutex.synchronize { first_error ||= error }
          status << [:error, error]
          break
        end
      end
    ensure
      # Always report completion so the coordinator can stop waiting, even
      # when this thread is killed.
      status << [:done, nil]
    end
  end

  begin
    finished = 0
    while finished < workers.length
      type, error = status.pop
      raise error if type == :error

      finished += 1
    end
  ensure
    # Runs on success, on mapper failure, and when the calling thread itself
    # is interrupted while waiting, so worker threads are never left behind.
    workers.each { |worker| worker.kill if worker.alive? }
    workers.each(&:join)
  end

  # Defensive: a failing worker enqueues :error before its :done message, so
  # the loop above is expected to have raised already.
  raise first_error if first_error

  results
end

.normalized_concurrency(concurrency, item_count) ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/better_auth/async.rb', line 60

# Converts a requested concurrency into a usable worker count.
#
# The value is coerced with +Float()+ and floored, raised to at least 1, and
# then capped at +item_count+. Values that cannot be coerced, as well as NaN
# and negative infinity, fall back to 1. Positive infinity means "no limit"
# and resolves to +item_count+.
#
# @param concurrency [Object] requested worker count (numeric or numeric string)
# @param item_count [Integer] number of items to process; upper bound of the result
# @return [Integer] the smaller of the normalized request and +item_count+
def normalized_concurrency(concurrency, item_count)
  number = begin
    Float(concurrency)
  rescue ArgumentError, TypeError
    1.0
  end

  # Float#floor raises FloatDomainError for NaN and the infinities, so those
  # are resolved before flooring.
  requested =
    if number.nan?
      1
    elsif number.infinite?
      number.positive? ? item_count : 1
    else
      number.floor
    end

  [[requested, 1].max, item_count].min
end