Class: Conductor::Worker::TaskHandler
- Inherits:
-
Object
- Object
- Conductor::Worker::TaskHandler
- Defined in:
- lib/conductor/worker/task_handler.rb
Overview
TaskHandler - The top-level orchestrator that manages all workers Creates one Thread per worker, each running a TaskRunner
Supports multiple execution modes based on worker configuration:
- :thread (default) - Thread-based with ThreadPoolExecutor
- :ractor - Ractor-based for true parallelism (Ruby 3.1+)
- :fiber - Fiber-based with async gem for high I/O concurrency
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
-
#event_dispatcher ⇒ Object
readonly
Returns the value of attribute event_dispatcher.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Class Method Summary collapse
-
.run(workers: nil, configuration: nil, **options) {|self| ... } ⇒ Object
Context manager pattern - execute block and stop on exit.
Instance Method Summary collapse
-
#add_worker(worker) ⇒ self
Add a single worker.
-
#add_workers(workers) ⇒ self
Add workers to the handler.
-
#initialize(workers: nil, configuration: nil, scan_for_annotated_workers: true, import_modules: nil, event_listeners: nil, logger: nil, register_task_definitions: false) ⇒ TaskHandler
constructor
Initialize TaskHandler.
-
#join ⇒ self
Wait for all worker threads to complete (blocking).
-
#running? ⇒ Boolean
Check if handler is running.
-
#start ⇒ self
Start all worker threads.
-
#stop(timeout: 5) ⇒ self
Stop all workers gracefully.
-
#worker_names ⇒ Array<String>
Get list of worker names.
Constructor Details
#initialize(workers: nil, configuration: nil, scan_for_annotated_workers: true, import_modules: nil, event_listeners: nil, logger: nil, register_task_definitions: false) ⇒ TaskHandler
Initialize TaskHandler
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/conductor/worker/task_handler.rb', line 32 def initialize( workers: nil, configuration: nil, scan_for_annotated_workers: true, import_modules: nil, event_listeners: nil, logger: nil, register_task_definitions: false ) @configuration = configuration || Configuration.new @logger = logger || create_default_logger @event_dispatcher = Events::SyncEventDispatcher.new @workers = [] @threads = [] @runners = [] @ractors = [] # For Ractor-based workers @running = false @mutex = Mutex.new @register_task_definitions = register_task_definitions @event_listeners = [] # Register event listeners register_listeners(event_listeners) if event_listeners # Import modules (triggers worker_task registrations) import_worker_modules(import_modules) if import_modules # Discover workers from registry discover_registered_workers if scan_for_annotated_workers # Add provided workers add_workers(workers) if workers end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
22 23 24 |
# File 'lib/conductor/worker/task_handler.rb', line 22 def configuration @configuration end |
#event_dispatcher ⇒ Object (readonly)
Returns the value of attribute event_dispatcher.
22 23 24 |
# File 'lib/conductor/worker/task_handler.rb', line 22 def event_dispatcher @event_dispatcher end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
22 23 24 |
# File 'lib/conductor/worker/task_handler.rb', line 22 def workers @workers end |
Class Method Details
.run(workers: nil, configuration: nil, **options) {|self| ... } ⇒ Object
Context manager pattern - execute block and stop on exit
296 297 298 299 300 301 302 303 |
# File 'lib/conductor/worker/task_handler.rb', line 296 def self.run(workers: nil, configuration: nil, **) handler = new(workers: workers, configuration: configuration, **) begin yield handler if block_given? ensure handler.stop end end |
Instance Method Details
#add_worker(worker) ⇒ self
Add a single worker
77 78 79 80 81 82 |
# File 'lib/conductor/worker/task_handler.rb', line 77 def add_worker(worker) @mutex.synchronize do @workers << worker end self end |
#add_workers(workers) ⇒ self
Add workers to the handler
69 70 71 72 |
# File 'lib/conductor/worker/task_handler.rb', line 69 def add_workers(workers) workers.each { |w| add_worker(w) } self end |
#join ⇒ self
Wait for all worker threads to complete (blocking)
276 277 278 279 |
# File 'lib/conductor/worker/task_handler.rb', line 276 def join @threads.each(&:join) self end |
#running? ⇒ Boolean
Check if handler is running
283 284 285 |
# File 'lib/conductor/worker/task_handler.rb', line 283 def running? @running end |
#start ⇒ self
Start all worker threads
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/conductor/worker/task_handler.rb', line 86 def start @mutex.synchronize do return self if @running @running = true # Register task definitions if enabled register_all_task_definitions if @register_task_definitions @workers.each do |worker| start_worker(worker) end @logger.info("TaskHandler started with #{@workers.size} workers") end self end |
#stop(timeout: 5) ⇒ self
Stop all workers gracefully
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/conductor/worker/task_handler.rb', line 237 def stop(timeout: 5) @mutex.synchronize do return self unless @running @logger.info('Stopping TaskHandler...') # Signal all runners to shutdown @runners.each(&:shutdown) # Wait for threads to finish @threads.each do |thread| thread.join(timeout) thread.kill if thread.alive? end # Shutdown Ractors @ractors.each do |ractor| # Ractors don't have a clean shutdown mechanism # They'll be GC'd when no longer referenced ractor.take if ractor.respond_to?(:take) rescue Ractor::ClosedError, Ractor::RemoteError # Ractor already finished end @runners.clear @threads.clear @ractors.clear stop_event_listeners @running = false @logger.info('TaskHandler stopped') end self end |
#worker_names ⇒ Array<String>
Get list of worker names
289 290 291 |
# File 'lib/conductor/worker/task_handler.rb', line 289 def worker_names @workers.map(&:task_definition_name) end |