Class: AtomicRuby::AtomicThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/atomic-ruby/atomic_thread_pool.rb

Defined Under Namespace

Classes: InvalidWorkQueueingError, UnsupportedWorkTypeError

Instance Method Summary collapse

Constructor Details

#initialize(size:, name: nil) ⇒ AtomicThreadPool

Returns a new instance of AtomicThreadPool.



11
12
13
14
15
16
17
18
19
20
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 11

def initialize(size:, name: nil)
  @size = size
  @name = name
  @queue = Atom.new([])
  @threads = []
  @started_threads = Atom.new(0)
  @stopping = AtomicBoolean.new(false)

  start
end

Instance Method Details

#<<(work) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 22

def <<(work)
  unless work.is_a?(Proc) || work == :stop
    raise UnsupportedWorkTypeError, "expected work to be a `Proc`, got #{work.class}"
  end

  if @stopping.true?
    raise InvalidWorkQueueingError, "cannot queue work during or after pool shutdown"
  end

  @queue.swap { |current_queue| current_queue += [work] }
  true
end

#lengthObject



35
36
37
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 35

def length
  @threads.select(&:alive?).length
end

#queue_lengthObject



39
40
41
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 39

def queue_length
  @queue.value.length
end

#shutdownObject



43
44
45
46
47
# File 'lib/atomic-ruby/atomic_thread_pool.rb', line 43

def shutdown
  self << :stop
  @threads.each(&:join)
  true
end