Module: KairosMcp::Daemon::Integration
- Defined in:
- lib/kairos_mcp/daemon/integration.rb
Overview
Integration — the glue that turns the P2.1 Daemon skeleton into a fully-wired P2.8 daemon.
This module does NOT modify Daemon source. Instead it uses `define_singleton_method` on a daemon instance to replace the `chronos_tick` and `run_one_ooda_cycle` stubs with real behavior, and to attach a handful of accessor methods used by Heartbeat.
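The per-instance override technique described above can be illustrated with a minimal sketch. `SketchDaemon` is a hypothetical stand-in, not the real Daemon class; only `define_singleton_method` is standard Ruby.

```ruby
# Hypothetical stand-in for the Daemon class; the real class is not shown here.
class SketchDaemon
  def chronos_tick
    :stub
  end
end

wired = SketchDaemon.new
untouched = SketchDaemon.new

# Replace the stub on this one instance only; the class itself is unchanged.
wired.define_singleton_method(:chronos_tick) { :wired }

wired.chronos_tick     # => :wired
untouched.chronos_tick # => :stub
```

Because the override lives on the instance's singleton class, other daemon instances and the class definition keep the original stub.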
Design (v0.2 P2.8 §7):
chronos_tick (wired)
→ chronos.tick(now) → enqueue_mandate for each FiredEvent
(records rejections + missed in the chronos object itself).
run_one_ooda_cycle (wired)
1. budget.reset_if_new_day!
2. if budget.exceeded? → log + return (mandate stays queued)
3. pop a mandate from chronos.queue
4. register_running + open WAL (if wal_dir) + call cycle_runner
5. record LLM usage reported by cycle_runner into budget + save
6. emit heartbeat (rate-limited)
7. unregister_running
AttachServer is started on wire! (if provided) and stopped when
the daemon stops. AttachServer's :shutdown command flows through
daemon.mailbox → dispatch_command → request_shutdown! — no new
plumbing needed.
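Steps 2 and 3 of the cycle (gate on the budget before popping) can be sketched as follows. `SketchBudget`, `sketch_cycle`, and the `>=` comparison are assumptions made for illustration; the real Budget class may define `exceeded?` differently.

```ruby
# Hypothetical budget; the comparison used by the real Budget#exceeded? is an assumption.
SketchBudget = Struct.new(:limit, :llm_calls) do
  def exceeded?
    llm_calls >= limit
  end
end

# Hypothetical, simplified cycle: check the budget first, then pop.
def sketch_cycle(budget, queue)
  return :paused if budget.exceeded? # mandate is not popped, so it stays queued

  mandate = queue.shift
  return :idle unless mandate

  budget.llm_calls += 1 # pretend the cycle made exactly one LLM call
  :ran
end

queue = [{ id: 'a' }, { id: 'b' }]
budget = SketchBudget.new(1, 0)

first = sketch_cycle(budget, queue)  # runs mandate 'a'
second = sketch_cycle(budget, queue) # budget now exhausted, 'b' is left in the queue
```

Checking the budget before popping is what keeps the second mandate available for a later cycle, for example after the daily reset.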
The `cycle_runner` is a callable that takes a mandate hash and returns a Hash like:
{ status: 'ok'|'paused'|'error',
llm_calls: 1, input_tokens: 123, output_tokens: 456 }
Real wiring points at CognitiveLoop; tests inject a stub.
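A test stub satisfying this contract might look like the sketch below. The mandate contents and the token counts are illustrative values, not taken from the real test suite.

```ruby
# Hypothetical test stub: ignores the mandate and reports fixed usage.
stub_cycle_runner = lambda do |_mandate|
  {
    status: 'ok',
    llm_calls: 1,
    input_tokens: 123,
    output_tokens: 456
  }
end

result = stub_cycle_runner.call({ id: 'demo-mandate' })
result[:status]    # => "ok"
result[:llm_calls] # => 1
```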
Defined Under Namespace
Classes: State
Class Method Summary
- .apply_usage(state, result) ⇒ Object
- .attach_accessors!(daemon, state) ⇒ Object
- .attach_server_started?(attach_server) ⇒ Boolean
- .default_cycle_result ⇒ Object
- .emit_heartbeat(daemon, state) ⇒ Object
- .invoke_cycle_runner(state, mandate) ⇒ Object
- .maybe_open_wal(state, mandate_id) ⇒ Object
Called from the overridden methods — keep them small and delegate here.
- .override_chronos_tick!(daemon, state) ⇒ Object
- .override_run_one_ooda_cycle!(daemon, state) ⇒ Object
- .partial_usage_from_accumulator(state) ⇒ Object
P4.1: Extract partial usage from the shared UsageAccumulator.
- .shutdown_error?(error) ⇒ Boolean
P4.1: Check if an error is a shutdown request without hard-coupling to DaemonLlmCaller (which may not be loaded in test environments).
- .unwire!(daemon) ⇒ Object
Tear down integration resources.
- .wire!(daemon, chronos:, budget: nil, heartbeat: nil, attach_server: nil, wal_dir: nil, cycle_runner: nil, usage_accumulator: nil, clock: nil, heartbeat_interval: Heartbeat::DEFAULT_INTERVAL) ⇒ Object
Wire components onto a Daemon instance.
Class Method Details
.apply_usage(state, result) ⇒ Object
# File 'lib/kairos_mcp/daemon/integration.rb', line 265

def self.apply_usage(state, result)
  return unless state.budget && result

  state.budget.record_usage(
    input_tokens: Integer(result[:input_tokens] || result['input_tokens'] || 0),
    output_tokens: Integer(result[:output_tokens] || result['output_tokens'] || 0),
    calls: Integer(result[:llm_calls] || result['llm_calls'] || 0)
  )
  state.budget.save
end
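The key lookup in `apply_usage` accepts both symbol and string keys and coerces the value with `Integer()`. A small sketch of that lookup order, using a hypothetical helper named `usage_count`:

```ruby
# Hypothetical helper mirroring the lookup order used in apply_usage:
# symbol key first, then string key, then 0.
def usage_count(result, key)
  Integer(result[key] || result[key.to_s] || 0)
end

symbol_keys = { input_tokens: 123, output_tokens: 456, llm_calls: 1 }
string_keys = { 'input_tokens' => '7', 'llm_calls' => 2 } # e.g. parsed from JSON

usage_count(symbol_keys, :input_tokens)  # => 123
usage_count(string_keys, :input_tokens)  # => 7 (Integer() parses the numeric string)
usage_count(string_keys, :output_tokens) # => 0 (missing key falls back to 0)
```

Note that `Integer()` raises `ArgumentError` for a non-numeric string such as `'abc'`, so a malformed usage value surfaces as an exception rather than being silently treated as zero.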
.attach_accessors!(daemon, state) ⇒ Object
# File 'lib/kairos_mcp/daemon/integration.rb', line 116

def self.attach_accessors!(daemon, state)
  daemon.instance_variable_set(:@integration_state, state)

  daemon.define_singleton_method(:integration_state) { @integration_state }

  daemon.define_singleton_method(:active_mandate_id) do
    @integration_state&.active_mandate_id
  end

  daemon.define_singleton_method(:last_cycle_at) do
    @integration_state&.last_cycle_at
  end

  daemon.define_singleton_method(:queue_depth) do
    s = @integration_state
    s && s.chronos ? s.chronos.queue.size : 0
  end
end
.attach_server_started?(attach_server) ⇒ Boolean
# File 'lib/kairos_mcp/daemon/integration.rb', line 286

def self.attach_server_started?(attach_server)
  attach_server.respond_to?(:port) && !attach_server.port.nil?
rescue StandardError
  false
end
.default_cycle_result ⇒ Object
# File 'lib/kairos_mcp/daemon/integration.rb', line 241

def self.default_cycle_result
  { status: 'ok', llm_calls: 0, input_tokens: 0, output_tokens: 0 }
end
.emit_heartbeat(daemon, state) ⇒ Object
# File 'lib/kairos_mcp/daemon/integration.rb', line 276

def self.emit_heartbeat(daemon, state)
  return unless state.heartbeat

  state.last_heartbeat_at = state.heartbeat.emit_if_due(
    daemon, state.last_heartbeat_at, interval: state.heartbeat_interval
  )
end
.invoke_cycle_runner(state, mandate) ⇒ Object
# File 'lib/kairos_mcp/daemon/integration.rb', line 234

def self.invoke_cycle_runner(state, mandate)
  return default_cycle_result unless state.cycle_runner

  r = state.cycle_runner.call(mandate)
  r.is_a?(Hash) ? r : default_cycle_result
end
.maybe_open_wal(state, mandate_id) ⇒ Object
Called from the overridden methods — keep them small and delegate here.
# File 'lib/kairos_mcp/daemon/integration.rb', line 222

def self.maybe_open_wal(state, mandate_id)
  return nil unless state.wal_dir && mandate_id

  # Lazy-require WAL so unit tests that don't exercise WAL don't pay
  # the cost of loading Zlib / Canonical.
  require_relative 'wal' unless defined?(KairosMcp::Daemon::WAL)
  path = File.join(state.wal_dir, "#{mandate_id}.wal.jsonl")
  KairosMcp::Daemon::WAL.open(path: path)
rescue StandardError
  nil
end
.override_chronos_tick!(daemon, state) ⇒ Object
# File 'lib/kairos_mcp/daemon/integration.rb', line 135

def self.override_chronos_tick!(daemon, state)
  daemon.define_singleton_method(:chronos_tick) do
    s = @integration_state
    return unless s && s.chronos

    now = s.clock.call
    events = s.chronos.tick(now)
    events.each do |ev|
      # Chronos#enqueue_mandate takes a positional Hash.
      result = s.chronos.enqueue_mandate(
        { schedule: ev.schedule, mandate: ev.mandate }
      )
      if result == :rejected && s.chronos.respond_to?(:rollback_fire)
        s.chronos.rollback_fire(ev.name)
      end
    end
  end
end
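The enqueue-then-rollback loop in the wired `chronos_tick` can be exercised in isolation with a sketch like the one below. `SketchEvent` and `SketchScheduler` are hypothetical; the capacity-based rejection rule and the `:queued` return value are assumptions, and what the real `rollback_fire` does internally is not documented here.

```ruby
# Hypothetical fired event carrying the three fields the loop reads.
SketchEvent = Struct.new(:name, :schedule, :mandate)

# Hypothetical scheduler that rejects mandates once its queue is full.
class SketchScheduler
  attr_reader :queue, :rolled_back

  def initialize(capacity)
    @capacity = capacity
    @queue = []
    @rolled_back = []
  end

  def enqueue_mandate(entry)
    return :rejected if @queue.size >= @capacity

    @queue << entry
    :queued
  end

  # Records which schedule names were rolled back (illustrative only).
  def rollback_fire(name)
    @rolled_back << name
  end
end

scheduler = SketchScheduler.new(1)
events = [
  SketchEvent.new('hourly', '0 * * * *', { goal: 'a' }),
  SketchEvent.new('daily', '0 0 * * *', { goal: 'b' })
]

events.each do |ev|
  result = scheduler.enqueue_mandate({ schedule: ev.schedule, mandate: ev.mandate })
  scheduler.rollback_fire(ev.name) if result == :rejected && scheduler.respond_to?(:rollback_fire)
end

scheduler.queue.size  # => 1
scheduler.rolled_back # => ["daily"]
```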
.override_run_one_ooda_cycle!(daemon, state) ⇒ Object
# File 'lib/kairos_mcp/daemon/integration.rb', line 154

def self.override_run_one_ooda_cycle!(daemon, state)
  daemon.define_singleton_method(:run_one_ooda_cycle) do
    s = @integration_state
    return unless s

    # 1. Roll the budget over if the day changed.
    s.budget&.reset_if_new_day!

    # 2. Budget gate: pause rather than pop the mandate.
    if s.budget&.exceeded?
      @logger&.warn('daemon_budget_exceeded',
                    source: 'daemon',
                    details: { limit: s.budget.limit, calls: s.budget.llm_calls })
      KairosMcp::Daemon::Integration.emit_heartbeat(self, s)
      return
    end

    mandate = s.chronos&.pop_queued
    unless mandate
      KairosMcp::Daemon::Integration.emit_heartbeat(self, s)
      return
    end

    mandate_id = mandate[:id] || mandate[:name]
    # Chronos#unregister_running filters by :id; make sure it's set.
    mandate[:id] ||= mandate_id
    s.active_mandate_id = mandate_id
    s.chronos.register_running(mandate) if s.chronos.respond_to?(:register_running)

    wal = KairosMcp::Daemon::Integration.maybe_open_wal(s, mandate_id)
    s.current_wal = wal

    begin
      result = KairosMcp::Daemon::Integration.invoke_cycle_runner(s, mandate)
      KairosMcp::Daemon::Integration.apply_usage(s, result)
    rescue StandardError => e
      partial = KairosMcp::Daemon::Integration.partial_usage_from_accumulator(s)
      if KairosMcp::Daemon::Integration.shutdown_error?(e)
        # P4.1: Shutdown mid-cycle: apply partial usage, don't log as error
        partial[:status] = 'interrupted'
        @logger&.info('daemon_cycle_interrupted',
                      source: 'daemon',
                      details: { mandate: mandate_id, usage: partial })
      else
        # P4.1: Recover partial usage even on failure
        partial[:status] = 'error'
        @logger&.error('daemon_cycle_runner_failed',
                       source: 'daemon',
                       details: { mandate: mandate_id, error: "#{e.class}: #{e.message}" })
      end
      KairosMcp::Daemon::Integration.apply_usage(s, partial)
    ensure
      s.current_wal = nil
      s.last_cycle_at = s.clock.call
      s.active_mandate_id = nil
      if s.chronos.respond_to?(:unregister_running)
        s.chronos.unregister_running(mandate_id)
      end
      KairosMcp::Daemon::Integration.emit_heartbeat(self, s)
    end
  end
end
.partial_usage_from_accumulator(state) ⇒ Object
P4.1: Extract partial usage from the shared UsageAccumulator. Used on exception paths (ShutdownRequested, LlmCallError) to ensure partial LLM spend is still recorded into Budget.
# File 'lib/kairos_mcp/daemon/integration.rb', line 256

def self.partial_usage_from_accumulator(state)
  ua = state.usage_accumulator
  if ua && ua.respond_to?(:to_h)
    ua.to_h # { llm_calls:, input_tokens:, output_tokens: }
  else
    { llm_calls: 0, input_tokens: 0, output_tokens: 0 }
  end
end
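To illustrate why partial usage matters on the exception path, here is a sketch with a hypothetical accumulator. `SketchUsageAccumulator`, its `add` method, and `partial_usage` are assumptions; the only thing the real code relies on is that the accumulator responds to `to_h`.

```ruby
# Hypothetical accumulator; the real UsageAccumulator is not shown on this page.
class SketchUsageAccumulator
  def initialize
    @llm_calls = 0
    @input_tokens = 0
    @output_tokens = 0
  end

  # Records one completed LLM call.
  def add(input_tokens:, output_tokens:)
    @llm_calls += 1
    @input_tokens += input_tokens
    @output_tokens += output_tokens
  end

  def to_h
    { llm_calls: @llm_calls, input_tokens: @input_tokens, output_tokens: @output_tokens }
  end
end

# Same fallback logic as partial_usage_from_accumulator, taking the accumulator directly.
def partial_usage(accumulator)
  if accumulator && accumulator.respond_to?(:to_h)
    accumulator.to_h
  else
    { llm_calls: 0, input_tokens: 0, output_tokens: 0 }
  end
end

acc = SketchUsageAccumulator.new
begin
  acc.add(input_tokens: 100, output_tokens: 40) # first LLM call succeeds
  raise 'second call failed'                     # cycle aborts before returning a result
rescue StandardError
  recovered = partial_usage(acc)
end

recovered # => { llm_calls: 1, input_tokens: 100, output_tokens: 40 }
```

The explicit nil check matters because `nil` also responds to `to_h` and would return an empty hash rather than zeroed counters.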
.shutdown_error?(error) ⇒ Boolean
P4.1: Check if an error is a shutdown request without hard-coupling to DaemonLlmCaller (which may not be loaded in test environments).
# File 'lib/kairos_mcp/daemon/integration.rb', line 247

def self.shutdown_error?(error)
  return true if defined?(KairosMcp::Daemon::DaemonLlmCaller::ShutdownRequested) &&
                 error.is_a?(KairosMcp::Daemon::DaemonLlmCaller::ShutdownRequested)

  false
end
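The `defined?` guard is what avoids the hard dependency: it evaluates to nil when the constant has not been loaded, instead of raising `NameError`. A sketch with a hypothetical `SketchNs` namespace:

```ruby
# Hypothetical check using the same guard pattern as shutdown_error?.
def sketch_shutdown?(error)
  # defined? returns nil (no NameError) while SketchNs is still unloaded.
  return true if defined?(SketchNs::ShutdownRequested) &&
                 error.is_a?(SketchNs::ShutdownRequested)

  false
end

before_load = sketch_shutdown?(RuntimeError.new('boom')) # constant not defined yet

# Simulate the optional component being loaded later.
module SketchNs
  class ShutdownRequested < StandardError; end
end

after_load = sketch_shutdown?(SketchNs::ShutdownRequested.new)
other_error = sketch_shutdown?(RuntimeError.new('boom'))

before_load # => false
after_load  # => true
other_error # => false
```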
.unwire!(daemon) ⇒ Object
Tear down integration resources. Safe to call during stop!.
# File 'lib/kairos_mcp/daemon/integration.rb', line 93

def unwire!(daemon)
  state = daemon.instance_variable_get(:@integration_state)
  return unless state

  state.attach_server&.stop if state.attach_server
  state.current_wal&.close if state.current_wal.respond_to?(:close)
  daemon.remove_instance_variable(:@integration_state)
rescue StandardError
  # Teardown must not raise.
end
.wire!(daemon, chronos:, budget: nil, heartbeat: nil, attach_server: nil, wal_dir: nil, cycle_runner: nil, usage_accumulator: nil, clock: nil, heartbeat_interval: Heartbeat::DEFAULT_INTERVAL) ⇒ Object
Wire components onto a Daemon instance. Returns the daemon.
Required:
daemon — a KairosMcp::Daemon (or duck-typed equivalent)
chronos — Chronos instance
Optional:
budget — Budget instance (a disabled stub is used if nil)
heartbeat — Heartbeat instance (also optional)
attach_server — AttachServer instance; started on wire!
wal_dir — directory for per-mandate WAL files
cycle_runner — ->(mandate) { ... } — stubbable cycle executor
clock — ->{ Time } — injected clock for heartbeat cadence
heartbeat_interval — seconds between heartbeats (default 10)
# File 'lib/kairos_mcp/daemon/integration.rb', line 57

def wire!(daemon, chronos:, budget: nil, heartbeat: nil, attach_server: nil,
          wal_dir: nil, cycle_runner: nil, usage_accumulator: nil, clock: nil,
          heartbeat_interval: Heartbeat::DEFAULT_INTERVAL)
  clock ||= -> { Time.now.utc }

  state = State.new(
    chronos: chronos,
    budget: budget,
    heartbeat: heartbeat,
    attach_server: attach_server,
    wal_dir: wal_dir,
    cycle_runner: cycle_runner,
    usage_accumulator: usage_accumulator,
    clock: clock,
    heartbeat_interval: heartbeat_interval
  )

  attach_accessors!(daemon, state)
  override_chronos_tick!(daemon, state)
  override_run_one_ooda_cycle!(daemon, state)

  # Start attach server after the daemon is in :running state.
  # Caller decides when to call wire!; it's expected AFTER daemon.start!
  attach_server&.start if attach_server && !attach_server_started?(attach_server)

  daemon
end
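A test-style wiring might look like the sketch below. `SketchChronos` is a hypothetical in-memory stand-in that exposes only the methods the integration code on this page calls, its `:queued` return value is an assumption, and a plain `Object` is used as the duck-typed daemon. The `wire!` call is guarded so the sketch still runs when the library is not loaded; how the real `State` class handles these arguments is not shown here.

```ruby
# Hypothetical in-memory Chronos stand-in (tick, enqueue_mandate, pop_queued, queue).
class SketchChronos
  attr_reader :queue

  def initialize
    @queue = []
  end

  # No schedules fire in this sketch.
  def tick(_now)
    []
  end

  def enqueue_mandate(entry)
    @queue << entry
    :queued # assumed return value for an accepted mandate
  end

  def pop_queued
    @queue.shift
  end
end

chronos = SketchChronos.new
chronos.enqueue_mandate({ id: 'demo', schedule: nil, mandate: {} })

cycle_runner = ->(_mandate) { { status: 'ok', llm_calls: 0, input_tokens: 0, output_tokens: 0 } }

# Only attempt the real wiring when the library is actually loaded.
if defined?(KairosMcp::Daemon::Integration)
  daemon = Object.new # duck-typed daemon; a real KairosMcp::Daemon would normally go here
  KairosMcp::Daemon::Integration.wire!(
    daemon,
    chronos: chronos,
    cycle_runner: cycle_runner,
    clock: -> { Time.now.utc }
  )
  daemon.run_one_ooda_cycle # pops 'demo' and calls cycle_runner
  daemon.queue_depth        # reads chronos.queue.size
  KairosMcp::Daemon::Integration.unwire!(daemon)
end
```

Passing only `chronos:` and `cycle_runner:` leaves budget, heartbeat, WAL, and the attach server disabled, which keeps a unit test focused on the cycle itself.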