Class: KairosMcp::Daemon::WAL
- Inherits: Object
- Defined in: lib/kairos_mcp/daemon/wal.rb
Overview
WAL — Write-Ahead Log for the DECIDE→ACT boundary.
Design (v0.2 §5):
* Per-mandate append-only JSON-lines file at .kairos/wal/{mandate_id}.wal.jsonl
* Every record is fsync'd after write. Parent directory is fsync'd on file creation [CF-6] so the newly created inode is durable.
* Covered ops:
  - plan_commit : entire DECIDE-phase plan with hashed steps
  - append : pending step (idem_key computed)
  - transition : pending → executing → completed | failed | needs_review
  - plan_finalize : plan end of life (succeeded / failed / abandoned)
* Recovery (Daemon#recover_from_wal!) parses the file, rebuilds plan records, and uses `IdempotencyCheck` on every non-finalized step.
Crash-safety invariants:
I1. Once `commit_plan` returns, the plan is durable (fsync + dir fsync).
I2. Once a `mark_*` transition method returns, the transition is durable (fsync).
I3. Appends are strictly ordered under @mutex; concurrent callers cannot
interleave partial JSON lines.
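The private `append` helper that these invariants rely on is not shown on this page. A minimal sketch of how I2 and I3 could be satisfied, assuming one JSON line per record and a single write, flush and fsync under the mutex. The class name `TinyWal` and its internals are hypothetical, not the actual implementation:

```ruby
require 'json'

# Hypothetical stand-in for the WAL's private append path.
class TinyWal
  def initialize(path)
    @mutex = Mutex.new
    @file = File.open(path, 'a')
    @file.binmode
  end

  # Serialize the record as one JSON line, then write, flush and fsync
  # while holding the mutex so lines from concurrent callers never interleave.
  def append(record)
    line = JSON.generate(record) + "\n"
    @mutex.synchronize do
      @file.write(line)
      @file.flush # push Ruby's userspace buffer to the OS
      @file.fsync # ask the OS to push the data to the device
    end
    record
  end

  def close
    @mutex.synchronize { @file.close unless @file.closed? }
  end
end
```

Building the full line before taking the lock keeps the critical section short, and returning only after `fsync` is what lets a caller treat a returned call as durable.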
NOTE on macOS: `IO#fsync` on HFS+/APFS flushes to the device driver but does not guarantee platter persistence (only `fcntl F_FULLFSYNC` does). We accept that trade-off; callers that need stronger guarantees on macOS should enable `fsync = full` at the filesystem layer.
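For callers that do want the stronger guarantee, a minimal sketch of a full flush via `IO#fcntl`. The helper name `durable_sync` is hypothetical, and the constant value 51 is assumed from the macOS `<sys/fcntl.h>` header, since Ruby's `Fcntl` module may not export `F_FULLFSYNC`. The WAL class itself does not do this:

```ruby
# Assumed value of F_FULLFSYNC on macOS (from <sys/fcntl.h>).
F_FULLFSYNC_DARWIN = 51

# Hypothetical helper: full device flush on macOS, plain fsync elsewhere.
def durable_sync(io)
  io.flush
  if RUBY_PLATFORM.include?('darwin')
    begin
      io.fcntl(F_FULLFSYNC_DARWIN, 0)
    rescue SystemCallError
      io.fsync # some filesystems reject F_FULLFSYNC; fall back to fsync
    end
  else
    io.fsync
  end
  io
end
```

A full flush is usually much slower than `fsync`, which is presumably why the WAL accepts the weaker guarantee by default.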
Defined Under Namespace
Classes: PlanRecord, StepEntry
Instance Attribute Summary
- #path ⇒ Object (readonly)
  Returns the value of attribute path.
Class Method Summary
- .fsync_dir(dir) ⇒ Object
  Best-effort directory fsync.
- .open(path:) ⇒ Object
  Open (or create) a WAL file.
Instance Method Summary
- #append_pending(step_id:, plan_id:, idem_key:) ⇒ Object
  Append a `pending` record for a step.
- #archive(to: nil) ⇒ Object
  Compress the WAL file to the path given by `to` (defaults to "<path>.gz") and remove the original.
- #close ⇒ Object
- #commit_plan(plan_id:, mandate_id:, cycle:, steps:) ⇒ Object
  Record an entire DECIDE-phase plan.
- #finalize_plan(plan_id, status: 'succeeded') ⇒ Object
- #flush ⇒ Object
  Flush buffered writes and fsync the file.
- #initialize(path) ⇒ WAL (constructor)
  Returns a new instance of WAL.
- #mark_completed(step_id, post_hash:, result_hash:, recovered: false, evidence: nil) ⇒ Object
- #mark_executing(step_id, pre_hash:) ⇒ Object
- #mark_failed(step_id, error_class:, error_msg:) ⇒ Object
- #mark_needs_review(step_id, reason: nil, evidence: nil) ⇒ Object
- #mark_plan_abandoned(plan_id, reason:) ⇒ Object
- #mark_reset_to_pending(step_id) ⇒ Object
- #open? ⇒ Boolean
  True iff the WAL file is open.
- #plans ⇒ Object
  Parse the WAL file and return every PlanRecord, in order of commit.
- #plans_not_finalized ⇒ Object
  Subset: plans lacking a plan_finalize marker.
Constructor Details
#initialize(path) ⇒ WAL
Returns a new instance of WAL.
# File 'lib/kairos_mcp/daemon/wal.rb', line 93
def initialize(path)
  @path = path
  @mutex = Mutex.new
  @file = File.open(path, 'a')
  @file.binmode
end
Instance Attribute Details
#path ⇒ Object (readonly)
Returns the value of attribute path.
# File 'lib/kairos_mcp/daemon/wal.rb', line 67
def path
  @path
end
Class Method Details
.fsync_dir(dir) ⇒ Object
Best-effort directory fsync. Not all platforms support it; EINVAL on a few filesystems (FAT32) is fine to swallow.
# File 'lib/kairos_mcp/daemon/wal.rb', line 82
def self.fsync_dir(dir)
  d = File.open(dir)
  begin
    d.fsync
  rescue Errno::EINVAL, Errno::EACCES, NotImplementedError
    # Directory fsync not supported here; best effort.
  ensure
    d.close
  end
end
.open(path:) ⇒ Object
Open (or create) a WAL file. Guarantees parent-directory fsync on first creation so the new inode is crash-durable [CF-6].
# File 'lib/kairos_mcp/daemon/wal.rb', line 71
def self.open(path:)
  dir = File.dirname(path)
  FileUtils.mkdir_p(dir)
  is_new = !File.exist?(path)
  wal = new(path)
  fsync_dir(dir) if is_new
  wal
end
Instance Method Details
#append_pending(step_id:, plan_id:, idem_key:) ⇒ Object
Append a `pending` record (op `append`) for a step, carrying its plan_id and idem_key.
# File 'lib/kairos_mcp/daemon/wal.rb', line 136
def append_pending(step_id:, plan_id:, idem_key:)
  append(
    op: 'append',
    step_id: step_id,
    plan_id: plan_id,
    idem_key: idem_key,
    status: 'pending'
  )
end
#archive(to: nil) ⇒ Object
Compress the WAL file to the path given by `to` (defaults to "<path>.gz") and remove the original. Used after a mandate completes and its plan history has been archived elsewhere.
Returns the destination path.
# File 'lib/kairos_mcp/daemon/wal.rb', line 236
def archive(to: nil)
  close
  dest = to || "#{@path}.gz"
  Zlib::GzipWriter.open(dest) do |gz|
    File.open(@path, 'rb') { |src| IO.copy_stream(src, gz) }
  end
  self.class.fsync_dir(File.dirname(dest))
  File.unlink(@path) if File.exist?(@path)
  dest
end
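Because the archive is a plain gzip of the JSON-lines file, it can be read back with the standard library. A minimal sketch, assuming one JSON object per line; the helper name `read_archived_records` is hypothetical and not part of this class:

```ruby
require 'json'
require 'zlib'

# Hypothetical helper: read every JSON-lines record from a gzipped WAL archive.
def read_archived_records(gz_path)
  Zlib::GzipReader.open(gz_path) do |gz|
    records = []
    gz.each_line do |line|
      next if line.strip.empty? # tolerate a trailing blank line
      records << JSON.parse(line)
    end
    records
  end
end
```

Usage would look like `read_archived_records("#{wal.path}.gz")` after a call to `archive` with the default destination.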
#close ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 222
def close
  @mutex.synchronize do
    next if @file.nil? || @file.closed?
    @file.close
    @file = nil
  end
end
#commit_plan(plan_id:, mandate_id:, cycle:, steps:) ⇒ Object
Record an entire DECIDE-phase plan. `steps` is an Array of Hashes with keys :step_id, :tool, :params_hash, :pre_hash, :expected_post_hash. Additional keys are preserved (best-effort) so callers can annotate.
# File 'lib/kairos_mcp/daemon/wal.rb', line 105
def commit_plan(plan_id:, mandate_id:, cycle:, steps:)
  canonical_steps = steps.map { |s| normalize_step(s) }
  append(
    op: 'plan_commit',
    plan_id: plan_id,
    mandate_id: mandate_id,
    cycle: cycle,
    plan_hash: Canonical.sha256_json(canonical_steps),
    steps: canonical_steps
  )
end
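`Canonical.sha256_json` is not documented on this page. One plausible reading is a SHA-256 over a key-order-independent JSON encoding; the sketch below assumes that, and the function names `canonicalize` and `sha256_json` are illustrative only, not the library's actual implementation:

```ruby
require 'digest'
require 'json'

# Hypothetical canonical form: stringify Hash keys and sort them recursively
# so that key order and Symbol-vs-String keys do not change the digest.
def canonicalize(value)
  case value
  when Hash
    value.map { |k, v| [k.to_s, canonicalize(v)] }.sort_by(&:first).to_h
  when Array
    value.map { |v| canonicalize(v) }
  else
    value
  end
end

# SHA-256 hex digest of the canonical JSON encoding.
def sha256_json(value)
  Digest::SHA256.hexdigest(JSON.generate(canonicalize(value)))
end
```

With a canonical encoding, the `plan_hash` stored at commit time can be recomputed from the recorded `steps` to detect a corrupted or edited plan record.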
#finalize_plan(plan_id, status: 'succeeded') ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 117
def finalize_plan(plan_id, status: 'succeeded')
  append(
    op: 'plan_finalize',
    plan_id: plan_id,
    status: status
  )
end
#flush ⇒ Object
Flush buffered writes and fsync the file. Does nothing if the file is already closed.
# File 'lib/kairos_mcp/daemon/wal.rb', line 213
def flush
  @mutex.synchronize do
    next if @file.nil? || @file.closed?
    @file.flush
    @file.fsync
  end
end
#mark_completed(step_id, post_hash:, result_hash:, recovered: false, evidence: nil) ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 155
def mark_completed(step_id, post_hash:, result_hash:, recovered: false, evidence: nil)
  entry = {
    op: 'transition',
    step_id: step_id,
    status: 'completed',
    post_hash: post_hash,
    result_hash: result_hash
  }
  entry[:recovered] = true if recovered
  entry[:evidence] = evidence unless evidence.nil?
  append(entry)
end
#mark_executing(step_id, pre_hash:) ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 146
def mark_executing(step_id, pre_hash:)
  append(
    op: 'transition',
    step_id: step_id,
    status: 'executing',
    pre_hash: pre_hash
  )
end
#mark_failed(step_id, error_class:, error_msg:) ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 168
def mark_failed(step_id, error_class:, error_msg:)
  append(
    op: 'transition',
    step_id: step_id,
    status: 'failed',
    error_class: error_class,
    error_msg: error_msg.to_s[0, 500]
  )
end
#mark_needs_review(step_id, reason: nil, evidence: nil) ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 178
def mark_needs_review(step_id, reason: nil, evidence: nil)
  entry = {
    op: 'transition',
    step_id: step_id,
    status: 'needs_review'
  }
  entry[:reason] = reason unless reason.nil?
  entry[:evidence] = evidence unless evidence.nil?
  append(entry)
end
#mark_plan_abandoned(plan_id, reason:) ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 125
def mark_plan_abandoned(plan_id, reason:)
  append(
    op: 'plan_finalize',
    plan_id: plan_id,
    status: 'abandoned',
    reason: reason
  )
end
#mark_reset_to_pending(step_id) ⇒ Object
# File 'lib/kairos_mcp/daemon/wal.rb', line 189
def mark_reset_to_pending(step_id)
  append(
    op: 'transition',
    step_id: step_id,
    status: 'pending',
    reset: true
  )
end
#open? ⇒ Boolean
True iff the WAL file is open.
# File 'lib/kairos_mcp/daemon/wal.rb', line 248
def open?
  !@file.nil? && !@file.closed?
end
#plans ⇒ Object
Parse the WAL file and return every PlanRecord, in order of commit.
# File 'lib/kairos_mcp/daemon/wal.rb', line 201
def plans
  parse_all
end
#plans_not_finalized ⇒ Object
Subset: plans lacking a plan_finalize marker. These are what recovery must reconcile.
# File 'lib/kairos_mcp/daemon/wal.rb', line 207
def plans_not_finalized
  parse_all.reject { |p| p.finalized }
end
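The private `parse_all` and the `PlanRecord` / `StepEntry` classes are not shown on this page. A rough sketch of how a replay over the record shapes documented above might fold the log into plan records. The structs `SketchStep` and `SketchPlan`, the function `replay`, and the policy of skipping an unparseable line are all assumptions made for illustration:

```ruby
require 'json'

# Hypothetical shapes; the real PlanRecord and StepEntry may differ.
SketchStep = Struct.new(:step_id, :status)
SketchPlan = Struct.new(:plan_id, :steps, :finalized, :final_status)

# Fold WAL lines into plan records, returned in commit order.
def replay(lines)
  plans = {}      # plan_id => SketchPlan (Hash keeps insertion order)
  step_index = {} # step_id => SketchStep

  lines.each do |line|
    begin
      rec = JSON.parse(line)
    rescue JSON::ParserError
      next # assumed policy: skip a torn line left by a crash mid-write
    end
    next unless rec.is_a?(Hash)

    case rec['op']
    when 'plan_commit'
      steps = Array(rec['steps']).map { |s| SketchStep.new(s['step_id'], 'pending') }
      steps.each { |s| step_index[s.step_id] = s }
      plans[rec['plan_id']] = SketchPlan.new(rec['plan_id'], steps, false, nil)
    when 'append', 'transition'
      step = step_index[rec['step_id']]
      step.status = rec['status'] if step # last record wins
    when 'plan_finalize'
      plan = plans[rec['plan_id']]
      next unless plan
      plan.finalized = true
      plan.final_status = rec['status']
    end
  end

  plans.values
end
```

Under these assumptions, `replay(File.readlines(path)).reject(&:finalized)` yields the plans that recovery still has to reconcile, mirroring `plans_not_finalized`.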