Class: RobotLab::RactorWorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/robot_lab/ractor_worker_pool.rb

Overview

A pool of Ractor workers that execute CPU-bound, Ractor-safe tools.

Work is distributed via a shared RactorQueue. Each worker runs a blocking loop, pops RactorJob instances, dispatches to the named tool class, and pushes the frozen result (or a RactorJobError) to the job’s per-job reply_queue.

Shutdown uses a poison-pill pattern: one nil sentinel per worker is pushed to the work queue; each worker exits when it pops nil.

Only tools that declare ractor_safe true should be submitted. Tool classes are instantiated fresh inside the Ractor for each call.

Examples:

pool = RactorWorkerPool.new(size: 4)
result = pool.submit("MyTool", { "arg" => "value" })
pool.shutdown

Constant Summary collapse

QUEUE_CAPACITY =
1024

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size: :auto) ⇒ RactorWorkerPool

Returns a new instance of RactorWorkerPool.

Parameters:

  • size (Integer, :auto) (defaults to: :auto)

    number of workers (:auto = Etc.nprocessors)



28
29
30
31
32
33
# File 'lib/robot_lab/ractor_worker_pool.rb', line 28

def initialize(size: :auto)
  @size    = size == :auto ? Etc.nprocessors : size.to_i
  @closed  = false
  @work_q  = RactorQueue.new(capacity: QUEUE_CAPACITY)
  @workers = @size.times.map { spawn_worker(@work_q) }
end

Instance Attribute Details

#sizeObject (readonly)

Returns the value of attribute size.



25
26
27
# File 'lib/robot_lab/ractor_worker_pool.rb', line 25

def size
  @size
end

Class Method Details

.process_job(job) ⇒ Object

Called inside Ractor worker blocks — must be a class method.



77
78
79
80
81
82
83
84
# File 'lib/robot_lab/ractor_worker_pool.rb', line 77

def self.process_job(job)
  tool_class    = Object.const_get(job.payload[:tool_class])
  result        = tool_class.new.execute(**job.payload[:args].transform_keys(&:to_sym))
  frozen_result = ::Ractor.make_shareable(result.frozen? ? result : result.dup.freeze)
  job.reply_queue.push(frozen_result)
rescue StandardError => e
  job.reply_queue.push(wrap_error(e))
end

.wrap_error(err) ⇒ Object



86
87
88
89
90
91
# File 'lib/robot_lab/ractor_worker_pool.rb', line 86

def self.wrap_error(err)
  RobotLab::RactorJobError.new(
    message: err.message.freeze,
    backtrace: (err.backtrace || []).map(&:freeze).freeze
  )
end

Instance Method Details

#shutdownvoid

This method returns an undefined value.

Gracefully shut down the pool via poison-pill pattern.



64
65
66
67
68
69
70
71
72
73
74
# File 'lib/robot_lab/ractor_worker_pool.rb', line 64

def shutdown
  return if @closed

  @closed = true
  @size.times { @work_q.push(nil) }
  @workers.each do |w|
    w.join
  rescue StandardError
    nil
  end
end

#submit(tool_class_name, args) ⇒ Object

Submit a tool job and block until the result is available.

Parameters:

  • tool_class_name (String)

    fully-qualified Ruby constant name of the tool class

  • args (Hash)

    tool arguments (deep-frozen before crossing Ractor boundary)

Returns:

  • (Object)

    the tool’s return value

Raises:

  • (ToolError)

    if the tool raises inside the Ractor



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/robot_lab/ractor_worker_pool.rb', line 41

def submit(tool_class_name, args)
  raise ToolError, 'Pool is shut down' if @closed

  reply_q = RactorQueue.new(capacity: 1)
  payload = RactorBoundary.freeze_deep({ tool_class: tool_class_name.to_s, args: args })

  job = RactorJob.new(
    id: SecureRandom.uuid.freeze,
    type: :tool,
    payload: payload,
    reply_queue: reply_q
  )

  @work_q.push(job)
  result = reply_q.pop

  raise ToolError, "Tool '#{tool_class_name}' failed in Ractor: #{result.message}" if result.is_a?(RactorJobError)

  result
end