Class: KairosMcp::Daemon::WAL

Inherits:
Object
Defined in:
lib/kairos_mcp/daemon/wal.rb

Overview

WAL — Write-Ahead Log for the DECIDE→ACT boundary.

Design (v0.2 §5):

* Per-mandate append-only JSON-lines file at
  .kairos/wal/{mandate_id}.wal.jsonl
* Every record is fsync'd after write. Parent directory is fsync'd on
  file creation [CF-6] so the newly created inode is durable.
* Covered ops:
    - plan_commit    : entire DECIDE-phase plan with hashed steps
    - append         : pending step (idem_key computed)
    - transition     : pending → executing → completed | failed | needs_review
    - plan_finalize  : plan end of life (succeeded / failed / abandoned)
* Recovery (Daemon#recover_from_wal!) parses the file, rebuilds plan
  records, and uses `IdempotencyCheck` on every non-finalized step.
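Here is a minimal sketch of the record stream one plan might leave in its `.wal.jsonl` file. The op names and fields follow the methods documented below. The `write_record` helper, the temporary directory, and the placeholder hash strings are assumptions made for illustration. This is not the class's private `append`.

```ruby
require 'json'
require 'tmpdir'

# Hypothetical helper: one JSON object per line, flushed and fsync'd
# before returning (mirrors the "every record is fsync'd" rule).
def write_record(io, record)
  io.write(JSON.generate(record) + "\n")
  io.flush
  io.fsync
end

records = [
  { op: 'plan_commit', plan_id: 'p1', mandate_id: 'm1', cycle: 1,
    plan_hash: 'sha256-placeholder', steps: [{ step_id: 's1', tool: 'write_file' }] },
  { op: 'append', step_id: 's1', plan_id: 'p1', idem_key: 'k1', status: 'pending' },
  { op: 'transition', step_id: 's1', status: 'executing', pre_hash: 'h0' },
  { op: 'transition', step_id: 's1', status: 'completed', post_hash: 'h1', result_hash: 'r1' },
  { op: 'plan_finalize', plan_id: 'p1', status: 'succeeded' }
]

# Dir.mktmpdir returns the block's value and removes the directory afterwards.
ops = Dir.mktmpdir do |dir|
  path = File.join(dir, 'm1.wal.jsonl')
  File.open(path, 'a') do |io|
    io.binmode
    records.each { |r| write_record(io, r) }
  end
  # Each line parses independently of the others.
  File.readlines(path).map { |l| JSON.parse(l)['op'] }
end

puts ops.inspect
```

Each line is a standalone JSON object, so recovery can replay the file one line at a time without a separate index.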

Crash-safety invariants:

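I1. Once `commit_plan` returns, the plan is durable: the record itself is fsync'd, and the WAL file's directory entry was fsync'd when `.open` first created the file.
I2. Once a `mark_*` transition call returns, the transition is durable (fsync).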
I3. Appends are strictly ordered under @mutex; concurrent callers cannot
    interleave partial JSON lines.
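Invariant I3 is easiest to see when several threads append at once. This sketch uses a hypothetical `TinyAppender` that reproduces the mutex-guarded write, flush, and fsync pattern. It is not the real class, and that class's private `append` is not shown on this page.

```ruby
require 'json'
require 'tmpdir'

# Hypothetical minimal appender. The mutex makes each write+flush+fsync
# sequence atomic with respect to other threads, so lines never interleave.
class TinyAppender
  def initialize(path)
    @mutex = Mutex.new
    @file = File.open(path, 'a')
    @file.binmode
  end

  def append(record)
    line = JSON.generate(record) + "\n"
    @mutex.synchronize do
      @file.write(line)
      @file.flush
      @file.fsync
    end
  end

  def close
    @mutex.synchronize { @file.close unless @file.closed? }
  end
end

lines = Dir.mktmpdir do |dir|
  path = File.join(dir, 'm1.wal.jsonl')
  wal = TinyAppender.new(path)
  threads = 4.times.map do |t|
    Thread.new do
      25.times { |i| wal.append({ op: 'append', step_id: "t#{t}-s#{i}", status: 'pending' }) }
    end
  end
  threads.each(&:join)
  wal.close
  File.readlines(path)
end

# Every line is a complete JSON object; none were torn or merged.
parsed = lines.map { |l| JSON.parse(l) }
puts parsed.size
```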

NOTE on macOS: `IO#fsync` on HFS+/APFS flushes to the device driver but does not guarantee persistence to stable storage (only `fcntl F_FULLFSYNC` does). We accept that trade-off; callers that need stronger guarantees on macOS should enable `fsync = full` at the filesystem layer.

Defined Under Namespace

Classes: PlanRecord, StepEntry


Constructor Details

#initialize(path) ⇒ WAL

Returns a new instance of WAL.



# File 'lib/kairos_mcp/daemon/wal.rb', line 93

def initialize(path)
  @path = path
  @mutex = Mutex.new
  @file = File.open(path, 'a')
  @file.binmode
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



# File 'lib/kairos_mcp/daemon/wal.rb', line 67

def path
  @path
end

Class Method Details

.fsync_dir(dir) ⇒ Object

Best-effort directory fsync. Not every platform or filesystem supports fsync on a directory, so Errno::EINVAL (e.g. on FAT32), Errno::EACCES, and NotImplementedError are swallowed.



# File 'lib/kairos_mcp/daemon/wal.rb', line 82

def self.fsync_dir(dir)
  d = File.open(dir)
  begin
    d.fsync
  rescue Errno::EINVAL, Errno::EACCES, NotImplementedError
    # Directory fsync not supported here; best effort.
  ensure
    d.close
  end
end

.open(path:) ⇒ Object

Open (or create) a WAL file. Guarantees parent-directory fsync on first creation so the new inode is crash-durable [CF-6].



# File 'lib/kairos_mcp/daemon/wal.rb', line 71

def self.open(path:)
  dir = File.dirname(path)
  FileUtils.mkdir_p(dir)
  is_new = !File.exist?(path)
  wal = new(path)
  fsync_dir(dir) if is_new
  wal
end
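The `is_new` check exists so that the directory fsync [CF-6] runs only when the file is first created, and not on every reopen. The sketch below rebuilds `.open` and `.fsync_dir` in a hypothetical `TinyWal` module. The module counts directory-fsync attempts so you can observe the behavior. That counter is added for illustration only.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical mirror of WAL.open / WAL.fsync_dir with an attempt counter.
module TinyWal
  @dir_fsyncs = 0

  class << self
    attr_reader :dir_fsyncs

    def fsync_dir(dir)
      @dir_fsyncs += 1 # count attempts, even if the platform refuses
      File.open(dir) { |d| d.fsync }
    rescue Errno::EINVAL, Errno::EACCES, NotImplementedError
      # Directory fsync not supported here; best effort, as in the original.
    end

    def open(path)
      dir = File.dirname(path)
      FileUtils.mkdir_p(dir)
      is_new = !File.exist?(path)
      file = File.open(path, 'a')
      fsync_dir(dir) if is_new # only the first creation pays for the dir fsync
      file
    end
  end
end

Dir.mktmpdir do |root|
  path = File.join(root, 'wal', 'm1.wal.jsonl')
  TinyWal.open(path).close # creates the file: directory fsync'd
  TinyWal.open(path).close # reopens an existing file: no directory fsync
end

puts TinyWal.dir_fsyncs
```

There is one small difference from the original. Here the directory `File.open` also sits inside the rescue, so a platform that refuses to open directories is treated as best effort as well.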

Instance Method Details

#append_pending(step_id:, plan_id:, idem_key:) ⇒ Object

Append a `pending` record (op `append`) for a step, carrying its precomputed idempotency key.



# File 'lib/kairos_mcp/daemon/wal.rb', line 136

def append_pending(step_id:, plan_id:, idem_key:)
  append(
    op: 'append',
    step_id: step_id,
    plan_id: plan_id,
    idem_key: idem_key,
    status: 'pending'
  )
end

#archive(to: nil) ⇒ Object

Close the WAL, compress it to `to` (defaults to `<path>.gz`), fsync the destination directory, and remove the original. Used after a mandate completes and its plan history has been archived elsewhere.

Returns the destination path.



# File 'lib/kairos_mcp/daemon/wal.rb', line 236

def archive(to: nil)
  close
  dest = to || "#{@path}.gz"
  Zlib::GzipWriter.open(dest) do |gz|
    File.open(@path, 'rb') { |src| IO.copy_stream(src, gz) }
  end
  self.class.fsync_dir(File.dirname(dest))
  File.unlink(@path) if File.exist?(@path)
  dest
end
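After `archive` has run, the log exists only in gzip form. The following sketch reads it back for inspection. It assumes the archive is a plain gzip of the JSON-lines file, as the method body shows. The `read_archived_wal` helper, the paths, and the records are all hypothetical.

```ruby
require 'json'
require 'tmpdir'
require 'zlib'

# Hypothetical reader: stream-decompress the .gz and parse each JSON line.
def read_archived_wal(gz_path)
  records = []
  Zlib::GzipReader.open(gz_path) do |gz|
    gz.each_line { |line| records << JSON.parse(line) }
  end
  records
end

records = Dir.mktmpdir do |dir|
  wal_path = File.join(dir, 'm1.wal.jsonl')
  File.write(wal_path, [
    JSON.generate({ op: 'plan_commit', plan_id: 'p1', steps: [] }),
    JSON.generate({ op: 'plan_finalize', plan_id: 'p1', status: 'succeeded' })
  ].join("\n") + "\n")

  # Same steps as #archive: gzip to "<path>.gz", then unlink the original.
  dest = "#{wal_path}.gz"
  Zlib::GzipWriter.open(dest) do |gz|
    File.open(wal_path, 'rb') { |src| IO.copy_stream(src, gz) }
  end
  File.unlink(wal_path)

  read_archived_wal(dest)
end

puts records.map { |r| r['op'] }.inspect
```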

#close ⇒ Object

Close the WAL file handle under the mutex; a no-op if the file is already closed.



# File 'lib/kairos_mcp/daemon/wal.rb', line 222

def close
  @mutex.synchronize do
    next if @file.nil? || @file.closed?

    @file.close
    @file = nil
  end
end

#commit_plan(plan_id:, mandate_id:, cycle:, steps:) ⇒ Object

Record an entire DECIDE-phase plan. `steps` is an Array of Hashes with keys :step_id, :tool, :params_hash, :pre_hash, :expected_post_hash. Additional keys are preserved (best-effort) so callers can annotate. The record also carries `plan_hash`, the SHA-256 of the normalized steps.



# File 'lib/kairos_mcp/daemon/wal.rb', line 105

def commit_plan(plan_id:, mandate_id:, cycle:, steps:)
  canonical_steps = steps.map { |s| normalize_step(s) }
  append(
    op: 'plan_commit',
    plan_id: plan_id,
    mandate_id: mandate_id,
    cycle: cycle,
    plan_hash: Canonical.sha256_json(canonical_steps),
    steps: canonical_steps
  )
end
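`plan_hash` is only useful if the same logical plan always produces the same hash. `Canonical.sha256_json` is not shown on this page. The sketch below is a hypothetical stand-in that recursively sorts Hash keys before calling `JSON.generate`. Because of that sorting, the key order in a caller's step Hashes cannot change the digest. The real helper may canonicalize differently.

```ruby
require 'digest'
require 'json'

# Hypothetical canonicalizer: recursively sort Hash keys (as strings)
# so logically equal structures serialize to identical JSON.
def canonicalize(obj)
  case obj
  when Hash
    obj.map { |k, v| [k.to_s, canonicalize(v)] }.sort_by(&:first).to_h
  when Array
    obj.map { |v| canonicalize(v) }
  else
    obj
  end
end

def sha256_json(obj)
  Digest::SHA256.hexdigest(JSON.generate(canonicalize(obj)))
end

steps_a = [{ step_id: 's1', tool: 'write_file', params_hash: 'abc' }]
steps_b = [{ params_hash: 'abc', tool: 'write_file', step_id: 's1' }]

hash_a = sha256_json(steps_a)
hash_b = sha256_json(steps_b)
puts hash_a == hash_b # true: key order does not matter
```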

#finalize_plan(plan_id, status: 'succeeded') ⇒ Object



# File 'lib/kairos_mcp/daemon/wal.rb', line 117

def finalize_plan(plan_id, status: 'succeeded')
  append(
    op: 'plan_finalize',
    plan_id: plan_id,
    status: status
  )
end

#flush ⇒ Object

Flush Ruby's write buffer and fsync the WAL file under the mutex; a no-op if the file is already closed.



# File 'lib/kairos_mcp/daemon/wal.rb', line 213

def flush
  @mutex.synchronize do
    next if @file.nil? || @file.closed?

    @file.flush
    @file.fsync
  end
end

#mark_completed(step_id, post_hash:, result_hash:, recovered: false, evidence: nil) ⇒ Object



# File 'lib/kairos_mcp/daemon/wal.rb', line 155

def mark_completed(step_id, post_hash:, result_hash:, recovered: false, evidence: nil)
  entry = {
    op: 'transition',
    step_id: step_id,
    status: 'completed',
    post_hash: post_hash,
    result_hash: result_hash
  }
  entry[:recovered] = true if recovered
  entry[:evidence] = evidence unless evidence.nil?
  append(entry)
end

#mark_executing(step_id, pre_hash:) ⇒ Object



# File 'lib/kairos_mcp/daemon/wal.rb', line 146

def mark_executing(step_id, pre_hash:)
  append(
    op: 'transition',
    step_id: step_id,
    status: 'executing',
    pre_hash: pre_hash
  )
end

#mark_failed(step_id, error_class:, error_msg:) ⇒ Object



# File 'lib/kairos_mcp/daemon/wal.rb', line 168

def mark_failed(step_id, error_class:, error_msg:)
  append(
    op: 'transition',
    step_id: step_id,
    status: 'failed',
    error_class: error_class,
    error_msg: error_msg.to_s[0, 500]
  )
end

#mark_needs_review(step_id, reason: nil, evidence: nil) ⇒ Object



# File 'lib/kairos_mcp/daemon/wal.rb', line 178

def mark_needs_review(step_id, reason: nil, evidence: nil)
  entry = {
    op: 'transition',
    step_id: step_id,
    status: 'needs_review'
  }
  entry[:reason] = reason unless reason.nil?
  entry[:evidence] = evidence unless evidence.nil?
  append(entry)
end

#mark_plan_abandoned(plan_id, reason:) ⇒ Object



# File 'lib/kairos_mcp/daemon/wal.rb', line 125

def mark_plan_abandoned(plan_id, reason:)
  append(
    op: 'plan_finalize',
    plan_id: plan_id,
    status: 'abandoned',
    reason: reason
  )
end

#mark_reset_to_pending(step_id) ⇒ Object



# File 'lib/kairos_mcp/daemon/wal.rb', line 189

def mark_reset_to_pending(step_id)
  append(
    op: 'transition',
    step_id: step_id,
    status: 'pending',
    reset: true
  )
end
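Transitions are appended rather than updated in place, so a step's current status is simply its last record. The fold below is a hypothetical sketch of that replay; the real private parser is not shown on this page. It shows how a `mark_reset_to_pending` record supersedes an earlier `executing` state.

```ruby
require 'json'

# Hypothetical replay: the last 'append' or 'transition' record seen
# for a step_id wins, so later records supersede earlier ones.
def latest_step_status(lines)
  lines.each_with_object({}) do |line, status|
    rec = JSON.parse(line)
    next unless %w[append transition].include?(rec['op'])

    status[rec['step_id']] = rec['status']
  end
end

log = [
  { op: 'append', step_id: 's1', plan_id: 'p1', idem_key: 'k1', status: 'pending' },
  { op: 'transition', step_id: 's1', status: 'executing', pre_hash: 'h0' },
  { op: 'transition', step_id: 's1', status: 'pending', reset: true },
  { op: 'append', step_id: 's2', plan_id: 'p1', idem_key: 'k2', status: 'pending' },
  { op: 'transition', step_id: 's2', status: 'executing', pre_hash: 'h1' },
  { op: 'transition', step_id: 's2', status: 'failed', error_class: 'IOError', error_msg: 'disk full' }
].map { |r| JSON.generate(r) }

statuses = latest_step_status(log)
puts statuses.inspect
```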

#open? ⇒ Boolean

True iff the WAL file is open.

Returns:

  • (Boolean)


# File 'lib/kairos_mcp/daemon/wal.rb', line 248

def open?
  !@file.nil? && !@file.closed?
end

#plans ⇒ Object

Parse the WAL file and return every PlanRecord, in order of commit.



# File 'lib/kairos_mcp/daemon/wal.rb', line 201

def plans
  parse_all
end

#plans_not_finalized ⇒ Object

Subset: plans lacking a plan_finalize marker. These are what recovery must reconcile.



# File 'lib/kairos_mcp/daemon/wal.rb', line 207

def plans_not_finalized
  parse_all.reject { |p| p.finalized }
end
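`parse_all` is private and is not documented here. The following is a rough sketch of the filtering step only, using a hypothetical function. It groups `plan_commit` records by `plan_id` and marks every plan that later receives a `plan_finalize` record. It then returns the unfinalized plans in commit order. The `Plan` struct is an assumption and does not represent the real `PlanRecord`.

```ruby
require 'json'

# Hypothetical stand-in for PlanRecord: just enough fields to filter on.
Plan = Struct.new(:plan_id, :finalized, :final_status)

def plans_not_finalized(lines)
  plans = {} # Ruby Hashes keep insertion order, i.e. commit order here
  lines.each do |line|
    rec = JSON.parse(line)
    case rec['op']
    when 'plan_commit'
      plans[rec['plan_id']] = Plan.new(rec['plan_id'], false, nil)
    when 'plan_finalize'
      plan = plans[rec['plan_id']]
      next if plan.nil? # finalize without a commit; ignored in this sketch

      plan.finalized = true
      plan.final_status = rec['status']
    end
  end
  plans.values.reject(&:finalized)
end

log = [
  { op: 'plan_commit', plan_id: 'p1', steps: [] },
  { op: 'plan_finalize', plan_id: 'p1', status: 'succeeded' },
  { op: 'plan_commit', plan_id: 'p2', steps: [] },
  { op: 'plan_commit', plan_id: 'p3', steps: [] },
  { op: 'plan_finalize', plan_id: 'p3', status: 'abandoned', reason: 'operator' }
].map { |r| JSON.generate(r) }

pending = plans_not_finalized(log)
puts pending.map(&:plan_id).inspect
```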