Class: KairosMcp::Daemon::Chronos

Inherits:
Object
Defined in:
lib/kairos_mcp/daemon/chronos.rb

Overview

Chronos — the daemon’s time-driven scheduler (P2.4).

Design reference: v0.2 §4.

Responsibilities:

1. Load schedules from `.kairos/config/schedules.yml`.
2. Maintain `chronos_state.yml` atomically with two timestamps per
   schedule: `last_fire_at` (actual fire) and `last_evaluated_at`
   (last due-evaluation window boundary). These are intentionally
   separated [FIX: CF-10] so that:
     (a) A fresh boot does not treat un-fired history as "due now".
     (b) The skip policy cannot miscount by 1.
3. On each `tick(now)`, count the cron occurrences in the window
   (last_evaluated_at, now] and apply the per-schedule `missed_policy`:
     - skip             : fire once, log the rest
     - catch_up_once    : fire once regardless of the missed count
     - catch_up_bounded : fire up to `max_catch_up_runs`, drop the
       rest; if the backlog is older than `stale_after`, drop it
       entirely and fire exactly once to mark recovery [FIX: CF-3].
4. Offer an in-memory mandate queue with concurrency enforcement
   (queue / allow / reject) — daemon integration pops from it.
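
The `missed_policy` rules in item 3 can be sketched as a pure function. This is a minimal illustration and not the class's own code: `plan_fires` and its keyword arguments are hypothetical names, and it assumes staleness is judged by comparing the age of the oldest missed occurrence against `stale_after`.

```ruby
# Hypothetical sketch of the missed_policy decision; not taken from Chronos.
DEFAULT_STALE_AFTER  = 48 * 3600
DEFAULT_MAX_CATCH_UP = 5

# occurrences: Array<Time> of cron occurrences inside (last_evaluated_at, now].
# Returns how many runs to fire, how many occurrences are dropped, and whether
# the backlog was treated as stale.
def plan_fires(occurrences, now:, policy:,
               max_catch_up_runs: DEFAULT_MAX_CATCH_UP,
               stale_after: DEFAULT_STALE_AFTER)
  return { fire: 0, dropped: 0, stale: false } if occurrences.empty?

  count = occurrences.size
  case policy.to_s
  when 'skip', 'catch_up_once'
    # Both fire a single run; 'skip' additionally logs the remainder.
    { fire: 1, dropped: count - 1, stale: false }
  when 'catch_up_bounded'
    if now - occurrences.min > stale_after
      # Assumption: the backlog is stale when its oldest entry is too old.
      # Drop the whole backlog and fire exactly once to mark recovery.
      { fire: 1, dropped: count, stale: true }
    else
      fire = [count, max_catch_up_runs].min
      { fire: fire, dropped: count - fire, stale: false }
    end
  else
    raise ArgumentError, "unknown missed_policy: #{policy.inspect}"
  end
end
```

With three missed occurrences and `max_catch_up_runs: 2`, the bounded policy in this sketch fires twice and drops one occurrence.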

Crash-safety:

* State writes use tmp+rename+dir-fsync [FIX: CF-13]. A torn write
  leaves either the previous state or the new state intact — never
  a half-written YAML.
* An exception inside one schedule's evaluation does NOT prevent
  later schedules from being evaluated (per-schedule rescue).
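
The tmp+rename+dir-fsync pattern can be sketched as follows. `write_yaml_atomically` is a hypothetical helper written for illustration; the class's private `persist_state_atomic` may differ in detail (temporary file naming, error handling).

```ruby
require 'yaml'
require 'fileutils'

# Hypothetical sketch of tmp+rename+dir-fsync; not the class's actual code.
def write_yaml_atomically(path, data)
  dir = File.dirname(path)
  FileUtils.mkdir_p(dir)
  tmp = "#{path}.tmp.#{Process.pid}"

  File.open(tmp, 'w') do |f|
    f.write(YAML.dump(data))
    f.flush
    f.fsync # file contents reach disk before the rename
  end

  # rename(2) replaces the target atomically on POSIX filesystems, so a
  # reader sees either the old file or the new one.
  File.rename(tmp, path)

  begin
    # fsync the directory so the rename itself survives a crash.
    File.open(dir) { |d| d.fsync }
  rescue SystemCallError, NotImplementedError
    # Some platforms cannot fsync a directory; the rename is then best-effort.
  end
ensure
  # Clean up the temporary file if anything failed before the rename.
  File.delete(tmp) if tmp && File.exist?(tmp)
end
```

Writing to a temporary file in the same directory matters: a rename is only atomic within a single filesystem.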

Timezone handling:

* Per-schedule `timezone:` is applied via ENV['TZ'] swap around
  `Time#getlocal`. This is safe in the daemon's single-threaded
  event loop. Tests should not rely on the system TZ.
* DST caveat: during fall-back, the same wall-clock minute occurs
  twice in real time; a per-minute iterator will count both.
  This matches neither strict "fire once per wall minute" nor
  strict "fire once per real minute" — but is consistent with
  how many simple cron implementations behave.
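
The `ENV['TZ']` swap described above might look like the sketch below. `with_timezone` and `wall_clock_fields` are hypothetical names; the wall-clock fields are read inside the block so they do not depend on the environment after it is restored. As the note above says, this is only safe while a single thread reads local time.

```ruby
# Hypothetical sketch of the ENV['TZ'] swap; not the class's actual code.
def with_timezone(tz)
  previous = ENV['TZ']
  ENV['TZ'] = tz.to_s unless tz.nil?
  yield
ensure
  ENV['TZ'] = previous # assigning nil removes the variable again
end

# Returns the wall-clock fields a cron matcher would compare against.
def wall_clock_fields(time, tz)
  with_timezone(tz) do
    local = time.getlocal
    { hour: local.hour, min: local.min, wday: local.wday,
      utc_offset: local.utc_offset }
  end
end
```

IANA names such as `Asia/Tokyo` depend on the host's tzdata being installed, which is one reason tests should pin the zone explicitly instead of relying on the system TZ.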

This class is intentionally standalone: no coupling to Safety, Mandate, or Autonomos. Daemon integration wires those together.

Defined Under Namespace

Modules: Cron

Classes: FiredEvent, MissedEntry, Rejection, StaleDrop

Constant Summary

DEFAULT_SCHEDULES_PATH =
'.kairos/config/schedules.yml'
DEFAULT_STATE_PATH =
'.kairos/chronos_state.yml'
DEFAULT_STALE_AFTER =
48 * 3600 # 48h
DEFAULT_MAX_CATCH_UP =
5
DEFAULT_MAX_CYCLES =
50

Constructor Details

#initialize(schedules_path: nil, state_path: nil, logger: nil, clock: nil, schedules: nil) ⇒ Chronos

Returns a new instance of Chronos.

Parameters:

  • schedules_path (String) (defaults to: nil)

    path to schedules.yml

  • state_path (String) (defaults to: nil)

    path to chronos_state.yml

  • logger (#info, #warn, #error, nil) (defaults to: nil)
  • clock (#call, nil) (defaults to: nil)

    returns current Time (for tests)

  • schedules (Array<Hash>, nil) (defaults to: nil)

    inline schedules (skips file)



# File 'lib/kairos_mcp/daemon/chronos.rb', line 76

def initialize(schedules_path: nil, state_path: nil, logger: nil,
               clock: nil, schedules: nil)
  @schedules_path = schedules_path || DEFAULT_SCHEDULES_PATH
  @state_path     = state_path     || DEFAULT_STATE_PATH
  @logger         = logger
  @clock          = clock || -> { Time.now }
  @boot_time      = @clock.call

  @schedules = schedules ? schedules.map { |s| symbolize(s) } : load_schedules
  @state     = load_state

  @queue            = []
  @running_mandates = []
  @missed_log       = []
  @stale_drops      = []
  @rejection_log    = []
end

Instance Attribute Details

#missed_log ⇒ Object (readonly)

Returns the value of attribute missed_log.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 66

def missed_log
  @missed_log
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 66

def queue
  @queue
end

#rejection_log ⇒ Object (readonly)

Returns the value of attribute rejection_log.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 66

def rejection_log
  @rejection_log
end

#schedules ⇒ Object (readonly)

Returns the value of attribute schedules.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 66

def schedules
  @schedules
end

#stale_drops ⇒ Object (readonly)

Returns the value of attribute stale_drops.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 66

def stale_drops
  @stale_drops
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 66

def state
  @state
end

Instance Method Details

#enqueue_mandate(event) ⇒ Object

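Applies the schedule's concurrency policy and either pushes the mandate onto the queue or rejects it, recording the rejection in `rejection_log`. Returns `:queued` or `:rejected`; raises ArgumentError for an unknown policy.

The `allow` policy still pushes to the queue; it is the daemon's dispatcher that decides whether to start immediately. When a running mandate overlaps on the same `project_scope`, `allow` is treated like `queue` and appends to the back. Otherwise the mandate is placed at the front of the queue for immediate dispatch.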



# File 'lib/kairos_mcp/daemon/chronos.rb', line 138

def enqueue_mandate(event)
  sched    = event[:schedule]
  mandate  = event[:mandate]
  name     = sched[:name]
  policy   = (sched[:concurrency] || 'queue').to_s

  case policy
  when 'queue'
    @queue << mandate
    :queued
  when 'allow'
    # Codex-R1 fix #1: same-project_scope overlap → queue; different scope → immediate
    if running_conflicts?(name, sched[:project_scope])
      @queue << mandate
      :queued
    else
      # Different project_scope or no conflict → allow concurrent
      @queue.unshift(mandate)  # priority position for immediate dispatch
      :queued
    end
  when 'reject'
    # Codex-R1 fix #1: reject checks both name AND project_scope
    if running_with_name?(name) || running_with_scope?(sched[:project_scope])
      @rejection_log << Rejection.new(name: name,
                                      reason: 'already_running',
                                      at: @clock.call.iso8601)
      log(:warn, 'chronos_mandate_rejected', name: name)
      :rejected
    else
      @queue << mandate
      :queued
    end
  else
    raise ArgumentError, "unknown concurrency policy: #{policy.inspect}"
  end
end

#pop_queued ⇒ Object

Pops the next mandate from the queue. Returns nil if empty.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 186

def pop_queued
  @queue.shift
end

#register_running(mandate) ⇒ Object

Daemon tells us which mandates are currently running so concurrency decisions work. Entries must be Hashes with :id and :name.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 192

def register_running(mandate)
  @running_mandates << mandate
end

#rollback_fire(schedule_name) ⇒ Object

Codex-R1 fix #2: roll back last_fire_at if enqueue was rejected. Call this after enqueue_mandate returns :rejected.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 177

def rollback_fire(schedule_name)
  st = @state.dig('schedules', schedule_name.to_s)
  return unless st && st['_tentative_fire_at']
  st.delete('last_fire_at') if st['last_fire_at'] == st['_tentative_fire_at']
  st.delete('_tentative_fire_at')
  persist_state_atomic
end

#state_for(name) ⇒ Object

Public: returns the persisted state Hash for the named schedule, or an empty Hash if the schedule has no state yet.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 95

def state_for(name)
  @state['schedules'][name.to_s] || {}
end

#state_path ⇒ Object

Raw path to the persisted state file (for tests / status dumps).



# File 'lib/kairos_mcp/daemon/chronos.rb', line 201

def state_path
  @state_path
end

#tick(now = nil) ⇒ Object

Evaluates every schedule against `now`, which defaults to the injected clock's current time. Returns an Array of FiredEvent objects (may be empty).

Order: schedules are evaluated in the order they appear in schedules.yml. State is persisted only when at least one schedule's evaluation changed it (a fire or an updated timestamp), which avoids fsync storms on idle ticks.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 108

def tick(now = nil)
  now ||= @clock.call
  fired = []
  state_dirty = false

  @schedules.each do |sched|
    # Codex-R1 fix #3: guard against malformed schedule entries
    # (e.g. scalar instead of hash) before any method call.
    begin
      next unless sched.is_a?(Hash) && enabled?(sched)
      state_dirty = true if evaluate_schedule(sched, now, fired)
    rescue StandardError => e
      log(:error, 'chronos_schedule_eval_failed',
          name: (sched.is_a?(Hash) ? sched[:name] : sched.inspect),
          error: "#{e.class}: #{e.message}")
    end
  end

  persist_state_atomic if state_dirty
  fired
end
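
One way a daemon loop might wire `tick`, `enqueue_mandate`, `rollback_fire`, `pop_queued`, and `register_running` together is sketched below. `drive_once` is a hypothetical function, not part of this class. It assumes each fired event supports `event[:schedule]` and `event[:mandate]` (as `enqueue_mandate` itself does) and that mandates are Hashes with `:id` and `:name`. A real dispatcher would apply its own start and concurrency limits instead of starting everything it pops.

```ruby
# Hypothetical daemon-side glue; `chronos` is any object exposing the
# public methods documented on this page.
def drive_once(chronos, now)
  chronos.tick(now).each do |event|
    outcome = chronos.enqueue_mandate(event)
    # A rejected enqueue should not leave last_fire_at advanced.
    chronos.rollback_fire(event[:schedule][:name]) if outcome == :rejected
  end

  started = []
  while (mandate = chronos.pop_queued)
    # Register before starting so later concurrency decisions see it.
    chronos.register_running(mandate)
    started << mandate
  end
  started
end
```

When a mandate finishes, the daemon would call `unregister_running` with its `:id` so that `reject` and `allow` decisions stop counting it.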

#unregister_running(mandate_id) ⇒ Object

Removes the mandate with the given id from the running set, so later concurrency decisions no longer count it.



# File 'lib/kairos_mcp/daemon/chronos.rb', line 196

def unregister_running(mandate_id)
  @running_mandates.reject! { |m| m[:id] == mandate_id }
end