Class: Conductor::Worker::FiberTaskRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/conductor/worker/fiber_executor.rb

Overview

FiberTaskRunner - TaskRunner variant that uses FiberExecutor. Runs within an async event loop for fiber-based concurrency.

Constant Summary collapse

RETRY_BACKOFFS =

Retry backoffs for task update (in seconds)

[0, 10, 20, 30].freeze
MAX_BACKOFF_EXPONENT =

Maximum exponent for adaptive backoff

10
MAX_AUTH_BACKOFF_SECONDS =

Maximum auth failure backoff in seconds

60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, configuration:, event_dispatcher: nil, logger: nil) ⇒ FiberTaskRunner

Initialize FiberTaskRunner

Parameters:

  • worker (Worker)

    The worker instance

  • configuration (Configuration)

    Conductor configuration

  • event_dispatcher (SyncEventDispatcher) (defaults to: nil)

    Event dispatcher

  • logger (Logger) (defaults to: nil)

    Logger instance



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/conductor/worker/fiber_executor.rb', line 149

# Builds a FiberTaskRunner for a single worker.
#
# @param worker [Worker] the worker instance
# @param configuration [Configuration] Conductor configuration; a new
#   Configuration is created when nil is passed
# @param event_dispatcher [SyncEventDispatcher, nil] event dispatcher; a new
#   Events::SyncEventDispatcher is created when nil
# @param logger [Logger, nil] logger instance; falls back to
#   create_default_logger when nil
def initialize(worker, configuration:, event_dispatcher: nil, logger: nil)
  @worker = worker
  @configuration = configuration || Configuration.new
  @event_dispatcher = event_dispatcher || Events::SyncEventDispatcher.new
  @logger = logger || create_default_logger

  # Per-worker settings are resolved through WorkerConfig
  task_name = worker.task_definition_name
  worker_options = extract_worker_options(worker)
  settings = WorkerConfig.resolve(task_name, worker_options)

  @poll_interval = settings[:poll_interval]
  # The resolved thread_count is reused as the fiber concurrency limit
  @max_workers = settings[:thread_count]
  @worker_id = settings[:worker_id]
  @domain = settings[:domain]
  @poll_timeout = settings[:poll_timeout]

  # Counters start at zero, timestamps are unset, and the runner is live
  @consecutive_empty_polls = @auth_failures = @poll_count = 0
  @last_auth_failure_time = @last_poll_time = nil
  @shutdown = false
end

Instance Attribute Details

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



142
143
144
# File 'lib/conductor/worker/fiber_executor.rb', line 142

# Reader for the worker instance stored in @worker.
#
# @return [Object] the current value of @worker
def worker
  @worker
end

Instance Method Details

#run ⇒ Object

Main run loop - runs within async event loop



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/conductor/worker/fiber_executor.rb', line 176

# Main run loop - runs within the executor's event loop.
#
# Creates the task client and the fiber executor, then calls run_once
# repeatedly inside the executor's block until @shutdown becomes true.
# A StandardError raised by one iteration is logged and followed by a
# one-second pause so a persistent failure cannot spin the loop.
#
# cleanup is invoked from an ensure clause, so it also runs when the
# executor's start call raises; that exception then propagates to the
# caller and the final "stopped" message is not logged.
#
# @return [Object] whatever the final @logger.info call returns
def run
  @logger.info("Starting FiberTaskRunner for '#{@worker.task_definition_name}' " \
               "(fiber_concurrency=#{@max_workers})")

  # Create task client (using async-compatible HTTP if available)
  @task_client = Client::TaskClient.new(@configuration)

  # Create fiber executor
  @executor = FiberExecutor.new(@max_workers)

  begin
    # Start the async event loop
    @executor.start do |executor|
      until @shutdown
        begin
          run_once(executor)
          # Small sleep to prevent tight loop (async-friendly)
          sleep(0.001)
        rescue StandardError => e
          @logger.error("Error in fiber polling loop: #{e.message}")
          sleep(1)
        end
      end
    end
  ensure
    # Release resources even when the event loop exits with an exception
    cleanup
  end

  @logger.info('FiberTaskRunner stopped')
end

#run_once(executor) ⇒ Object

Single iteration

Parameters:



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/conductor/worker/fiber_executor.rb', line 206

# Performs a single polling iteration.
#
# Does nothing when the executor is at capacity, or when earlier polls came
# back empty and the adaptive backoff window has not yet elapsed. Otherwise
# polls for up to the number of free slots, submits one job per returned
# task to the executor, and publishes the active worker count.
#
# @param executor [Object] executor responding to at_capacity?,
#   running_count and submit
def run_once(executor)
  return if executor.at_capacity?

  free_slots = @max_workers - executor.running_count

  if @consecutive_empty_polls.positive?
    required_ms = calculate_adaptive_backoff
    # With no previous poll recorded, treat the backoff as already served
    waited_ms = required_ms
    waited_ms = (Time.now - @last_poll_time) * 1000 if @last_poll_time
    return if waited_ms < required_ms
  end

  @last_poll_time = Time.now
  polled = batch_poll(free_slots)

  if polled.empty?
    # Another empty poll: remember it so the backoff grows
    @consecutive_empty_polls += 1
  else
    @consecutive_empty_polls = 0
    polled.each { |task| executor.submit { execute_and_update(task) } }
    publish_active_workers(executor)
  end
end

#shutdown ⇒ Object

Signal shutdown



235
236
237
238
# File 'lib/conductor/worker/fiber_executor.rb', line 235

# Signals shutdown: sets the @shutdown flag that the run loop checks, then
# forwards the shutdown request to the executor if one has been created.
#
# @return [Object, nil] the executor's shutdown result, or nil when no
#   executor exists yet
def shutdown
  @shutdown = true
  @executor.shutdown unless @executor.nil?
end