Class: Kaal::Coordinator
- Inherits:
-
Object
- Object
- Kaal::Coordinator
- Defined in:
- lib/kaal/core/coordinator.rb
Overview
Coordinator manages the main scheduler loop that calculates due fire times and dispatches cron work safely across multiple nodes using backend lease coordination.
The coordinator:
-
Runs a background thread on tick_interval
-
Calculates due cron fire times and acquires distributed leases for them
-
Dispatches claimed work and supports graceful shutdown and test re-entrancy
Constant Summary collapse
- DELAYED_JOB_BATCH_SIZE =
100- DELAYED_JOB_MAX_BATCHES_PER_TICK =
10- DELAYED_JOB_DELETE_CONFIRMATION_JITTER_MAX =
0.05
Instance Method Summary collapse
-
#initialize(configuration:, registry:) ⇒ Coordinator
constructor
Initialize a new Coordinator instance.
-
#reset! ⇒ void
Reset coordinator state for re-entrancy in tests.
-
#restart! ⇒ Thread
Restart the coordinator (stop then start).
-
#running? ⇒ Boolean
Check if the coordinator is currently running.
-
#start! ⇒ Thread
Start the coordinator background thread.
-
#stop!(timeout: 30) ⇒ Boolean
Stop the coordinator gracefully.
-
#tick! ⇒ void
Execute a single tick manually.
Constructor Details
#initialize(configuration:, registry:) ⇒ Coordinator
Initialize a new Coordinator instance.
30 31 32 33 34 35 36 37 38 |
# File 'lib/kaal/core/coordinator.rb', line 30 def initialize(configuration:, registry:) @configuration = configuration @registry = registry @thread = nil @running = false @stop_requested = false @mutex = Mutex.new @tick_cv = ConditionVariable.new end |
Instance Method Details
#reset! ⇒ void
This method returns an undefined value.
Reset coordinator state for re-entrancy in tests.
Stops any running thread before clearing state to avoid orphaning it. Raises an error if the thread cannot be stopped within the timeout.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/kaal/core/coordinator.rb', line 123 def reset! # Stop any running thread first to prevent orphaned threads if running? stopped = stop! raise 'Failed to stop coordinator thread within timeout' unless stopped end # Now safe to reset all state @mutex.synchronize do @running = false @stop_requested = false @thread = nil end end |
#restart! ⇒ Thread
Restart the coordinator (stop then start).
97 98 99 100 |
# File 'lib/kaal/core/coordinator.rb', line 97 def restart! stop! start! end |
#running? ⇒ Boolean
Check if the coordinator is currently running.
88 89 90 |
# File 'lib/kaal/core/coordinator.rb', line 88 def running? @mutex.synchronize { @running } end |
#start! ⇒ Thread
Start the coordinator background thread.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/kaal/core/coordinator.rb', line 45 def start! @mutex.synchronize do return nil if @running # Run recovery before starting the main loop recover_missed_runs @running = true @stop_requested = false @thread = Thread.new { run_loop } @thread.abort_on_exception = true return @thread end end |
#stop!(timeout: 30) ⇒ Boolean
Stop the coordinator gracefully.
Signals the coordinator to stop after the current tick completes, then waits for the thread to finish.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/kaal/core/coordinator.rb', line 69 def stop!(timeout: 30) # rubocop:disable Naming/PredicateMethod request_stop # Wait for thread to finish outside the lock result = @thread&.join(timeout) # If we had a thread and join timed out, thread is still alive return false if @thread && result.nil? @thread = nil @mutex.synchronize { @running = false } true end |
#tick! ⇒ void
This method returns an undefined value.
Execute a single tick manually.
This is useful for testing and Rake tasks that want to trigger the scheduler without running the background loop.
110 111 112 |
# File 'lib/kaal/core/coordinator.rb', line 110 def tick! execute_tick end |