Class: Conductor::Worker::RactorTaskRunner

Inherits:
Object
  • Object
Defined in:
lib/conductor/worker/ractor_task_runner.rb

Overview

RactorTaskRunner is a Ractor-based runner for CPU-bound workers. It provides true parallelism by running each worker in an isolated Ractor (no GVL sharing).

Key differences from TaskRunner:

  • Creates HTTP client INSIDE the Ractor (can't be shared)
  • Sequential task execution within each Ractor
  • Events sent to main thread via Ractor messaging
  • Parallelism comes from multiple Ractors (thread_count = Ractor count)
  • Requires Ruby 3.1+
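
The last two points can be illustrated with a minimal sketch. It is not taken from the library: `spawn_cpu_ractors` and `collect_results` are hypothetical helpers, and the sum of squares stands in for a real task body. It only shows that each Ractor works through its input sequentially while several Ractors run side by side.

```ruby
# Spawn `count` Ractors; each sums the squares 1..limit as a stand-in for a
# CPU-bound task body. Arguments are passed explicitly because a Ractor
# block cannot capture outer local variables.
def spawn_cpu_ractors(count, limit)
  Array.new(count) do |id|
    Ractor.new(id, limit) do |ractor_id, n|
      [ractor_id, (1..n).sum { |x| x * x }]
    end
  end
end

# Collect each Ractor's result. Ractor#take is the Ruby 3.0-3.4 call; newer
# Rubies expose Ractor#value instead, so use whichever is available.
def collect_results(ractors)
  ractors.map { |r| r.respond_to?(:value) ? r.value : r.take }
end

results = collect_results(spawn_cpu_ractors(4, 1_000))
```

Each element of `results` is a `[ractor_id, sum]` pair, one per Ractor, which mirrors how `thread_count` maps to the number of Ractors.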

Examples:

worker = Worker.new('cpu_task', isolation: :ractor, thread_count: 4) { |t| heavy_computation(t) }
handler = TaskHandler.new(workers: [worker])
handler.start

Constant Summary

RETRY_BACKOFFS = [0, 10, 20, 30].freeze

Retry backoffs for task update (in seconds)

MAX_BACKOFF_EXPONENT = 10

Maximum exponent for adaptive backoff

MAX_AUTH_BACKOFF_SECONDS = 60

Maximum auth failure backoff in seconds
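
The task-update code that consumes RETRY_BACKOFFS is not shown on this page. The following is a sketch of one plausible way such a list could drive retries, assuming each entry is the delay in seconds before the corresponding attempt. `update_with_retries` and its `sleeper:` parameter are hypothetical names introduced for illustration.

```ruby
RETRY_BACKOFFS = [0, 10, 20, 30].freeze # value from the constant summary

# Hypothetical helper: runs the block once per backoff entry, waiting the
# listed number of seconds before each attempt, and returns the first
# successful result. Re-raises the last error if every attempt fails.
# `sleeper` is injectable so the delays can be skipped in tests.
def update_with_retries(backoffs: RETRY_BACKOFFS, sleeper: ->(s) { sleep(s) })
  raise ArgumentError, 'backoffs must not be empty' if backoffs.empty?

  last_error = nil
  backoffs.each_with_index do |delay, attempt|
    sleeper.call(delay) if delay.positive?
    begin
      return yield(attempt)
    rescue StandardError => e
      last_error = e
    end
  end
  raise last_error
end
```

Under this assumption the first attempt runs immediately (delay 0) and a task update is tried at most four times.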


Constructor Details

#initialize(worker, configuration:, ractor_id: 0, event_queue: nil) ⇒ RactorTaskRunner

Initializes the RactorTaskRunner. Note: the HTTP client is created inside run, after the Ractor starts.

Parameters:

  • worker (Worker)

    The worker instance (must be Ractor-safe)

  • configuration (Configuration)

    Conductor configuration (serializable parts)

  • ractor_id (Integer) (defaults to: 0)

    Identifier for this Ractor instance

  • event_queue (Ractor) (defaults to: nil)

    Main Ractor to send events to (optional)



# File 'lib/conductor/worker/ractor_task_runner.rb', line 47

def initialize(worker, configuration:, ractor_id: 0, event_queue: nil)
  @worker = worker
  @configuration_hash = serialize_configuration(configuration)
  @ractor_id = ractor_id
  @event_queue = event_queue

  # These will be created inside the Ractor
  @task_client = nil
  @logger = nil

  # State tracking (will be initialized in run)
  @consecutive_empty_polls = 0
  @auth_failures = 0
  @last_auth_failure_time = nil
  @last_poll_time = nil
  @poll_count = 0
  @shutdown = false
end

Instance Attribute Details

#ractor_id ⇒ Object (readonly)

Returns the value of attribute ractor_id.



# File 'lib/conductor/worker/ractor_task_runner.rb', line 39

def ractor_id
  @ractor_id
end

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



# File 'lib/conductor/worker/ractor_task_runner.rb', line 39

def worker
  @worker
end

Instance Method Details

#run ⇒ Object

Main polling loop, which runs inside a Ractor. The HTTP client is created after the Ractor starts because it can't be passed in.



# File 'lib/conductor/worker/ractor_task_runner.rb', line 68

def run
  setup_ractor_resources
  @logger.info("[Ractor #{@ractor_id}] Starting RactorTaskRunner for '#{@worker.task_definition_name}'")

  until @shutdown
    begin
      run_once
    rescue StandardError => e
      @logger.error("[Ractor #{@ractor_id}] Error in polling loop: #{e.message}")
      sleep(1)
    end
  end

  cleanup
  @logger.info("[Ractor #{@ractor_id}] RactorTaskRunner stopped")
end

#run_once ⇒ Object

Single iteration of the polling loop



# File 'lib/conductor/worker/ractor_task_runner.rb', line 86

def run_once
  # Adaptive backoff for empty polls
  if @consecutive_empty_polls.positive?
    backoff_ms = calculate_adaptive_backoff
    elapsed_ms = @last_poll_time ? (Time.now - @last_poll_time) * 1000 : backoff_ms

    if elapsed_ms < backoff_ms
      sleep((backoff_ms - elapsed_ms) / 1000.0)
      return
    end
  end

  # Poll for a single task (Ractor processes sequentially)
  @last_poll_time = Time.now
  task = poll_task

  if task.nil?
    @consecutive_empty_polls += 1
  else
    @consecutive_empty_polls = 0
    execute_and_update(task)
  end
end
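
calculate_adaptive_backoff is not shown on this page. As a sketch only, assuming the delay doubles with each consecutive empty poll from a 1 ms base and the exponent is capped at MAX_BACKOFF_EXPONENT, it could look like the following. `adaptive_backoff_ms` and `base_ms:` are hypothetical names, and the real method may use a different base or formula.

```ruby
MAX_BACKOFF_EXPONENT = 10 # value from the constant summary

# Assumed policy: base_ms * 2**n for n consecutive empty polls, with n
# capped at MAX_BACKOFF_EXPONENT so the delay tops out at 1024 * base_ms.
def adaptive_backoff_ms(consecutive_empty_polls, base_ms: 1)
  exponent = [consecutive_empty_polls, MAX_BACKOFF_EXPONENT].min
  base_ms * (2**exponent)
end
```

With these assumptions, run_once would wait about 2 ms after the first empty poll and never more than roughly one second between polls of an idle queue.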

#shutdown ⇒ Object

Signals shutdown: sets the flag that the polling loop in run checks before each iteration.



# File 'lib/conductor/worker/ractor_task_runner.rb', line 111

def shutdown
  @shutdown = true
end