Class: Conductor::Worker::FiberExecutor

Inherits:
Object
Defined in:
lib/conductor/worker/fiber_executor.rb

Overview

FiberExecutor is a fiber-based executor built on the async gem. It provides lightweight cooperative concurrency for I/O-heavy workloads.

Unlike ThreadPoolExecutor, which uses OS threads (~8KB each), FiberExecutor uses fibers (~400 bytes each), enabling thousands of concurrent tasks within a single thread.
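To make "cooperative concurrency within a single thread" concrete, here is a minimal sketch using only Ruby's built-in Fiber class. It does not use the async gem or FiberExecutor; the method name run_cooperatively is hypothetical, and Fiber.yield stands in for the point where a real task would wait on I/O.

```ruby
# Illustration only: plain Ruby fibers, no async gem, no FiberExecutor.
# Each fiber yields once (simulating an I/O wait) and is later resumed,
# all on the calling thread.
def run_cooperatively(count)
  completed = 0
  fibers = Array.new(count) do
    Fiber.new do
      Fiber.yield        # hand control back, as a task would while waiting on I/O
      completed += 1     # runs only after the fiber is resumed a second time
    end
  end
  fibers.each(&:resume)  # start every fiber; each pauses at Fiber.yield
  fibers.each(&:resume)  # resume every fiber so it can finish
  completed
end

puts run_cooperatively(1_000) # prints 1000
```

A scheduler such as the one the async gem provides automates the resume step by waking a fiber when its I/O is ready.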

Requirements:

  • async gem must be installed (optional dependency)
  • All I/O must be non-blocking (use async-compatible libraries)
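Because async is an optional dependency, callers may want to check for it before selecting the fiber executor. The sketch below is one possible check; the helper async_available? is hypothetical and is not the library's private load_async_gem, whose body is not shown on this page.

```ruby
# Hypothetical helper (not part of Conductor): reports whether the optional
# async gem can be loaded in the current environment.
def async_available?
  require 'async'
  true
rescue LoadError
  false
end

if async_available?
  puts 'async gem found: the fiber executor can be used'
else
  puts 'async gem missing: add it to the Gemfile or use a thread-based executor'
end
```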

Examples:

worker = Worker.new('io_task', executor: :fiber, thread_count: 100) { |t| async_http_call(t) }
handler = TaskHandler.new(workers: [worker])
handler.start

Constructor Details

#initialize(max_concurrency) ⇒ FiberExecutor

Initialize the FiberExecutor and load the async gem. The scheduler and semaphore are not created until #start is called.

Parameters:

  • max_concurrency (Integer)

    Maximum concurrent fibers (semaphore limit)



# File 'lib/conductor/worker/fiber_executor.rb', line 25

def initialize(max_concurrency)
  @max_concurrency = max_concurrency
  @running_fibers = []
  @semaphore = nil
  @scheduler = nil
  @shutdown = false

  # Lazy-load the async gem
  load_async_gem
end

Instance Attribute Details

#max_concurrency ⇒ Object (readonly)

Returns the value of attribute max_concurrency.



# File 'lib/conductor/worker/fiber_executor.rb', line 21

def max_concurrency
  @max_concurrency
end

Instance Method Details

#at_capacity? ⇒ Boolean

Check whether the executor is at capacity, that is, whether running_count has reached max_concurrency.

Returns:

  • (Boolean)


# File 'lib/conductor/worker/fiber_executor.rb', line 66

def at_capacity?
  running_count >= @max_concurrency
end
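A caller such as a polling loop could use at_capacity?, running_count, and max_concurrency to decide how many new tasks to fetch. The following is a sketch under assumptions: available_slots is a hypothetical helper, and StandInExecutor is a stand-in that only mimics the three documented methods so the example runs without the library.

```ruby
# Hypothetical stand-in exposing the same readers as FiberExecutor,
# so the helper below can run without Conductor or the async gem.
StandInExecutor = Struct.new(:max_concurrency, :running_count) do
  def at_capacity?
    running_count >= max_concurrency
  end
end

# Hypothetical helper: how many more tasks the executor could accept now.
def available_slots(executor)
  return 0 if executor.at_capacity?

  executor.max_concurrency - executor.running_count
end

puts available_slots(StandInExecutor.new(10, 4))  # prints 6
puts available_slots(StandInExecutor.new(10, 10)) # prints 0
```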

#running_count ⇒ Integer

Get current number of running fibers

Returns:

  • (Integer)


# File 'lib/conductor/worker/fiber_executor.rb', line 59

def running_count
  cleanup_completed_fibers
  @running_fibers.size
end

#shutdown ⇒ Object

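Signal shutdown. Sets the shutdown flag, stops every tracked fiber, and clears the list of running fibers. Errors raised while stopping a fiber are ignored.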



# File 'lib/conductor/worker/fiber_executor.rb', line 94

def shutdown
  @shutdown = true
  @running_fibers.each do |fiber|
    fiber.stop
  rescue StandardError
    # Ignore errors during shutdown
  end
  @running_fibers.clear
end

#shutdown? ⇒ Boolean

Check whether shutdown has been signaled.

Returns:

  • (Boolean)


# File 'lib/conductor/worker/fiber_executor.rb', line 106

def shutdown?
  @shutdown
end

#start { ... } ⇒ Object

Start the fiber scheduler. Must be called before submitting tasks.

Yields:

  • Block to execute within the scheduler; it receives the executor as its argument



# File 'lib/conductor/worker/fiber_executor.rb', line 85

def start(&block)
  Async do |task|
    @scheduler = task
    @semaphore = Async::Semaphore.new(@max_concurrency)
    block.call(self) if block_given?
  end
end

#submit(&block) ⇒ Object

Submit a task for execution

Parameters:

  • block (Proc)

    Block to execute in a fiber

Returns:

  • (Object)

    Fiber task handle



# File 'lib/conductor/worker/fiber_executor.rb', line 39

def submit(&block)
  raise 'FiberExecutor not started' unless @scheduler

  # Wrap the block with semaphore for concurrency control
  fiber_task = @scheduler.async do
    @semaphore.acquire
    begin
      block.call
    ensure
      @semaphore.release
    end
  end

  @running_fibers << fiber_task
  cleanup_completed_fibers
  fiber_task
end
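The acquire / ensure / release pattern in #submit is what bounds concurrency: each task takes a slot before it runs and always returns it, even if the task raises. The sketch below shows the same pattern with standard-library threads and a SizedQueue standing in for Async::Semaphore, so it runs without the async gem. The name run_bounded is hypothetical, and this is not how FiberExecutor itself is implemented.

```ruby
# Illustration only: threads plus SizedQueue stand in for fibers plus
# Async::Semaphore. Returns the highest number of tasks seen running at once.
def run_bounded(task_count, limit)
  slots = SizedQueue.new(limit) # push acts as acquire, pop acts as release
  lock = Mutex.new
  active = 0
  peak = 0

  threads = Array.new(task_count) do
    Thread.new do
      slots.push(true) # blocks while `limit` tasks already hold a slot
      begin
        lock.synchronize do
          active += 1
          peak = [peak, active].max
        end
        sleep 0.01 # simulated I/O
        lock.synchronize { active -= 1 }
      ensure
        slots.pop # always give the slot back, as #submit does in its ensure
      end
    end
  end

  threads.each(&:join)
  peak
end

peak = run_bounded(20, 3)
puts "peak concurrency: #{peak} (never above 3)"
```

As in #submit, every task is created immediately and waits for a slot inside its own unit of execution, so the submitting code is not blocked by the limit.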

#wait_for_completion(timeout: nil) ⇒ Object

Wait for all fibers to complete

Parameters:

  • timeout (Float, nil) (defaults to: nil)

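    Optional timeout in seconds. The method body shown here does not reference this value, so the call waits until every fiber has finished.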



# File 'lib/conductor/worker/fiber_executor.rb', line 72

def wait_for_completion(timeout: nil)
  cleanup_completed_fibers
  @running_fibers.each do |fiber|
    fiber.wait
  rescue StandardError
    # Ignore errors during wait
  end
  @running_fibers.clear
end
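Since the method body above accepts timeout but does not use it, a caller that needs a deadline would have to enforce one itself. The following is one possible sketch using the standard-library Timeout module. Both wait_all and SleepyHandle are hypothetical names: SleepyHandle stands in for any task handle that responds to #wait, and the example does not depend on the async gem.

```ruby
require 'timeout'

# Hypothetical stand-in for a task handle: anything that responds to #wait.
SleepyHandle = Struct.new(:seconds) do
  def wait
    sleep seconds
  end
end

# Hypothetical helper: waits for every handle, giving up after `timeout`
# seconds when one is given. Returns true if all waits finished, false if
# the deadline passed first.
def wait_all(handles, timeout: nil)
  if timeout
    Timeout.timeout(timeout) { handles.each(&:wait) }
  else
    handles.each(&:wait)
  end
  true
rescue Timeout::Error
  false
end

puts wait_all([SleepyHandle.new(0.01)])              # prints true
puts wait_all([SleepyHandle.new(1)], timeout: 0.05)  # prints false
```

A timed-out wait leaves the remaining tasks running, so the caller would still need to decide whether to call #shutdown afterwards.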