Class: Kaal::Coordinator
- Inherits:
-
Object
- Object
- Kaal::Coordinator
- Defined in:
- lib/kaal/core/coordinator.rb
Overview
Coordinator manages the main scheduler loop that calculates due fire times and dispatches cron work safely across multiple nodes using backend lease coordination.
The coordinator:
-
Runs a background thread on tick_interval
-
For each registered cron, calculates due fire times within the window
-
Attempts to acquire a distributed lease for each due time
-
Calls the enqueue callback if the lease is acquired
-
Supports graceful shutdown and re-entrancy for testing
Instance Method Summary collapse
-
#initialize(configuration:, registry:) ⇒ Coordinator
constructor
Initialize a new Coordinator instance.
-
#reset! ⇒ void
Reset coordinator state for re-entrancy in tests.
-
#restart! ⇒ Thread
Restart the coordinator (stop then start).
-
#running? ⇒ Boolean
Check if the coordinator is currently running.
-
#start! ⇒ Thread
Start the coordinator background thread.
-
#stop!(timeout: 30) ⇒ Boolean
Stop the coordinator gracefully.
-
#tick! ⇒ void
Execute a single tick manually.
Constructor Details
#initialize(configuration:, registry:) ⇒ Coordinator
Initialize a new Coordinator instance.
39 40 41 42 43 44 45 46 47 |
# File 'lib/kaal/core/coordinator.rb', line 39 def initialize(configuration:, registry:) @configuration = configuration @registry = registry @thread = nil @running = false @stop_requested = false @mutex = Mutex.new @tick_cv = ConditionVariable.new end |
Instance Method Details
#reset! ⇒ void
This method returns an undefined value.
Reset coordinator state for re-entrancy in tests.
Stops any running thread before clearing state to avoid orphaning it. Raises an error if the thread cannot be stopped within the timeout.
132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/kaal/core/coordinator.rb', line 132 def reset! # Stop any running thread first to prevent orphaned threads if running? stopped = stop! raise 'Failed to stop coordinator thread within timeout' unless stopped end # Now safe to reset all state @mutex.synchronize do @running = false @stop_requested = false @thread = nil end end |
#restart! ⇒ Thread
Restart the coordinator (stop then start).
106 107 108 109 |
# File 'lib/kaal/core/coordinator.rb', line 106 def restart! stop! start! end |
#running? ⇒ Boolean
Check if the coordinator is currently running.
97 98 99 |
# File 'lib/kaal/core/coordinator.rb', line 97 def running? @mutex.synchronize { @running } end |
#start! ⇒ Thread
Start the coordinator background thread.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/kaal/core/coordinator.rb', line 54 def start! @mutex.synchronize do return nil if @running # Run recovery before starting the main loop recover_missed_runs @running = true @stop_requested = false @thread = Thread.new { run_loop } @thread.abort_on_exception = true return @thread end end |
#stop!(timeout: 30) ⇒ Boolean
Stop the coordinator gracefully.
Signals the coordinator to stop after the current tick completes, then waits for the thread to finish.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/kaal/core/coordinator.rb', line 78 def stop!(timeout: 30) # rubocop:disable Naming/PredicateMethod request_stop # Wait for thread to finish outside the lock result = @thread&.join(timeout) # If we had a thread and join timed out, thread is still alive return false if @thread && result.nil? @thread = nil @mutex.synchronize { @running = false } true end |
#tick! ⇒ void
This method returns an undefined value.
Execute a single tick manually.
This is useful for testing and Rake tasks that want to trigger the scheduler without running the background loop.
119 120 121 |
# File 'lib/kaal/core/coordinator.rb', line 119 def tick! execute_tick end |