Class: RactorPool

Inherits:
Object
  • Object
show all
Defined in:
lib/ractor-pool.rb,
lib/ractor-pool/version.rb

Overview

A thread-safe, lock-free pool of Ractor workers with a coordinator pattern for distributing work.

RactorPool manages a fixed number of worker ractors that process work items in parallel. Work is distributed on-demand to idle workers, ensuring efficient utilisation. Results are collected and passed to a result handler running in a separate thread.

Examples:

Basic usage

results = []
worker = -> (work) { work * 2 }
pool = RactorPool.new(size: 4, worker: worker) { |result| results << result }

10.times { |index| pool << index }
pool.shutdown

p results #=> [2, 0, 6, 4, 14, 10, 8, 16, 18, 12]

Without result handler

counter = Atom.new(0)
worker = proc do |work|
  counter.swap { |current_value| current_value + 1 }
  work * 2
end
pool = RactorPool.new(size: 4, worker: worker)

10.times { |index| pool << index }
pool.shutdown

p counter.value #=> 10

See Also:

Defined Under Namespace

Classes: EnqueuedWorkAfterShutdownError, Error

Constant Summary collapse

VERSION =
"0.3.0"

Instance Method Summary collapse

Constructor Details

#initialize(size: Etc.nprocessors, worker:, name: nil, on_error: nil) {|result| ... } ⇒ void

Creates a new RactorPool with the specified number of workers.

Examples:

With result handler

pool = RactorPool.new(size: 4, worker: proc { it }) { |result| puts result }

Without result handler

pool = RactorPool.new(size: 4, worker: proc { it })

With error handler

error_count = Atom.new(0)
on_error = proc { error_count.swap { |count| count + 1 } }
pool = RactorPool.new(size: 4, worker: proc { raise }, on_error: on_error)

Parameters:

  • size (Integer) (defaults to: Etc.nprocessors)

    number of worker ractors to create

  • worker (Proc)

    a shareable proc that processes each work item

  • name (String, nil) (defaults to: nil)

    optional name for the pool, used in thread/ractor names

  • on_error (Proc, nil) (defaults to: nil)

    optional shareable proc called with the raised exception when a worker raises

Yield Parameters:

  • result (Object)

    the result returned by the worker proc

Raises:

  • (ArgumentError)

    if size is not a positive integer

  • (ArgumentError)

    if worker is not a proc

  • (ArgumentError)

    if on_error is given but is not a proc



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/ractor-pool.rb', line 92

def initialize(size: Etc.nprocessors, worker:, name: nil, on_error: nil, &result_handler)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)
  raise ArgumentError, "on_error must be a Proc" if on_error && !on_error.is_a?(Proc)

  @size = size
  @worker = Ractor.shareable_proc(&worker)
  @name = name
  @on_error = Ractor.shareable_proc(&on_error) if on_error
  @result_handler = result_handler

  @in_flight = Atom.new(0)
  @shutdown  = Atom.new(false)

  @result_port = Ractor::Port.new if result_handler
  @error_port  = Ractor::Port.new unless on_error
  @coordinator = start_coordinator if size > 1
  @workers = start_workers
  @error_collector = start_error_collector
  @collector = start_collector
end

Instance Method Details

#<<(work) ⇒ void

This method returns an undefined value.

Queues a work item to be processed by an available worker.

Examples:

pool << "http://example.com/page1"
pool << "http://example.com/page2"

Parameters:

  • work (Object)

    the work item to process

Raises:



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/ractor-pool.rb', line 125

def <<(work)
  raise EnqueuedWorkAfterShutdownError if @shutdown.value

  @in_flight.swap { |count| count + 1 }

  if @shutdown.value
    @in_flight.swap { |count| count - 1 }
    raise EnqueuedWorkAfterShutdownError
  end

  begin
    (@coordinator || @workers.first).send(work, move: true)
  ensure
    @in_flight.swap { |count| count - 1 }
  end
end

#shutdownvoid

This method returns an undefined value.

Shuts down the pool gracefully.

This method:

  1. Prevents new work from being queued

  2. Waits for all in-flight work submissions to complete

  3. Allows all queued work to complete

  4. Waits for all workers to finish

  5. Waits for all results and errors to be processed

This method is idempotent and can be called multiple times safely.

Examples:

pool.shutdown


159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/ractor-pool.rb', line 159

def shutdown
  already_shutdown = false
  @shutdown.swap do |current|
    if current
      already_shutdown = true
      current
    else
      true
    end
  end
  return if already_shutdown

  Thread.pass until @in_flight.value.zero?

  @coordinator&.send(SHUTDOWN, move: true) ||
    (@workers.first.send(SHUTDOWN, move: true) && @result_port&.send(SHUTDOWN, move: true))
  @workers.each(&:join)
  @coordinator&.join
  @error_port&.send(SHUTDOWN, move: true)
  @error_collector&.join
  @collector&.join
end