Class: Conductor::Worker::RactorTaskRunner
- Inherits:
-
Object
- Object
- Conductor::Worker::RactorTaskRunner
- Defined in:
- lib/conductor/worker/ractor_task_runner.rb
Overview
RactorTaskRunner - Ractor-based runner for CPU-bound workers Provides true parallelism by running in isolated Ractors (no GVL sharing)
Key differences from TaskRunner:
- Creates HTTP client INSIDE the Ractor (can't be shared)
- Sequential task execution within each Ractor
- Events sent to main thread via Ractor messaging
- Parallelism comes from multiple Ractors (thread_count = Ractor count)
- Requires Ruby 3.1+
Constant Summary collapse
- RETRY_BACKOFFS =
Retry backoffs for task update (in seconds)
[0, 10, 20, 30].freeze
- MAX_BACKOFF_EXPONENT =
Maximum exponent for adaptive backoff
10- MAX_AUTH_BACKOFF_SECONDS =
Maximum auth failure backoff in seconds
60
Instance Attribute Summary collapse
-
#ractor_id ⇒ Object
readonly
Returns the value of attribute ractor_id.
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Instance Method Summary collapse
-
#initialize(worker, configuration:, ractor_id: 0, event_queue: nil) ⇒ RactorTaskRunner
constructor
Initialize RactorTaskRunner Note: HTTP client is created inside run() after Ractor starts.
-
#run ⇒ Object
Main polling loop - runs inside a Ractor Creates HTTP client after Ractor starts (can't be passed in).
-
#run_once ⇒ Object
Single iteration of the polling loop.
-
#shutdown ⇒ Object
Signal shutdown.
Constructor Details
#initialize(worker, configuration:, ractor_id: 0, event_queue: nil) ⇒ RactorTaskRunner
Initialize RactorTaskRunner Note: HTTP client is created inside run() after Ractor starts
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/conductor/worker/ractor_task_runner.rb', line 47 def initialize(worker, configuration:, ractor_id: 0, event_queue: nil) @worker = worker @configuration_hash = serialize_configuration(configuration) @ractor_id = ractor_id @event_queue = event_queue # These will be created inside the Ractor @task_client = nil @logger = nil # State tracking (will be initialized in run) @consecutive_empty_polls = 0 @auth_failures = 0 @last_auth_failure_time = nil @last_poll_time = nil @poll_count = 0 @shutdown = false end |
Instance Attribute Details
#ractor_id ⇒ Object (readonly)
Returns the value of attribute ractor_id.
39 40 41 |
# File 'lib/conductor/worker/ractor_task_runner.rb', line 39 def ractor_id @ractor_id end |
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
39 40 41 |
# File 'lib/conductor/worker/ractor_task_runner.rb', line 39 def worker @worker end |
Instance Method Details
#run ⇒ Object
Main polling loop - runs inside a Ractor Creates HTTP client after Ractor starts (can't be passed in)
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/conductor/worker/ractor_task_runner.rb', line 68 def run setup_ractor_resources @logger.info("[Ractor #{@ractor_id}] Starting RactorTaskRunner for '#{@worker.task_definition_name}'") until @shutdown begin run_once rescue StandardError => e @logger.error("[Ractor #{@ractor_id}] Error in polling loop: #{e.}") sleep(1) end end cleanup @logger.info("[Ractor #{@ractor_id}] RactorTaskRunner stopped") end |
#run_once ⇒ Object
Single iteration of the polling loop
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/conductor/worker/ractor_task_runner.rb', line 86 def run_once # Adaptive backoff for empty polls if @consecutive_empty_polls.positive? backoff_ms = calculate_adaptive_backoff elapsed_ms = @last_poll_time ? (Time.now - @last_poll_time) * 1000 : backoff_ms if elapsed_ms < backoff_ms sleep((backoff_ms - elapsed_ms) / 1000.0) return end end # Poll for a single task (Ractor processes sequentially) @last_poll_time = Time.now task = poll_task if task.nil? @consecutive_empty_polls += 1 else @consecutive_empty_polls = 0 execute_and_update(task) end end |
#shutdown ⇒ Object
Signal shutdown
111 112 113 |
# File 'lib/conductor/worker/ractor_task_runner.rb', line 111 def shutdown @shutdown = true end |