Class: Conductor::Worker::FiberTaskRunner
- Inherits:
-
Object
- Object
- Conductor::Worker::FiberTaskRunner
- Defined in:
- lib/conductor/worker/fiber_executor.rb
Overview
FiberTaskRunner - TaskRunner variant that uses FiberExecutor. Runs within an async event loop for fiber-based concurrency.
Constant Summary collapse
- RETRY_BACKOFFS =
Retry backoffs for task update (in seconds)
[0, 10, 20, 30].freeze
- MAX_BACKOFF_EXPONENT =
Maximum exponent for adaptive backoff
10
- MAX_AUTH_BACKOFF_SECONDS =
Maximum auth failure backoff in seconds
60
Instance Attribute Summary collapse
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Instance Method Summary collapse
-
#initialize(worker, configuration:, event_dispatcher: nil, logger: nil) ⇒ FiberTaskRunner
constructor
Initialize FiberTaskRunner.
-
#run ⇒ Object
Main run loop - runs within async event loop.
-
#run_once(executor) ⇒ Object
Single iteration.
-
#shutdown ⇒ Object
Signal shutdown.
Constructor Details
#initialize(worker, configuration:, event_dispatcher: nil, logger: nil) ⇒ FiberTaskRunner
Initialize FiberTaskRunner
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/conductor/worker/fiber_executor.rb', line 149

# Initialize FiberTaskRunner.
#
# @param worker [Object] worker whose +task_definition_name+ is used to
#   resolve the effective worker configuration
# @param configuration [Object, nil] client configuration; a fresh
#   +Configuration+ is created when +nil+ is passed
# @param event_dispatcher [Object, nil] event dispatcher; defaults to a new
#   +Events::SyncEventDispatcher+
# @param logger [Object, nil] logger; defaults to +create_default_logger+
def initialize(worker, configuration:, event_dispatcher: nil, logger: nil)
  @worker = worker
  @configuration = configuration || Configuration.new
  @event_dispatcher = event_dispatcher || Events::SyncEventDispatcher.new
  @logger = logger || create_default_logger

  # Resolve worker configuration.
  # NOTE(review): this listing shows the second argument as a bare
  # `(worker)`, i.e. the worker itself is passed. A helper name may have
  # been lost when the page was extracted - confirm against the source file.
  resolved = WorkerConfig.resolve(worker.task_definition_name, worker)

  @poll_interval = resolved[:poll_interval]
  @max_workers = resolved[:thread_count] # thread_count becomes fiber concurrency
  @worker_id = resolved[:worker_id]
  @domain = resolved[:domain]
  @poll_timeout = resolved[:poll_timeout]

  # State tracking
  @consecutive_empty_polls = 0
  @auth_failures = 0
  @last_auth_failure_time = nil
  @last_poll_time = nil
  @poll_count = 0
  @shutdown = false
end
Instance Attribute Details
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
142 143 144 |
# File 'lib/conductor/worker/fiber_executor.rb', line 142

# Returns the value of attribute worker.
#
# @return [Object] the worker stored in +@worker+
def worker
  @worker
end
Instance Method Details
#run ⇒ Object
Main run loop - runs within async event loop
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/conductor/worker/fiber_executor.rb', line 176

# Main run loop - runs within async event loop.
#
# Creates the task client and a FiberExecutor sized to +@max_workers+, then
# calls #run_once repeatedly inside the executor's start block until
# +@shutdown+ is set. Once the start block returns, +cleanup+ is called and
# a stop message is logged.
#
# @return [Object] the result of the final +@logger.info+ call
def run
  @logger.info("Starting FiberTaskRunner for '#{@worker.task_definition_name}' " \
               "(fiber_concurrency=#{@max_workers})")

  # Create task client (using async-compatible HTTP if available)
  @task_client = Client::TaskClient.new(@configuration)

  # Create fiber executor
  @executor = FiberExecutor.new(@max_workers)

  # Start the async event loop
  @executor.start do |executor|
    until @shutdown
      begin
        run_once(executor)
        # Small sleep to prevent tight loop (async-friendly)
        sleep(0.001)
      rescue StandardError => e
        # Log the failure and pause for a second before the next iteration
        # instead of letting one bad poll end the loop.
        @logger.error("Error in fiber polling loop: #{e.message}")
        sleep(1)
      end
    end
  end

  cleanup
  @logger.info('FiberTaskRunner stopped')
end
#run_once(executor) ⇒ Object
Single iteration
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/conductor/worker/fiber_executor.rb', line 206

# Single iteration of the polling loop.
#
# Does nothing when the executor is at capacity, or when the adaptive
# backoff window after one or more empty polls has not yet elapsed.
# Otherwise polls for as many tasks as there are free slots and submits each
# returned task to the executor.
#
# @param executor [Object] executor responding to +at_capacity?+,
#   +running_count+ and +submit+
# @return [Object, nil] +nil+ on an early return; otherwise the value of the
#   last evaluated branch
def run_once(executor)
  # Check capacity
  return if executor.at_capacity?

  available_slots = @max_workers - executor.running_count

  # Adaptive backoff: after empty polls, skip this iteration until enough
  # time has passed since the previous poll.
  if @consecutive_empty_polls.positive?
    backoff_ms = calculate_adaptive_backoff
    # With no recorded poll time, treat the backoff as already elapsed.
    elapsed_ms = @last_poll_time ? (Time.now - @last_poll_time) * 1000 : backoff_ms
    return if elapsed_ms < backoff_ms
  end

  # Poll for tasks
  @last_poll_time = Time.now
  tasks = batch_poll(available_slots)

  if tasks.empty?
    @consecutive_empty_polls += 1
  else
    @consecutive_empty_polls = 0
    tasks.each do |task|
      executor.submit { execute_and_update(task) }
    end
    publish_active_workers(executor)
  end
end
#shutdown ⇒ Object
Signal shutdown
235 236 237 238 |
# File 'lib/conductor/worker/fiber_executor.rb', line 235

# Signal shutdown.
#
# Sets the +@shutdown+ flag, which the #run loop checks before each
# iteration, and forwards +shutdown+ to the executor if one has been created.
#
# @return [Object, nil] the executor's +shutdown+ result, or +nil+ when no
#   executor exists
def shutdown
  @shutdown = true
  @executor&.shutdown
end