Class: RobotLab::RactorWorkerPool

Inherits:
Object
Defined in:
lib/robot_lab/ractor_worker_pool.rb

Overview

A pool of Ractor workers that execute CPU-bound, Ractor-safe tools.

Work is distributed via a shared RactorQueue. Each worker runs a blocking loop: it pops a RactorJob, dispatches to the tool class named in the job, and pushes the frozen result (or a RactorJobError) to that job's own reply_queue.

Shutdown uses a poison-pill pattern: one nil sentinel per worker is pushed to the work queue; each worker exits when it pops nil.

Only tools that declare ractor_safe true should be submitted. Tool classes are instantiated fresh inside the Ractor for each call.
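
The request/reply flow described above can be sketched with plain threads and the standard-library Queue. This is only an analogy under stated assumptions: it is not the library's implementation, it does not use Ractors or RactorQueue, and the names Job, JobError, and submit are hypothetical stand-ins for RactorJob, RactorJobError, and #submit.

```ruby
# Thread-based analogy of the worker loop (hypothetical names, not the library's code).
Job      = Struct.new(:payload, :reply_queue)
JobError = Struct.new(:message)

work_q = Queue.new

worker = Thread.new do
  # Blocking loop: pop jobs until a nil sentinel arrives.
  while (job = work_q.pop)
    reply = begin
      Integer(job.payload) * 2          # stand-in for "dispatch to the tool"
    rescue StandardError => e
      JobError.new(e.message)           # errors travel back as values
    end
    job.reply_queue << reply            # answer on this job's own queue
  end
end

# Caller side: one single-slot reply queue per job, then block on it.
submit = lambda do |payload|
  reply_q = SizedQueue.new(1)
  work_q << Job.new(payload, reply_q)
  result = reply_q.pop
  raise ArgumentError, "job failed: #{result.message}" if result.is_a?(JobError)
  result
end

ok = submit.call("21")

failure = begin
  submit.call("not a number")
rescue ArgumentError => e
  e.message
end

work_q << nil   # stop the worker
worker.join
```

Because each job carries its own reply queue, a caller never receives another caller's result, and the worker reports failures as values instead of raising across the boundary.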

Examples:

pool = RactorWorkerPool.new(size: 4)
result = pool.submit("MyTool", { "arg" => "value" })
pool.shutdown

Constant Summary

QUEUE_CAPACITY = 1024

Capacity of the shared work queue.

Constructor Details

#initialize(size: :auto) ⇒ RactorWorkerPool

Creates a new pool and starts worker Ractors immediately.

Parameters:

  • size (Integer, :auto) (defaults to: :auto)

    number of workers (:auto = Etc.nprocessors)



# File 'lib/robot_lab/ractor_worker_pool.rb', line 35

def initialize(size: :auto)
  @size    = size == :auto ? Etc.nprocessors : size.to_i
  @closed  = false
  @work_q  = RactorQueue.new(capacity: QUEUE_CAPACITY)
  @workers = @size.times.map { spawn_worker(@work_q) }
end
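
To make the size handling concrete, the expression from the constructor can be tried in isolation. The helper name resolve_size is hypothetical and exists only for this illustration; the expression itself is copied from the source above.

```ruby
require "etc"

# Hypothetical helper wrapping the constructor's size expression.
def resolve_size(size)
  size == :auto ? Etc.nprocessors : size.to_i
end

auto_size   = resolve_size(:auto)  # one worker per processor reported by Etc
string_size = resolve_size("4")    # to_i accepts numeric strings
nil_size    = resolve_size(nil)    # nil.to_i is 0, which would mean no workers
```

Since to_i never raises for nil or non-numeric strings, a mistaken argument appears to yield a pool with zero workers rather than an error, so callers may want to validate the value themselves.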

Instance Attribute Details

#size ⇒ Integer (readonly)

Returns the number of worker Ractors.

Returns:

  • (Integer)

    number of worker Ractors



# File 'lib/robot_lab/ractor_worker_pool.rb', line 30

def size
  @size
end

Instance Method Details

#shutdown ⇒ void

This method returns an undefined value.

Gracefully shut down the pool.

Pushes one nil poison pill per worker so that each exits its loop, then waits for all workers to terminate. Calling shutdown on a pool that is already closed is a no-op.



# File 'lib/robot_lab/ractor_worker_pool.rb', line 81

def shutdown
  return if @closed

  @closed = true
  # Push one nil poison pill per worker
  @size.times { @work_q.push(nil) }
  @workers.each { |w| w.join rescue nil }
end
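
The poison-pill shutdown can be illustrated with standard-library threads. This is a minimal sketch, assuming a FIFO queue; it uses Thread and Queue rather than Ractor and RactorQueue, so it shows the pattern and not the library's actual code.

```ruby
# Thread-based analogy of poison-pill shutdown (not the library's code).
work_q  = Queue.new
results = Queue.new

workers = 3.times.map do
  Thread.new do
    # Each worker loops until it pops nil, then exits.
    while (job = work_q.pop)
      results << job * 2
    end
  end
end

[1, 2, 3, 4].each { |n| work_q << n }

# One nil per worker: every worker consumes exactly one pill and stops.
workers.size.times { work_q << nil }
workers.each(&:join)

# Collect whatever the workers produced before they stopped.
doubled = []
doubled << results.pop until results.empty?
doubled.sort!
```

Because the queue is FIFO and the pills are pushed last, every job queued before shutdown is popped before any worker sees a pill.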

#submit(tool_class_name, args) ⇒ Object

Submit a tool job and block until the result is available.

Parameters:

  • tool_class_name (String)

    fully-qualified Ruby constant name of the tool class

  • args (Hash)

    tool arguments (deep-frozen before crossing Ractor boundary)

Returns:

  • (Object)

    the tool’s return value

Raises:

  • (ToolError)

    if the pool has already been shut down, or if the tool fails inside the Ractor



# File 'lib/robot_lab/ractor_worker_pool.rb', line 49

def submit(tool_class_name, args)
  raise ToolError, "Pool is shut down" if @closed

  reply_q = RactorQueue.new(capacity: 1)
  payload = RactorBoundary.freeze_deep({
    tool_class: tool_class_name.to_s,
    args: args
  })

  job = RactorJob.new(
    id:          SecureRandom.uuid.freeze,
    type:        :tool,
    payload:     payload,
    reply_queue: reply_q
  )

  @work_q.push(job)
  result = reply_q.pop

  if result.is_a?(RactorJobError)
    raise ToolError, "Tool '#{tool_class_name}' failed in Ractor: #{result.message}"
  end

  result
end
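
The payload is deep-frozen before it crosses the Ractor boundary. How RactorBoundary.freeze_deep does this is not shown here; the following is a hedged sketch of what a recursive freeze could look like, using a hypothetical deep_freeze helper that handles only Hash and Array containers.

```ruby
# Hypothetical helper; RactorBoundary.freeze_deep may behave differently.
def deep_freeze(obj)
  case obj
  when Hash
    obj.each do |key, value|
      deep_freeze(key)
      deep_freeze(value)
    end
  when Array
    obj.each { |item| deep_freeze(item) }
  end
  obj.freeze
end

payload = deep_freeze({
  tool_class: "MyTool",
  args: { "arg" => +"value", "list" => [+"a"] }
})

# Any later mutation attempt raises FrozenError.
mutation_blocked = begin
  payload[:args]["arg"] << "!"
  false
rescue FrozenError
  true
end
```

Note that this sketch freezes objects in place, so the caller's own hash becomes immutable too. Whether the library copies the arguments first is not stated in this documentation.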