Class: Kaal::Coordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/kaal/core/coordinator.rb

Overview

Coordinator manages the main scheduler loop that calculates due fire times and dispatches cron work safely across multiple nodes using backend lease coordination.

The coordinator:

  1. Runs a background thread on tick_interval

  2. For each registered cron, calculates due fire times within the window

  3. Attempts to acquire a distributed lease for each due time

  4. Calls the enqueue callback if the lease is acquired

  5. Supports graceful shutdown and re-entrancy for testing

Examples:

Start the coordinator

coordinator = Kaal::Coordinator.new
coordinator.start!

Manual tick execution (for testing)

coordinator.tick!

Stop the coordinator

coordinator.stop!

Instance Method Summary collapse

Constructor Details

#initialize(configuration:, registry:) ⇒ Coordinator

Initialize a new Coordinator instance.

Parameters:

  • configuration (Configuration)

    the scheduler configuration

  • registry (Registry)

    the registered crons registry



39
40
41
42
43
44
45
46
47
# File 'lib/kaal/core/coordinator.rb', line 39

def initialize(configuration:, registry:)
  @configuration = configuration
  @registry = registry
  @thread = nil
  @running = false
  @stop_requested = false
  @mutex = Mutex.new
  @tick_cv = ConditionVariable.new
end

Instance Method Details

#reset!void

This method returns an undefined value.

Reset coordinator state for re-entrancy in tests.

Stops any running thread before clearing state to avoid orphaning it. Raises an error if the thread cannot be stopped within the timeout.

Raises:

  • (RuntimeError)

    if stop! times out



132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/kaal/core/coordinator.rb', line 132

def reset!
  # Stop any running thread first to prevent orphaned threads
  if running?
    stopped = stop!
    raise 'Failed to stop coordinator thread within timeout' unless stopped
  end

  # Now safe to reset all state
  @mutex.synchronize do
    @running = false
    @stop_requested = false
    @thread = nil
  end
end

#restart!Thread

Restart the coordinator (stop then start).

Returns:

  • (Thread)

    the started thread



106
107
108
109
# File 'lib/kaal/core/coordinator.rb', line 106

def restart!
  stop!
  start!
end

#running?Boolean

Check if the coordinator is currently running.

Returns:

  • (Boolean)

    true if running, false otherwise



97
98
99
# File 'lib/kaal/core/coordinator.rb', line 97

def running?
  @mutex.synchronize { @running }
end

#start!Thread

Start the coordinator background thread.

Returns:

  • (Thread)

    the started thread, or nil if already running



54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/kaal/core/coordinator.rb', line 54

def start!
  @mutex.synchronize do
    return nil if @running

    # Run recovery before starting the main loop
    recover_missed_runs

    @running = true
    @stop_requested = false
    @thread = Thread.new { run_loop }
    @thread.abort_on_exception = true
    return @thread
  end
end

#stop!(timeout: 30) ⇒ Boolean

Stop the coordinator gracefully.

Signals the coordinator to stop after the current tick completes, then waits for the thread to finish.

Parameters:

  • timeout (Integer) (defaults to: 30)

    seconds to wait for graceful shutdown (default: 30)

Returns:

  • (Boolean)

    true if stopped, false if timeout



78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/kaal/core/coordinator.rb', line 78

def stop!(timeout: 30) # rubocop:disable Naming/PredicateMethod
  request_stop

  # Wait for thread to finish outside the lock
  result = @thread&.join(timeout)

  # If we had a thread and join timed out, thread is still alive
  return false if @thread && result.nil?

  @thread = nil
  @mutex.synchronize { @running = false }

  true
end

#tick!void

This method returns an undefined value.

Execute a single tick manually.

This is useful for testing and Rake tasks that want to trigger the scheduler without running the background loop.



119
120
121
# File 'lib/kaal/core/coordinator.rb', line 119

def tick!
  execute_tick
end