Class: Conductor::Worker::TaskRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/conductor/worker/task_runner.rb

Overview

TaskRunner - The core polling loop that runs in a dedicated thread. Implements batch polling, adaptive backoff, capacity management, and event publishing.

Constant Summary collapse

RETRY_BACKOFFS =

Retry backoffs for task update (in seconds)

[0, 10, 20, 30].freeze
MAX_BACKOFF_EXPONENT =

Maximum exponent for adaptive backoff to prevent overflow

10
MAX_AUTH_BACKOFF_SECONDS =

Maximum auth failure backoff in seconds

60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, configuration:, event_dispatcher: nil, logger: nil) ⇒ TaskRunner

Initialize TaskRunner for a specific worker

Parameters:

  • worker (Worker)

    The worker instance

  • configuration (Configuration)

    Conductor configuration

  • event_dispatcher (SyncEventDispatcher) (defaults to: nil)

    Shared event dispatcher

  • logger (Logger) (defaults to: nil)

    Logger instance



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/conductor/worker/task_runner.rb', line 38

# Build a TaskRunner bound to a single worker.
#
# @param worker [Worker] the worker whose tasks this runner polls and executes
# @param configuration [Configuration, nil] Conductor configuration; a new
#   Configuration is created when nil is given
# @param event_dispatcher [SyncEventDispatcher, nil] shared event dispatcher;
#   a new Events::SyncEventDispatcher is created when nil
# @param logger [Logger, nil] logger instance; falls back to
#   create_default_logger when nil
def initialize(worker, configuration:, event_dispatcher: nil, logger: nil)
  @worker = worker
  @configuration = configuration || Configuration.new
  @event_dispatcher = event_dispatcher || Events::SyncEventDispatcher.new
  @logger = logger || create_default_logger

  # Client used for all task API calls
  @task_client = Client::TaskClient.new(@configuration)

  # Work out the effective settings for this worker and copy them onto the
  # runner.
  # NOTE(review): @max_workers (read just below) is presumably assigned by
  # apply_resolved_config - confirm.
  task_name = worker.task_definition_name
  worker_options = extract_worker_options(worker)
  apply_resolved_config(WorkerConfig.resolve(task_name, worker_options))

  # Pool that executes polled tasks; when the pool and its queue are full the
  # :caller_runs policy makes the submitting thread run the task itself.
  pool_size = @max_workers
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: pool_size,
    max_queue: pool_size * 2,
    fallback_policy: :caller_runs
  )

  # Bookkeeping for the polling loop
  @running_tasks = Concurrent::Set.new
  @consecutive_empty_polls = Concurrent::AtomicFixnum.new(0)
  @auth_failures = Concurrent::AtomicFixnum.new(0)
  @last_auth_failure_time = nil
  @last_poll_time = nil
  @poll_count = Concurrent::AtomicFixnum.new(0)
  @shutdown = Concurrent::AtomicBoolean.new(false)
  @mutex = Mutex.new
end

Instance Attribute Details

#running ⇒ Object (readonly)

Returns the value of attribute running.



31
32
33
# File 'lib/conductor/worker/task_runner.rb', line 31

# Reader for the @running instance variable.
#
# NOTE(review): the constructor shown for this class does not assign
# @running, and #running? is derived from @shutdown instead - confirm that
# @running is set somewhere, otherwise this reader returns nil.
#
# @return [Object] the current value of @running
def running
  @running
end

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



31
32
33
# File 'lib/conductor/worker/task_runner.rb', line 31

# Reader for the worker this runner was constructed with.
#
# @return [Object] the value stored in @worker by the constructor
def worker
  @worker
end

Instance Method Details

#run ⇒ Object

Main polling loop (runs until shutdown)



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/conductor/worker/task_runner.rb', line 74

# Main polling loop (runs until shutdown).
#
# Logs a start-up line, optionally registers the task definition, then calls
# run_once repeatedly until the @shutdown flag becomes true. A StandardError
# raised by one iteration is logged and followed by a one-second pause so a
# persistent failure does not turn into a hot loop.
#
# cleanup and the "stopped" log line run from an ensure clause, so they are
# executed on every exit path: normal shutdown, a failure while registering
# the task definition, or a non-StandardError (for example an Interrupt)
# escaping the loop. Any such exception still propagates to the caller after
# cleanup has run.
def run
  @logger.info("Starting TaskRunner for '#{@worker.task_definition_name}' " \
               "(thread_count=#{@max_workers}, poll_interval=#{@poll_interval}ms)")

  # Register task definition if configured
  register_task_definition if @worker.register_task_def

  until @shutdown.true?
    begin
      run_once
    rescue StandardError => e
      @logger.error("Error in polling loop: #{e.message}")
      @logger.debug(e.backtrace.join("\n")) if e.backtrace
      sleep(1) # Brief pause before retrying
    end
  end
ensure
  # Always release runner resources, even when leaving through an exception.
  cleanup
  @logger.info("TaskRunner for '#{@worker.task_definition_name}' stopped")
end

#run_once ⇒ Object

Single iteration of the polling loop



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/conductor/worker/task_runner.rb', line 96

# Single iteration of the polling loop.
#
# Steps:
#   1. Drop finished tasks from the running set.
#   2. If every slot is busy, sleep 1ms and return.
#   3. If previous polls came back empty, wait out the adaptive backoff
#      window (sleep for the remainder and return) before polling again.
#   4. Batch-poll for as many tasks as there are free slots.
#   5. Submit each polled task, or count the empty poll.
#
# The backoff window is measured with the monotonic clock so that a
# wall-clock adjustment (NTP step, manual change) cannot make the elapsed
# time negative and stall polling. @last_poll_time is still updated with the
# wall-clock time of the poll for any other reader of that variable.
def run_once
  # 1. Cleanup completed tasks
  cleanup_completed_tasks

  # 2. Check capacity
  in_flight = @running_tasks.size
  if in_flight >= @max_workers
    sleep(0.001) # 1ms sleep to prevent busy-waiting
    return
  end

  available_slots = @max_workers - in_flight

  # 3. Adaptive backoff for empty polls
  if @consecutive_empty_polls.value.positive?
    backoff_ms = calculate_adaptive_backoff
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # No previous poll recorded: treat the window as already elapsed.
    elapsed_ms = @last_poll_monotonic ? (now - @last_poll_monotonic) * 1000 : backoff_ms

    if elapsed_ms < backoff_ms
      sleep_time = (backoff_ms - elapsed_ms) / 1000.0
      sleep([sleep_time, 0.001].max)
      return
    end
  end

  # 4. Batch poll for tasks
  @last_poll_time = Time.now
  @last_poll_monotonic = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  tasks = batch_poll(available_slots)

  # 5. Submit tasks for execution
  if tasks.empty?
    @consecutive_empty_polls.increment
  else
    @consecutive_empty_polls.value = 0
    tasks.each do |task|
      submit_task(task)
    end
  end
end

#running? ⇒ Boolean

Check if runner is running

Returns:

  • (Boolean)


143
144
145
# File 'lib/conductor/worker/task_runner.rb', line 143

# Check if runner is running.
#
# The runner counts as running for as long as the @shutdown flag has not
# been set.
#
# @return [Boolean] true while @shutdown is false
def running?
  @shutdown.false?
end

#shutdown ⇒ Object

Signal the runner to stop



137
138
139
# File 'lib/conductor/worker/task_runner.rb', line 137

# Signal the runner to stop.
#
# Sets the @shutdown flag to true. The polling loop in #run checks this flag
# before each iteration and #running? reports its negation. This method only
# sets the flag; it does not itself wait for the loop to exit.
#
# @return [Object] whatever @shutdown.make_true returns
def shutdown
  @shutdown.make_true
end