Class: RactorPool

Inherits:
Object
Defined in:
lib/ractor-pool.rb,
lib/ractor-pool/version.rb

Overview

A thread-safe, lock-free pool of Ractor workers that distributes work using either coordinator or round-robin dispatch.

RactorPool manages a fixed number of worker ractors that process work items in parallel.

The :coordinator strategy (the default) routes each work item to whichever worker is currently idle via a dedicated coordinator Ractor, so a slow item on one worker does not block faster items from being picked up by other workers. Use it when work items have variable cost.

The :round_robin strategy dispatches work to workers in turn, so a slow item on one worker queues the next item destined for that worker behind it, even if other workers are idle. Use it when work items have uniform cost.

Results are collected and passed to a result handler running in a separate thread.
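The trade-off between the two strategies can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not the pool's implementation: it uses no Ractors, each worker is reduced to the time at which it becomes free, and the method names `round_robin_makespan` and `idle_first_makespan` are hypothetical.

```ruby
# Toy model of the two dispatch strategies (not the pool's actual code).
# Each worker is represented only by the time at which it becomes free.

# Round-robin: item N always goes to worker N % size, busy or not.
def round_robin_makespan(costs, size)
  free_at = Array.new(size, 0)
  costs.each_with_index { |cost, index| free_at[index % size] += cost }
  free_at.max
end

# Idle-first (a stand-in for :coordinator): each item goes to the worker
# that becomes free soonest.
def idle_first_makespan(costs, size)
  free_at = Array.new(size, 0)
  costs.each do |cost|
    worker = free_at.index(free_at.min)
    free_at[worker] += cost
  end
  free_at.max
end

variable_costs = [5, 1, 1, 1]
round_robin_makespan(variable_costs, 2) # => 6
idle_first_makespan(variable_costs, 2)  # => 5

uniform_costs = [1, 1, 1, 1]
round_robin_makespan(uniform_costs, 2)  # => 2
idle_first_makespan(uniform_costs, 2)   # => 2
```

With variable costs, the slow first item delays the third item under round-robin, while idle-first dispatch hands the remaining items to the free worker. With uniform costs both strategies finish at the same time in this model.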

Examples:

Basic usage

results = []
worker = -> (work) { work * 2 }
pool = RactorPool.new(size: 4, worker: worker) { |result| results << result }

10.times { |index| pool << index }
pool.shutdown

p results #=> e.g. [2, 0, 6, 4, 14, 10, 8, 16, 18, 12] (order varies between runs)

Without result handler

counter = Atom.new(0)
worker = proc do |work|
  counter.swap { |current_value| current_value + 1 }
  work * 2
end
pool = RactorPool.new(size: 4, worker: worker)

10.times { |index| pool << index }
pool.shutdown

p counter.value #=> 10

Defined Under Namespace

Classes: EnqueuedWorkAfterShutdownError, Error

Constant Summary

VERSION = "0.4.0"

Constructor Details

#initialize(size: Etc.nprocessors, worker:, strategy: :coordinator, name: nil, on_error: nil) {|result| ... } ⇒ void

Creates a new RactorPool with the specified number of workers.

Examples:

With result handler

pool = RactorPool.new(size: 4, worker: proc { it }) { |result| puts result }

Without result handler

pool = RactorPool.new(size: 4, worker: proc { it })

With round-robin strategy

pool = RactorPool.new(size: 4, strategy: :round_robin, worker: proc { it })

With error handler

error_count = Atom.new(0)
on_error = proc { error_count.swap { |count| count + 1 } }
pool = RactorPool.new(size: 4, worker: proc { raise }, on_error: on_error)

Parameters:

  • size (Integer) (defaults to: Etc.nprocessors)

    number of worker ractors to create

  • worker (Proc)

    a shareable proc that processes each work item

  • strategy (Symbol) (defaults to: :coordinator)

    dispatch strategy, either :coordinator (the default) or :round_robin

  • name (String, nil) (defaults to: nil)

    optional name for the pool, used in thread/ractor names

  • on_error (Proc, nil) (defaults to: nil)

    optional shareable proc called with the raised exception when a worker raises

Yield Parameters:

  • result (Object)

    the result returned by the worker proc

Raises:

  • (ArgumentError)

    if size is not a positive integer

  • (ArgumentError)

    if worker is not a proc

  • (ArgumentError)

    if strategy is not :coordinator or :round_robin

  • (ArgumentError)

    if on_error is given but is not a proc



# File 'lib/ractor-pool.rb', line 107

def initialize(size: Etc.nprocessors, worker:, strategy: :coordinator, name: nil, on_error: nil, &result_handler)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)
  raise ArgumentError, "strategy must be one of #{STRATEGIES.inspect}" unless STRATEGIES.include?(strategy)
  raise ArgumentError, "on_error must be a Proc" if on_error && !on_error.is_a?(Proc)

  @size = size
  @worker = Ractor.shareable_proc(&worker)
  @strategy = strategy
  @name = name
  @on_error = Ractor.shareable_proc(&on_error) if on_error
  @result_handler = result_handler

  @in_flight = Atom.new(0)
  @shutdown  = Atom.new(false)
  @next_worker_index = Atom.new(-1) if size > 1 && strategy == :round_robin

  @result_port = Ractor::Port.new if result_handler
  @error_port  = Ractor::Port.new unless on_error
  @coordinator = start_coordinator if size > 1 && strategy == :coordinator
  @workers = start_workers
  @collector = start_collector
  @error_collector = start_error_collector
end

Instance Method Details

#<<(work) ⇒ void

This method returns an undefined value.

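Queues a work item for processing. With the :coordinator strategy the item goes to the next idle worker; with :round_robin it goes to the next worker in turn. The item is sent with move: true, so the caller must not use the object after queuing it.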

Examples:

pool << "http://example.com/page1"
pool << "http://example.com/page2"

Parameters:

  • work (Object)

    the work item to process

Raises:

  • (EnqueuedWorkAfterShutdownError)

    if the pool has already been shut down



# File 'lib/ractor-pool.rb', line 143

def <<(work)
  raise EnqueuedWorkAfterShutdownError if @shutdown.value

  @in_flight.swap { |count| count + 1 }

  if @shutdown.value
    @in_flight.swap { |count| count - 1 }
    raise EnqueuedWorkAfterShutdownError
  end

  target = if @coordinator
             @coordinator
           elsif @next_worker_index
             @workers[@next_worker_index.swap { |index| (index + 1) % @size }]
           else
             @workers.first
           end

  begin
    target.send(work, move: true)
  ensure
    @in_flight.swap { |count| count - 1 }
  end
end
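
The round-robin branch above starts its index at -1 and advances it with `(index + 1) % @size`, so the first item lands on worker 0. The sketch below reproduces only that arithmetic. `TinyAtom` is a hypothetical, Mutex-based stand-in written for this example; it is not the Atom class the pool uses, and it assumes that `swap` returns the updated value, as the source's use of the return value as an array index implies.

```ruby
# Hypothetical stand-in for the pool's Atom, for illustration only.
# It guards the value with a Mutex instead of being lock-free.
class TinyAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  # Replaces the value with the block's result and returns the new value
  # (assumed to match how the pool's Atom#swap is used above).
  def swap
    @lock.synchronize { @value = yield(@value) }
  end

  def value
    @lock.synchronize { @value }
  end
end

size = 4
next_index = TinyAtom.new(-1)

# Six submissions to a pool of four workers wrap around after worker 3.
picks = Array.new(6) { next_index.swap { |index| (index + 1) % size } }
picks # => [0, 1, 2, 3, 0, 1]
```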

#shutdown ⇒ void

This method returns an undefined value.

Shuts down the pool gracefully.

This method:

  1. Prevents new work from being queued

  2. Waits for all in-flight work submissions to complete

  3. Allows all queued work to complete

  4. Waits for all workers to finish

  5. Waits for all results and errors to be processed

This method is idempotent and can be called multiple times safely.

Examples:

pool.shutdown


# File 'lib/ractor-pool.rb', line 185

def shutdown
  already_shutdown = false
  @shutdown.swap do |current|
    if current
      already_shutdown = true
      current
    else
      true
    end
  end
  return if already_shutdown

  Thread.pass until @in_flight.value.zero?

  if @coordinator
    @coordinator.send(SHUTDOWN, move: true)
    @workers.each(&:join)
    @coordinator.join
  else
    @workers.each { |worker| worker.send(SHUTDOWN, move: true) }
    @workers.each(&:join)
    @result_port&.send(SHUTDOWN, move: true)
  end
  @error_port&.send(SHUTDOWN, move: true)
  @error_collector&.join
  @collector&.join
end
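
The shutdown ordering described above (stop accepting work, let queued work drain, wait for the workers, and make repeated calls harmless) can be modelled with plain threads. This is a minimal sketch under stated assumptions, not the pool's implementation: `ToyPool` and `ToyPool::ClosedError` are hypothetical names, threads and a `Queue` stand in for Ractors and ports, and a Mutex replaces the atomic shutdown flag and in-flight counter.

```ruby
# Thread-based toy model of the shutdown sequence (illustration only).
class ToyPool
  ClosedError = Class.new(StandardError)
  SHUTDOWN = Object.new.freeze # sentinel telling a worker to stop

  attr_reader :results

  def initialize(size)
    @queue = Queue.new
    @results = []
    @results_lock = Mutex.new
    @state_lock = Mutex.new
    @shutdown = false
    @workers = Array.new(size) do
      Thread.new do
        # Process items until the sentinel arrives.
        until (work = @queue.pop).equal?(SHUTDOWN)
          @results_lock.synchronize { @results << work * 2 }
        end
      end
    end
  end

  def <<(work)
    # Checking the flag and enqueuing under one lock means no item can
    # slip in after shutdown has started.
    @state_lock.synchronize do
      raise ClosedError, "pool is shut down" if @shutdown
      @queue << work
    end
  end

  def shutdown
    @state_lock.synchronize do
      return if @shutdown # idempotent: later calls do nothing
      @shutdown = true
    end
    # Sentinels are queued behind all pending work, so that work drains first.
    @workers.size.times { @queue << SHUTDOWN }
    @workers.each(&:join)
  end
end

pool = ToyPool.new(2)
5.times { |index| pool << index }
pool.shutdown
pool.shutdown # second call returns immediately
pool.results.sort # => [0, 2, 4, 6, 8]
```

Because the sentinels are enqueued after every accepted item, each worker only stops once the work ahead of its sentinel has been taken, which mirrors the "allow all queued work to complete" step.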