Class: Conductor::Worker::TaskRunner
- Inherits:
- Object
  - Object
  - Conductor::Worker::TaskRunner
- Defined in:
- lib/conductor/worker/task_runner.rb
Overview
TaskRunner - The core polling loop that runs in a dedicated thread. Implements batch polling, adaptive backoff, capacity management, and event publishing.
Constant Summary collapse
- RETRY_BACKOFFS =
Retry backoffs for task update (in seconds)
[0, 10, 20, 30].freeze
- MAX_BACKOFF_EXPONENT =
Maximum exponent for adaptive backoff to prevent overflow
10
- MAX_AUTH_BACKOFF_SECONDS =
Maximum auth failure backoff in seconds
60
Instance Attribute Summary collapse
-
#running ⇒ Object
readonly
Returns the value of attribute running.
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Instance Method Summary collapse
-
#initialize(worker, configuration:, event_dispatcher: nil, logger: nil) ⇒ TaskRunner
constructor
Initialize TaskRunner for a specific worker.
-
#run ⇒ Object
Main polling loop (runs until shutdown).
-
#run_once ⇒ Object
Single iteration of the polling loop.
-
#running? ⇒ Boolean
Check if runner is running.
-
#shutdown ⇒ Object
Signal the runner to stop.
Constructor Details
#initialize(worker, configuration:, event_dispatcher: nil, logger: nil) ⇒ TaskRunner
Initialize TaskRunner for a specific worker
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/conductor/worker/task_runner.rb', line 38

# Build a TaskRunner bound to a single worker.
#
# @param worker [Object] worker to poll for; #task_definition_name is read from it here
# @param configuration [Configuration, nil] client configuration; Configuration.new is used when nil
# @param event_dispatcher [Object, nil] defaults to a new Events::SyncEventDispatcher
# @param logger [Object, nil] defaults to the result of create_default_logger
def initialize(worker, configuration:, event_dispatcher: nil, logger: nil)
  @worker = worker
  @configuration = configuration || Configuration.new
  @event_dispatcher = event_dispatcher || Events::SyncEventDispatcher.new
  @logger = logger || create_default_logger

  # Client used for API communication.
  @task_client = Client::TaskClient.new(@configuration)

  # Resolve the per-worker settings and apply them to this runner.
  # NOTE(review): the second argument appears here as a bare `(worker)`;
  # a helper name may have been lost when this listing was extracted —
  # confirm against lib/conductor/worker/task_runner.rb.
  resolved_config = WorkerConfig.resolve(worker.task_definition_name, (worker))
  apply_resolved_config(resolved_config)

  # Executor that runs the tasks. @max_workers is not assigned in this
  # method (presumably set by apply_resolved_config — TODO confirm).
  pool_size = @max_workers
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: pool_size,
    max_queue: pool_size * 2,
    fallback_policy: :caller_runs
  )

  # Runtime bookkeeping.
  @running_tasks = Concurrent::Set.new
  @consecutive_empty_polls = Concurrent::AtomicFixnum.new(0)
  @auth_failures = Concurrent::AtomicFixnum.new(0)
  @last_auth_failure_time = nil
  @last_poll_time = nil
  @poll_count = Concurrent::AtomicFixnum.new(0)
  @shutdown = Concurrent::AtomicBoolean.new(false)
  @mutex = Mutex.new
end
Instance Attribute Details
#running ⇒ Object (readonly)
Returns the value of attribute running.
31 32 33 |
# File 'lib/conductor/worker/task_runner.rb', line 31

# Read-only accessor for the @running instance variable.
# NOTE(review): the constructor shown on this page never assigns @running,
# so this may return nil; #running? is derived from @shutdown instead.
#
# @return [Object] current value of @running
def running
  @running
end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
31 32 33 |
# File 'lib/conductor/worker/task_runner.rb', line 31

# Read-only accessor for the worker this runner was constructed with.
#
# @return [Object] the value stored in @worker by #initialize
def worker
  @worker
end
Instance Method Details
#run ⇒ Object
Main polling loop (runs until shutdown)
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/conductor/worker/task_runner.rb', line 74

# Main polling loop. Blocks the caller, invoking #run_once repeatedly until
# the shutdown flag is set, then calls cleanup and logs that the runner
# has stopped.
#
# Any StandardError raised by a single iteration is logged (message at
# error level, backtrace at debug level) and followed by a one-second
# pause, so one failing iteration does not terminate the loop.
#
# @return [Object] the result of the final logger call
def run
  @logger.info("Starting TaskRunner for '#{@worker.task_definition_name}' " \
               "(thread_count=#{@max_workers}, poll_interval=#{@poll_interval}ms)")

  # Register task definition if configured
  register_task_definition if @worker.register_task_def

  until @shutdown.true?
    begin
      run_once
    rescue StandardError => e
      @logger.error("Error in polling loop: #{e.message}")
      @logger.debug(e.backtrace.join("\n")) if e.backtrace
      sleep(1) # Brief pause before retrying
    end
  end

  cleanup
  @logger.info("TaskRunner for '#{@worker.task_definition_name}' stopped")
end
#run_once ⇒ Object
Single iteration of the polling loop
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/conductor/worker/task_runner.rb', line 96

# Single iteration of the polling loop.
#
# Steps, in order:
# 1. Clean up completed tasks.
# 2. If the runner is already at capacity, sleep 1ms and return.
# 3. If previous polls came back empty, honour the adaptive backoff:
#    sleep for the remainder of the backoff window and return.
# 4. Batch-poll for as many tasks as there are free slots.
# 5. Submit each polled task, or count one more empty poll.
#
# @return [Object] not meaningful; callers should ignore it
def run_once
  # 1. Cleanup completed tasks
  cleanup_completed_tasks

  # 2. Check capacity
  current_capacity = @running_tasks.size
  if current_capacity >= @max_workers
    sleep(0.001) # 1ms sleep to prevent busy-waiting
    return
  end

  available_slots = @max_workers - current_capacity

  # 3. Adaptive backoff for empty polls
  if @consecutive_empty_polls.value.positive?
    backoff_ms = calculate_adaptive_backoff
    elapsed_ms =
      if @last_poll_time
        # Time.now is wall-clock time and can step backwards (NTP, manual
        # change). Clamp at zero so the sleep below never exceeds backoff_ms.
        [(Time.now - @last_poll_time) * 1000, 0].max
      else
        backoff_ms
      end

    if elapsed_ms < backoff_ms
      sleep_time = (backoff_ms - elapsed_ms) / 1000.0
      sleep([sleep_time, 0.001].max)
      return
    end
  end

  # 4. Batch poll for tasks
  @last_poll_time = Time.now
  tasks = batch_poll(available_slots)

  # 5. Submit tasks for execution
  if tasks.empty?
    @consecutive_empty_polls.increment
  else
    @consecutive_empty_polls.value = 0
    tasks.each { |task| submit_task(task) }
  end
end
#running? ⇒ Boolean
Check if runner is running
143 144 145 |
# File 'lib/conductor/worker/task_runner.rb', line 143

# Whether the runner is still active, i.e. shutdown has not been signalled.
#
# @return [Boolean] false once the @shutdown flag is true, true otherwise
def running?
  stop_requested = @shutdown.true?
  !stop_requested
end
#shutdown ⇒ Object
Signal the runner to stop
137 138 139 |
# File 'lib/conductor/worker/task_runner.rb', line 137

# Signal the runner to stop by setting the @shutdown flag.
# The loop in #run checks this flag before each iteration.
#
# @return [Object] whatever @shutdown.make_true returns
def shutdown
  @shutdown.make_true
end