Class: KairosMcp::Daemon::Chronos
- Inherits: Object
  - Object
    - KairosMcp::Daemon::Chronos
- Defined in: lib/kairos_mcp/daemon/chronos.rb
Overview
Chronos — the daemon’s time-driven scheduler (P2.4).
Design reference: v0.2 §4.
Responsibilities:
1. Load schedules from `.kairos/config/schedules.yml`.
2. Maintain `chronos_state.yml` atomically with two timestamps per
schedule: `last_fire_at` (actual fire) and `last_evaluated_at`
(last due-evaluation window boundary). These are intentionally
separated [FIX: CF-10] so that:
(a) A fresh boot does not treat un-fired history as "due now".
(b) The skip policy cannot miscount by 1.
3. On each `tick(now)`, count the cron occurrences in the window
(last_evaluated_at, now] and apply the per-schedule `missed_policy`:
- skip : fire once, log the rest
- catch_up_once : fire once regardless of the missed count
- catch_up_bounded : fire up to `max_catch_up_runs`, drop the
rest; if the backlog is older than `stale_after`, drop it
entirely and fire exactly once to mark recovery [FIX: CF-3].
4. Offer an in-memory mandate queue with concurrency enforcement
(queue / allow / reject) — daemon integration pops from it.
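The window counting and policy handling in item 3 can be sketched as follows. This is a minimal illustration, not the class's actual implementation: `count_due`, `plan_runs`, and `Plan` are hypothetical names, the block passed to `count_due` stands in for a real cron matcher, and the `logged`/`dropped` bookkeeping is an assumption based only on the policy descriptions above.

```ruby
# Hypothetical result type: how many runs to fire, log as missed, or drop.
Plan = Struct.new(:fire, :logged, :dropped, keyword_init: true)

# Count whole-minute boundaries in the half-open window (from, to] for which
# the block (a stand-in for a cron matcher) returns true.
def count_due(from, to)
  t = Time.at((from.to_i / 60 + 1) * 60).utc # first whole minute strictly after `from`
  count = 0
  while t <= to
    count += 1 if yield(t)
    t += 60
  end
  count
end

# Turn a missed-occurrence count into a plan, following the documented policies.
# `stale` is assumed to mean "the backlog is older than stale_after".
def plan_runs(policy, missed, max_catch_up: 5, stale: false)
  return Plan.new(fire: 0, logged: 0, dropped: 0) if missed.zero?

  case policy.to_s
  when 'skip'
    Plan.new(fire: 1, logged: missed - 1, dropped: 0)
  when 'catch_up_once'
    Plan.new(fire: 1, logged: 0, dropped: 0)
  when 'catch_up_bounded'
    if stale
      # Drop the whole backlog, then fire exactly once to mark recovery.
      Plan.new(fire: 1, logged: 0, dropped: missed)
    else
      fire = [missed, max_catch_up].min
      Plan.new(fire: fire, logged: 0, dropped: missed - fire)
    end
  else
    raise ArgumentError, "unknown missed_policy: #{policy.inspect}"
  end
end
```

Because the window is open on the left, an occurrence that falls exactly on `last_evaluated_at` is not counted a second time on the next tick.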
Crash-safety:
* State writes use tmp+rename+dir-fsync [FIX: CF-13]. A torn write
leaves either the previous state or the new state intact — never
a half-written YAML.
* An exception inside one schedule's evaluation does NOT prevent
later schedules from being evaluated (per-schedule rescue).
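The tmp+rename+dir-fsync pattern named above can be sketched like this. It is an illustration under assumptions: `write_state_atomically` and the temporary file naming are hypothetical, and the class's own `persist_state_atomic` is not shown on this page and may differ in detail.

```ruby
require 'yaml'

# Write `state` to `path` so that a crash leaves either the old file or the
# new file on disk, never a partially written one.
def write_state_atomically(path, state)
  dir = File.dirname(path)
  tmp = "#{path}.tmp.#{Process.pid}" # hypothetical temp naming scheme

  File.open(tmp, 'w') do |f|
    f.write(YAML.dump(state))
    f.flush
    f.fsync # force file contents to disk before the rename
  end

  File.rename(tmp, path) # atomic replacement on POSIX filesystems

  begin
    # Persist the directory entry so the rename itself survives a crash.
    File.open(dir) { |d| d.fsync }
  rescue SystemCallError, IOError
    # Some platforms do not allow fsync on a directory; nothing more to do.
  end

  path
ensure
  # Clean up the temp file if anything failed before the rename.
  File.delete(tmp) if tmp && File.exist?(tmp)
end
```

The rename is the commit point: readers see the previous state until it completes and the new state afterwards.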
Timezone handling:
* Per-schedule `timezone:` is applied via ENV['TZ'] swap around
`Time#getlocal`. This is safe in the daemon's single-threaded
event loop. Tests should not rely on the system TZ.
* DST caveat: during fall-back, the same wall-clock minute occurs
twice in real time; a per-minute iterator will count both.
This matches neither strict "fire once per wall minute" nor
strict "fire once per real minute" — but is consistent with
how many simple cron implementations behave.
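The ENV['TZ'] swap described above might look like the following sketch. `with_timezone` and `local_time_in` are hypothetical helper names; the point is only that the previous value is restored in an `ensure` block and that the swap is process-wide, which is why it is safe only in single-threaded code.

```ruby
# Temporarily set the process-wide TZ, run the block, then restore it.
# Not thread-safe: every thread in the process observes the swapped value.
def with_timezone(tz)
  previous = ENV['TZ']
  ENV['TZ'] = tz
  yield
ensure
  ENV['TZ'] = previous # assigning nil removes the variable again
end

# Convert `time` to wall-clock time in `tz` (an IANA name or POSIX TZ string).
def local_time_in(tz, time)
  with_timezone(tz) { time.getlocal }
end
```

A test can pass an explicit zone such as 'UTC' to `local_time_in` and so avoid depending on the system TZ.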
This class is intentionally standalone: no coupling to Safety, Mandate, or Autonomos. Daemon integration wires those together.
Defined Under Namespace
Modules: Cron
Classes: FiredEvent, MissedEntry, Rejection, StaleDrop
Constant Summary
- DEFAULT_SCHEDULES_PATH = '.kairos/config/schedules.yml'
- DEFAULT_STATE_PATH = '.kairos/chronos_state.yml'
- DEFAULT_STALE_AFTER = 48 * 3600 (48h, in seconds)
- DEFAULT_MAX_CATCH_UP = 5
- DEFAULT_MAX_CYCLES = 50
Instance Attribute Summary
-
#missed_log ⇒ Object
readonly
Returns the value of attribute missed_log.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#rejection_log ⇒ Object
readonly
Returns the value of attribute rejection_log.
-
#schedules ⇒ Object
readonly
Returns the value of attribute schedules.
-
#stale_drops ⇒ Object
readonly
Returns the value of attribute stale_drops.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary
-
#enqueue_mandate(event) ⇒ Object
Apply concurrency policy and either push the mandate onto the queue, reject it, or log it.
-
#initialize(schedules_path: nil, state_path: nil, logger: nil, clock: nil, schedules: nil) ⇒ Chronos
constructor
A new instance of Chronos.
-
#pop_queued ⇒ Object
Pops the next mandate from the queue.
-
#register_running(mandate) ⇒ Object
Daemon tells us which mandates are currently running so concurrency decisions work.
-
#rollback_fire(schedule_name) ⇒ Object
Codex-R1 fix #2: roll back last_fire_at if enqueue was rejected.
-
#state_for(name) ⇒ Object
Public: returns the persisted state Hash for the named schedule, or an empty Hash if none is recorded.
-
#state_path ⇒ Object
Raw path to the persisted state file (for tests / status dumps).
-
#tick(now = nil) ⇒ Object
Evaluate every schedule against `now`.
- #unregister_running(mandate_id) ⇒ Object
Constructor Details
#initialize(schedules_path: nil, state_path: nil, logger: nil, clock: nil, schedules: nil) ⇒ Chronos
Returns a new instance of Chronos.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 76
def initialize(schedules_path: nil, state_path: nil, logger: nil,
               clock: nil, schedules: nil)
  @schedules_path = schedules_path || DEFAULT_SCHEDULES_PATH
  @state_path = state_path || DEFAULT_STATE_PATH
  @logger = logger
  @clock = clock || -> { Time.now }
  @boot_time = @clock.call
  @schedules = schedules ? schedules.map { |s| symbolize(s) } : load_schedules
  @state = load_state
  @queue = []
  @running_mandates = []
  @missed_log = []
  @stale_drops = []
  @rejection_log = []
end
Instance Attribute Details
#missed_log ⇒ Object (readonly)
Returns the value of attribute missed_log.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 66
def missed_log
  @missed_log
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 66
def queue
  @queue
end
#rejection_log ⇒ Object (readonly)
Returns the value of attribute rejection_log.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 66
def rejection_log
  @rejection_log
end
#schedules ⇒ Object (readonly)
Returns the value of attribute schedules.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 66
def schedules
  @schedules
end
#stale_drops ⇒ Object (readonly)
Returns the value of attribute stale_drops.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 66
def stale_drops
  @stale_drops
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 66
def state
  @state
end
Instance Method Details
#enqueue_mandate(event) ⇒ Object
Apply the schedule's concurrency policy and either push the mandate onto the queue or reject it (a rejection is recorded in `rejection_log` and logged). Returns :queued | :rejected.
The `allow` policy still pushes to the queue; the daemon's dispatcher decides whether to start immediately or wait. With no conflict, the mandate goes to the front of the queue. An `allow` mandate whose project_scope overlaps a running mandate is treated like `queue`.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 138
def enqueue_mandate(event)
  sched = event[:schedule]
  mandate = event[:mandate]
  name = sched[:name]
  policy = (sched[:concurrency] || 'queue').to_s

  case policy
  when 'queue'
    @queue << mandate
    :queued
  when 'allow'
    # Codex-R1 fix #1: same-project_scope overlap → queue; different scope → immediate
    if running_conflicts?(name, sched[:project_scope])
      @queue << mandate
      :queued
    else
      # Different project_scope or no conflict → allow concurrent
      @queue.unshift(mandate) # priority position for immediate dispatch
      :queued
    end
  when 'reject'
    # Codex-R1 fix #1: reject checks both name AND project_scope
    if running_with_name?(name) || running_with_scope?(sched[:project_scope])
      @rejection_log << Rejection.new(name: name, reason: 'already_running',
                                      at: @clock.call.iso8601)
      log(:warn, 'chronos_mandate_rejected', name: name)
      :rejected
    else
      @queue << mandate
      :queued
    end
  else
    raise ArgumentError, "unknown concurrency policy: #{policy.inspect}"
  end
end
#pop_queued ⇒ Object
Pops the next mandate from the queue. Returns nil if empty.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 186
def pop_queued
  @queue.shift
end
#register_running(mandate) ⇒ Object
Daemon tells us which mandates are currently running so concurrency decisions work. Entries must be Hashes with :id and :name.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 192
def register_running(mandate)
  @running_mandates << mandate
end
#rollback_fire(schedule_name) ⇒ Object
Codex-R1 fix #2: roll back last_fire_at if enqueue was rejected. Call this after enqueue_mandate returns :rejected.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 177
def rollback_fire(schedule_name)
  st = @state.dig('schedules', schedule_name.to_s)
  return unless st && st['_tentative_fire_at']
  st.delete('last_fire_at') if st['last_fire_at'] == st['_tentative_fire_at']
  st.delete('_tentative_fire_at')
  persist_state_atomic
end
#state_for(name) ⇒ Object
Public: returns the persisted state Hash for the named schedule, or an empty Hash if none is recorded.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 95
def state_for(name)
  @state['schedules'][name.to_s] || {}
end
#state_path ⇒ Object
Raw path to the persisted state file (for tests / status dumps).
# File 'lib/kairos_mcp/daemon/chronos.rb', line 201
def state_path
  @state_path
end
#tick(now = nil) ⇒ Object
Evaluate every schedule against `now`. Returns an Array of FiredEvent objects (may be empty).
Order: schedules are evaluated in the order they appear in schedules.yml. State is persisted only if at least one schedule's evaluation changed it (a fire or an updated timestamp), which avoids fsync storms on idle ticks.
# File 'lib/kairos_mcp/daemon/chronos.rb', line 108
def tick(now = nil)
  now ||= @clock.call
  fired = []
  state_dirty = false
  @schedules.each do |sched|
    # Codex-R1 fix #3: guard against malformed schedule entries
    # (e.g. scalar instead of hash) before any method call.
    begin
      next unless sched.is_a?(Hash) && enabled?(sched)
      state_dirty = true if evaluate_schedule(sched, now, fired)
    rescue StandardError => e
      log(:error, 'chronos_schedule_eval_failed',
          name: (sched.is_a?(Hash) ? sched[:name] : sched.inspect),
          error: "#{e.class}: #{e.message}")
    end
  end
  persist_state_atomic if state_dirty
  fired
end
#unregister_running(mandate_id) ⇒ Object
# File 'lib/kairos_mcp/daemon/chronos.rb', line 196
def unregister_running(mandate_id)
  @running_mandates.reject! { |m| m[:id] == mandate_id }
end