Class: RobotLab::RactorWorkerPool

Inherits:
Object
Defined in:
lib/robot_lab/ractor_worker_pool.rb

Overview

A pool of Ractor workers that execute CPU-bound, Ractor-safe tools.

Work is distributed via a shared RactorQueue. Each worker runs a blocking loop, pops RactorJob instances, dispatches to the named tool class, and pushes the frozen result (or a RactorJobError) to the job’s per-job reply_queue.

Shutdown uses a poison-pill pattern: one nil sentinel per worker is pushed to the work queue; each worker exits when it pops nil.
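
The queue-plus-sentinel flow described above can be sketched with plain threads and the standard Queue class. This is an illustrative analogue only, not the pool's Ractor code: the doubling "job" and all variable names are assumptions made for the example.

```ruby
# Thread-based sketch of the work-queue / poison-pill pattern (not Ractor code).
work_q  = Queue.new
results = Queue.new

workers = 3.times.map do
  Thread.new do
    # Blocking loop: pop returns nil only for the sentinel, which ends the loop.
    while (job = work_q.pop)
      results << job * 2
    end
  end
end

[1, 2, 3, 4].each { |n| work_q << n }

# One sentinel per worker, so every worker pops exactly one nil and exits.
workers.size.times { work_q << nil }
workers.each(&:join)

# Drain the results; completion order is not deterministic, so sort them.
doubled = []
doubled << results.pop until results.empty?
doubled.sort!
```

Because the queue is FIFO and the sentinels are pushed last, every real job is dequeued before any worker sees nil.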

Only tools that declare ractor_safe true should be submitted. Tool classes are instantiated fresh inside the Ractor for each call.

Examples:

pool = RactorWorkerPool.new(size: 4)
result = pool.submit("MyTool", { "arg" => "value" })
pool.shutdown

Constant Summary

QUEUE_CAPACITY = 1024


Constructor Details

#initialize(size: :auto) ⇒ RactorWorkerPool

Returns a new instance of RactorWorkerPool.

Parameters:

  • size (Integer, :auto) (defaults to: :auto)

    number of workers (:auto = Etc.nprocessors)



# File 'lib/robot_lab/ractor_worker_pool.rb', line 28

def initialize(size: :auto)
  @size    = size == :auto ? Etc.nprocessors : size.to_i
  @closed  = false
  @work_q  = RactorQueue.new(capacity: QUEUE_CAPACITY)
  @workers = @size.times.map { spawn_worker(@work_q) }
end
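
Note that size.to_i turns nil or a non-numeric string into 0, which would create a pool with no workers; a later submit would then presumably wait on its reply queue with nothing to service the job. A caller-side guard might look like the sketch below, where resolve_pool_size is a hypothetical helper and not part of RobotLab.

```ruby
require "etc"

# Hypothetical helper, not part of RobotLab: resolve a worker count and
# reject values that would produce an empty pool.
def resolve_pool_size(size)
  # Integer() raises on nil or non-numeric input instead of returning 0.
  count = size == :auto ? Etc.nprocessors : Integer(size)
  raise ArgumentError, "size must be positive, got #{size.inspect}" unless count.positive?

  count
end

explicit = resolve_pool_size(4)      # an explicit worker count
detected = resolve_pool_size(:auto)  # processor count reported by Etc
```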

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the number of workers in the pool (the resolved value of the size argument passed to the constructor).



# File 'lib/robot_lab/ractor_worker_pool.rb', line 25

def size
  @size
end

Instance Method Details

#shutdown ⇒ void

This method returns an undefined value.

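Gracefully shuts down the pool using the poison-pill pattern: pushes one nil sentinel per worker, then waits for every worker to exit. Calling it on a pool that is already closed is a no-op.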



# File 'lib/robot_lab/ractor_worker_pool.rb', line 69

def shutdown
  return if @closed

  @closed = true
  @size.times { @work_q.push(nil) }
  @workers.each { |w| w.join rescue nil }
end

#submit(tool_class_name, args) ⇒ Object

Submit a tool job and block until the result is available.

Parameters:

  • tool_class_name (String)

    fully-qualified Ruby constant name of the tool class

  • args (Hash)

    tool arguments (deep-frozen before crossing Ractor boundary)

Returns:

  • (Object)

    the tool’s return value

Raises:

  • (ToolError)

    if the pool has already been shut down, or if the tool raises inside the Ractor



# File 'lib/robot_lab/ractor_worker_pool.rb', line 41

def submit(tool_class_name, args)
  raise ToolError, "Pool is shut down" if @closed

  reply_q = RactorQueue.new(capacity: 1)
  payload = RactorBoundary.freeze_deep({
    tool_class: tool_class_name.to_s,
    args:       args
  })

  job = RactorJob.new(
    id:          SecureRandom.uuid.freeze,
    type:        :tool,
    payload:     payload,
    reply_queue: reply_q
  )

  @work_q.push(job)
  result = reply_q.pop

  if result.is_a?(RactorJobError)
    raise ToolError, "Tool '#{tool_class_name}' failed in Ractor: #{result.message}"
  end

  result
end
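
The per-job reply queue used by submit can be illustrated with a thread-based sketch. Everything here is hypothetical and stands in for the real classes: job_struct for RactorJob, error_struct for RactorJobError, tool_error for ToolError, and a lambda in place of a named tool class. It is not the worker code from this library, which is not shown on this page.

```ruby
# Thread-based sketch of the per-job reply queue (hypothetical names throughout).
job_struct   = Struct.new(:callable, :args, :reply_queue)
error_struct = Struct.new(:message)
tool_error   = Class.new(StandardError)

work_q = Queue.new

worker = Thread.new do
  while (job = work_q.pop)
    reply = begin
      job.callable.call(job.args).freeze
    rescue StandardError => e
      # Errors travel back as plain values; the caller re-raises them.
      error_struct.new(e.message)
    end
    job.reply_queue << reply
  end
end

# Blocks until the worker replies, then unwraps the result or raises.
demo_submit = lambda do |callable, args|
  reply_q = Queue.new
  work_q << job_struct.new(callable, args.freeze, reply_q)
  result = reply_q.pop
  raise tool_error, "failed in worker: #{result.message}" if result.is_a?(error_struct)

  result
end

sum = demo_submit.call(->(a) { a[:x] + a[:y] }, { x: 1, y: 2 })

failure = begin
  demo_submit.call(->(_) { raise ArgumentError, "bad input" }, {})
rescue tool_error => e
  e.message
end

# Poison pill: stop the single worker.
work_q << nil
worker.join
```

Giving each job its own reply queue means a caller only ever receives the result of the job it submitted, so concurrent callers need no result-matching logic.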