Class: Conductor::Worker::TaskHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/conductor/worker/task_handler.rb

Overview

TaskHandler - The top-level orchestrator that manages all workers Creates one Thread per worker, each running a TaskRunner

Supports multiple execution modes based on worker configuration:

  • :thread (default) - Thread-based with ThreadPoolExecutor
  • :ractor - Ractor-based for true parallelism (Ruby 3.1+)
  • :fiber - Fiber-based with async gem for high I/O concurrency

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workers: nil, configuration: nil, scan_for_annotated_workers: true, import_modules: nil, event_listeners: nil, logger: nil, register_task_definitions: false) ⇒ TaskHandler

Initialize TaskHandler

Parameters:

  • workers (Array<Worker>, nil) (defaults to: nil)

    Pre-created worker instances

  • configuration (Configuration, nil) (defaults to: nil)

    Conductor configuration

  • scan_for_annotated_workers (Boolean) (defaults to: true)

    Auto-discover workers from registry

  • import_modules (Array<String>, nil) (defaults to: nil)

    Ruby files to require (triggers registration)

  • event_listeners (Array<Object>, nil) (defaults to: nil)

    Custom event listeners

  • logger (Logger, nil) (defaults to: nil)

    Logger instance

  • register_task_definitions (Boolean) (defaults to: false)

    Auto-register task definitions on start



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/conductor/worker/task_handler.rb', line 32

def initialize(
  workers: nil,
  configuration: nil,
  scan_for_annotated_workers: true,
  import_modules: nil,
  event_listeners: nil,
  logger: nil,
  register_task_definitions: false
)
  @configuration = configuration || Configuration.new
  @logger = logger || create_default_logger
  @event_dispatcher = Events::SyncEventDispatcher.new
  @workers = []
  @threads = []
  @runners = []
  @ractors = [] # For Ractor-based workers
  @running = false
  @mutex = Mutex.new
  @register_task_definitions = register_task_definitions
  @event_listeners = []

  # Register event listeners
  register_listeners(event_listeners) if event_listeners

  # Import modules (triggers worker_task registrations)
  import_worker_modules(import_modules) if import_modules

  # Discover workers from registry
  discover_registered_workers if scan_for_annotated_workers

  # Add provided workers
  add_workers(workers) if workers
end

Instance Attribute Details

#configurationObject (readonly)

Returns the value of attribute configuration.



22
23
24
# File 'lib/conductor/worker/task_handler.rb', line 22

def configuration
  @configuration
end

#event_dispatcherObject (readonly)

Returns the value of attribute event_dispatcher.



22
23
24
# File 'lib/conductor/worker/task_handler.rb', line 22

def event_dispatcher
  @event_dispatcher
end

#workersObject (readonly)

Returns the value of attribute workers.



22
23
24
# File 'lib/conductor/worker/task_handler.rb', line 22

def workers
  @workers
end

Class Method Details

.run(workers: nil, configuration: nil, **options) {|self| ... } ⇒ Object

Context manager pattern - execute block and stop on exit

Yields:

  • (self)

Returns:

  • (Object)

    Block return value



296
297
298
299
300
301
302
303
# File 'lib/conductor/worker/task_handler.rb', line 296

def self.run(workers: nil, configuration: nil, **options)
  handler = new(workers: workers, configuration: configuration, **options)
  begin
    yield handler if block_given?
  ensure
    handler.stop
  end
end

Instance Method Details

#add_worker(worker) ⇒ self

Add a single worker

Parameters:

  • worker (Worker)

    Worker to add

Returns:

  • (self)


77
78
79
80
81
82
# File 'lib/conductor/worker/task_handler.rb', line 77

def add_worker(worker)
  @mutex.synchronize do
    @workers << worker
  end
  self
end

#add_workers(workers) ⇒ self

Add workers to the handler

Parameters:

  • workers (Array<Worker>)

    Workers to add

Returns:

  • (self)


69
70
71
72
# File 'lib/conductor/worker/task_handler.rb', line 69

def add_workers(workers)
  workers.each { |w| add_worker(w) }
  self
end

#joinself

Wait for all worker threads to complete (blocking)

Returns:

  • (self)


276
277
278
279
# File 'lib/conductor/worker/task_handler.rb', line 276

def join
  @threads.each(&:join)
  self
end

#running?Boolean

Check if handler is running

Returns:

  • (Boolean)


283
284
285
# File 'lib/conductor/worker/task_handler.rb', line 283

def running?
  @running
end

#startself

Start all worker threads

Returns:

  • (self)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/conductor/worker/task_handler.rb', line 86

def start
  @mutex.synchronize do
    return self if @running

    @running = true

    # Register task definitions if enabled
    register_all_task_definitions if @register_task_definitions

    @workers.each do |worker|
      start_worker(worker)
    end

    @logger.info("TaskHandler started with #{@workers.size} workers")
  end

  self
end

#stop(timeout: 5) ⇒ self

Stop all workers gracefully

Parameters:

  • timeout (Integer) (defaults to: 5)

    Seconds to wait before force-killing threads

Returns:

  • (self)


237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/conductor/worker/task_handler.rb', line 237

def stop(timeout: 5)
  @mutex.synchronize do
    return self unless @running

    @logger.info('Stopping TaskHandler...')

    # Signal all runners to shutdown
    @runners.each(&:shutdown)

    # Wait for threads to finish
    @threads.each do |thread|
      thread.join(timeout)
      thread.kill if thread.alive?
    end

    # Shutdown Ractors
    @ractors.each do |ractor|
      # Ractors don't have a clean shutdown mechanism
      # They'll be GC'd when no longer referenced
      ractor.take if ractor.respond_to?(:take)
    rescue Ractor::ClosedError, Ractor::RemoteError
      # Ractor already finished
    end

    @runners.clear
    @threads.clear
    @ractors.clear

    stop_event_listeners
    @running = false

    @logger.info('TaskHandler stopped')
  end

  self
end

#worker_namesArray<String>

Get list of worker names

Returns:

  • (Array<String>)


289
290
291
# File 'lib/conductor/worker/task_handler.rb', line 289

def worker_names
  @workers.map(&:task_definition_name)
end