Module: KairosMcp::Daemon::Integration

Defined in:
lib/kairos_mcp/daemon/integration.rb

Overview

Integration — the glue that turns the P2.1 Daemon skeleton into a fully-wired P2.8 daemon.

This module does NOT modify Daemon source. Instead it uses `define_singleton_method` on a daemon instance to replace the `chronos_tick` and `run_one_ooda_cycle` stubs with real behavior, and to attach a handful of accessor methods used by Heartbeat.

Design (v0.2 P2.8 §7):

chronos_tick (wired)
  → chronos.tick(now) → enqueue_mandate for each FiredEvent
    (records rejections + missed in the chronos object itself).

run_one_ooda_cycle (wired)
  1. budget.reset_if_new_day!
  2. if budget.exceeded? → log + return (mandate stays queued)
  3. pop a mandate from chronos.queue
  4. register_running + open WAL (if wal_dir) + call cycle_runner
  5. record LLM usage reported by cycle_runner into budget + save
  6. emit heartbeat (rate-limited)
  7. unregister_running

AttachServer is started on wire! (if provided) and stopped when
the daemon stops. AttachServer's :shutdown command flows through
daemon.mailbox → dispatch_command → request_shutdown! — no new
plumbing needed.

The `cycle_runner` is a callable that takes a mandate hash and returns a Hash like:

{ status: 'ok'|'paused'|'error',
  llm_calls: 1, input_tokens: 123, output_tokens: 456 }

Real wiring points at CognitiveLoop; tests inject a stub.

Defined Under Namespace

Classes: State

Class Method Summary collapse

Class Method Details

.apply_usage(state, result) ⇒ Object



265
266
267
268
269
270
271
272
273
274
# File 'lib/kairos_mcp/daemon/integration.rb', line 265

# Records the LLM usage reported in +result+ into the budget and saves it.
#
# Each counter is looked up under its symbol key first, then its string
# key, and defaults to 0 when neither is present.
#
# @param state [#budget] integration state; nothing happens when its budget is nil
# @param result [Hash, nil] cycle result carrying :input_tokens,
#   :output_tokens and :llm_calls (symbol or string keys)
# @return [Object, nil] whatever +budget.save+ returns, or nil when skipped
# @raise [ArgumentError, TypeError] when a reported value is rejected by Integer()
def self.apply_usage(state, result)
  budget = state.budget
  return unless budget && result

  # Conversion happens for all three counters before anything is recorded,
  # so a bad value aborts the call without touching the budget.
  input, output, calls = %i[input_tokens output_tokens llm_calls].map do |key|
    Integer(result[key] || result[key.to_s] || 0)
  end

  budget.record_usage(input_tokens: input, output_tokens: output, calls: calls)
  budget.save
end

.attach_accessors!(daemon, state) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/kairos_mcp/daemon/integration.rb', line 116

# Stores +state+ on the daemon as @integration_state and defines singleton
# reader methods on that daemon instance:
#
#   integration_state — the stored state object
#   active_mandate_id — delegated to the state (nil when no state)
#   last_cycle_at     — delegated to the state (nil when no state)
#   queue_depth       — size of the chronos queue, 0 without state/chronos
#
# @param daemon [Object] instance that receives the singleton methods
# @param state [Object] integration state to attach
# @return [Symbol] name of the last method defined (:queue_depth)
def self.attach_accessors!(daemon, state)
  daemon.instance_variable_set(:@integration_state, state)

  daemon.define_singleton_method(:integration_state) { @integration_state }
  daemon.define_singleton_method(:active_mandate_id) { @integration_state&.active_mandate_id }
  daemon.define_singleton_method(:last_cycle_at) { @integration_state&.last_cycle_at }

  daemon.define_singleton_method(:queue_depth) do
    current = @integration_state
    scheduler = current && current.chronos
    scheduler ? scheduler.queue.size : 0
  end
end

.attach_server_started?(attach_server) ⇒ Boolean

Returns:

  • (Boolean)


286
287
288
289
290
# File 'lib/kairos_mcp/daemon/integration.rb', line 286

# Reports whether the attach server looks started: it must respond to
# +port+ and return a non-nil port. Any StandardError raised while
# probing is treated as "not started".
#
# @param attach_server [Object] server to probe
# @return [Boolean]
def self.attach_server_started?(attach_server)
  return false unless attach_server.respond_to?(:port)

  !attach_server.port.nil?
rescue StandardError
  false
end

.default_cycle_result ⇒ Object



241
242
243
# File 'lib/kairos_mcp/daemon/integration.rb', line 241

# Builds the fallback cycle result: status 'ok' with every usage counter
# at zero. A new Hash is returned on each call.
#
# @return [Hash]
def self.default_cycle_result
  zero_usage = { llm_calls: 0, input_tokens: 0, output_tokens: 0 }
  { status: 'ok' }.merge(zero_usage)
end

.emit_heartbeat(daemon, state) ⇒ Object



276
277
278
279
280
281
282
283
284
# File 'lib/kairos_mcp/daemon/integration.rb', line 276

# Asks the heartbeat to emit if one is due and stores the value it returns
# as the state's new +last_heartbeat_at+. Does nothing when the state has
# no heartbeat.
#
# @param daemon [Object] passed through to +heartbeat.emit_if_due+
# @param state [Object] provides heartbeat, last_heartbeat_at and heartbeat_interval
# @return [Object, nil] the value stored in +last_heartbeat_at+, or nil when skipped
def self.emit_heartbeat(daemon, state)
  beat = state.heartbeat
  return unless beat

  previous = state.last_heartbeat_at
  state.last_heartbeat_at =
    beat.emit_if_due(daemon, previous, interval: state.heartbeat_interval)
end

.invoke_cycle_runner(state, mandate) ⇒ Object



234
235
236
237
238
239
# File 'lib/kairos_mcp/daemon/integration.rb', line 234

# Calls the configured cycle runner with the mandate.
#
# Falls back to +default_cycle_result+ when no runner is configured or
# when the runner returns something that is not a Hash. Exceptions raised
# by the runner are not handled here.
#
# @param state [#cycle_runner] integration state
# @param mandate [Hash] mandate handed to the runner
# @return [Hash] the runner's Hash result or the default result
def self.invoke_cycle_runner(state, mandate)
  runner = state.cycle_runner
  return default_cycle_result unless runner

  outcome = runner.call(mandate)
  return outcome if outcome.is_a?(Hash)

  default_cycle_result
end

.maybe_open_wal(state, mandate_id) ⇒ Object

Called from the overridden methods — keep them small and delegate here.



222
223
224
225
226
227
228
229
230
231
232
# File 'lib/kairos_mcp/daemon/integration.rb', line 222

# Opens the per-mandate WAL at "<wal_dir>/<mandate_id>.wal.jsonl".
#
# Best-effort: returns nil when no WAL directory or mandate id is given,
# and also when anything raises a StandardError while opening.
#
# @param state [#wal_dir] integration state
# @param mandate_id [Object, nil] interpolated into the file name
# @return [Object, nil] whatever +WAL.open+ returns, or nil
def self.maybe_open_wal(state, mandate_id)
  dir = state.wal_dir
  return nil unless dir && mandate_id

  # WAL is required lazily so unit tests that never touch the WAL do not
  # pay for loading Zlib / Canonical.
  require_relative 'wal' unless defined?(KairosMcp::Daemon::WAL)
  KairosMcp::Daemon::WAL.open(path: File.join(dir, "#{mandate_id}.wal.jsonl"))
rescue StandardError
  nil
end

.override_chronos_tick!(daemon, state) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/kairos_mcp/daemon/integration.rb', line 135

# Replaces the daemon's +chronos_tick+ with a singleton method that ticks
# chronos at the injected clock's current time and enqueues a mandate for
# every event the tick returns.
#
# The installed method reads the daemon's @integration_state at call time
# (the +state+ argument is not captured), so it returns nil without doing
# anything when no state or chronos is attached.
#
# @param daemon [Object] instance that receives the singleton method
# @param state [Object] unused by the installed method
# @return [Symbol] :chronos_tick
def self.override_chronos_tick!(daemon, state)
  daemon.define_singleton_method(:chronos_tick) do
    wiring = @integration_state
    scheduler = wiring && wiring.chronos
    return unless scheduler

    fired = scheduler.tick(wiring.clock.call)
    fired.each do |event|
      # Chronos#enqueue_mandate takes a positional Hash.
      payload = { schedule: event.schedule, mandate: event.mandate }
      outcome = scheduler.enqueue_mandate(payload)
      next unless outcome == :rejected && scheduler.respond_to?(:rollback_fire)

      # A rejected enqueue rolls the fire back, when chronos supports it.
      scheduler.rollback_fire(event.name)
    end
  end
end

.override_run_one_ooda_cycle!(daemon, state) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/kairos_mcp/daemon/integration.rb', line 154

# Replaces the daemon's +run_one_ooda_cycle+ with a singleton method that
# runs at most one queued mandate through the cycle runner.
#
# The installed method reads the daemon's @integration_state at call time
# (the +state+ argument is not captured) and performs, in order:
#
#   1. budget rollover (+reset_if_new_day!+)
#   2. budget gate — when exceeded, warn + heartbeat and return without
#      popping, so the mandate stays queued
#   3. pop a mandate from chronos; heartbeat and return when none is queued
#   4. mark the mandate active/running and open its WAL (if configured)
#   5. invoke the cycle runner and record its usage into the budget; on a
#      StandardError, record the partial usage from the accumulator instead
#   6. always: close the WAL opened in step 4, clear the per-cycle state,
#      unregister the mandate and emit a heartbeat
#
# @param daemon [Object] instance that receives the singleton method
# @param state [Object] unused by the installed method
# @return [Symbol] :run_one_ooda_cycle
def self.override_run_one_ooda_cycle!(daemon, state)
  daemon.define_singleton_method(:run_one_ooda_cycle) do
    s = @integration_state
    return unless s

    # 1. Roll the budget over if the day changed.
    s.budget&.reset_if_new_day!

    # 2. Budget gate — pause rather than pop the mandate.
    if s.budget&.exceeded?
      @logger&.warn('daemon_budget_exceeded',
                    source: 'daemon',
                    details: { limit: s.budget.limit,
                               calls: s.budget.llm_calls })
      KairosMcp::Daemon::Integration.emit_heartbeat(self, s)
      return
    end

    # 3. Nothing queued (or no chronos): just keep the heartbeat going.
    mandate = s.chronos&.pop_queued
    unless mandate
      KairosMcp::Daemon::Integration.emit_heartbeat(self, s)
      return
    end

    mandate_id = mandate[:id] || mandate[:name]
    # Chronos#unregister_running filters by :id — make sure it's set.
    mandate[:id] ||= mandate_id
    s.active_mandate_id = mandate_id
    s.chronos.register_running(mandate) if s.chronos.respond_to?(:register_running)

    # 4. maybe_open_wal returns nil when no WAL is configured or opening failed.
    wal = KairosMcp::Daemon::Integration.maybe_open_wal(s, mandate_id)
    s.current_wal = wal

    begin
      # 5. Run the cycle and book the usage it reports.
      result = KairosMcp::Daemon::Integration.invoke_cycle_runner(s, mandate)
      KairosMcp::Daemon::Integration.apply_usage(s, result)
    rescue StandardError => e
      partial = KairosMcp::Daemon::Integration.partial_usage_from_accumulator(s)

      if KairosMcp::Daemon::Integration.shutdown_error?(e)
        # P4.1: Shutdown mid-cycle — apply partial usage, don't log as error
        partial[:status] = 'interrupted'
        @logger&.info('daemon_cycle_interrupted',
                      source: 'daemon',
                      details: { mandate: mandate_id, usage: partial })
      else
        # P4.1: Recover partial usage even on failure
        partial[:status] = 'error'
        @logger&.error('daemon_cycle_runner_failed',
                       source: 'daemon',
                       details: { mandate: mandate_id,
                                  error: "#{e.class}: #{e.message}" })
      end
      KairosMcp::Daemon::Integration.apply_usage(s, partial)
    ensure
      # 6. Close the WAL opened for this cycle before dropping the only
      # reference to it; otherwise every mandate leaks an open handle.
      # Closing is best-effort so a close failure cannot skip the
      # bookkeeping below or replace the cycle's own outcome.
      # NOTE(review): assumes nothing keeps writing to this WAL after the
      # cycle runner returns — confirm against the WAL's users.
      begin
        wal.close if wal.respond_to?(:close)
      rescue StandardError => close_error
        @logger&.warn('daemon_wal_close_failed',
                      source: 'daemon',
                      details: { mandate: mandate_id,
                                 error: "#{close_error.class}: #{close_error.message}" })
      end
      s.current_wal = nil
      s.last_cycle_at = s.clock.call
      s.active_mandate_id = nil
      if s.chronos.respond_to?(:unregister_running)
        s.chronos.unregister_running(mandate_id)
      end
      KairosMcp::Daemon::Integration.emit_heartbeat(self, s)
    end
  end
end

.partial_usage_from_accumulator(state) ⇒ Object

P4.1: Extract partial usage from the shared UsageAccumulator. Used on exception paths (ShutdownRequested, LlmCallError) to ensure partial LLM spend is still recorded into Budget.



256
257
258
259
260
261
262
263
# File 'lib/kairos_mcp/daemon/integration.rb', line 256

# P4.1: Extracts the partial usage gathered so far from the state's usage
# accumulator, for booking into the budget on exception paths.
#
# The result is always a Hash the caller owns. The accumulator's +to_h+
# value is copied, because callers annotate the result (e.g. with a
# :status key); handing out the accumulator's own Hash would let that
# mutation leak back into it, or raise FrozenError if it is frozen.
#
# @param state [#usage_accumulator] integration state
# @return [Hash] copy of the accumulator's +to_h+ (expected keys
#   :llm_calls, :input_tokens, :output_tokens), or all-zero counters when
#   there is no accumulator or it does not respond to +to_h+
def self.partial_usage_from_accumulator(state)
  ua = state.usage_accumulator
  return { llm_calls: 0, input_tokens: 0, output_tokens: 0 } unless ua && ua.respond_to?(:to_h)

  ua.to_h.dup
end

.shutdown_error?(error) ⇒ Boolean

P4.1: Check if an error is a shutdown request without hard-coupling to DaemonLlmCaller (which may not be loaded in test environments).

Returns:

  • (Boolean)


247
248
249
250
251
# File 'lib/kairos_mcp/daemon/integration.rb', line 247

# P4.1: Tells whether +error+ is a DaemonLlmCaller::ShutdownRequested.
#
# The constant is probed with defined? first, so this returns false
# instead of raising NameError when DaemonLlmCaller is not loaded.
#
# @param error [Object] the rescued exception
# @return [Boolean]
def self.shutdown_error?(error)
  if defined?(KairosMcp::Daemon::DaemonLlmCaller::ShutdownRequested)
    error.is_a?(KairosMcp::Daemon::DaemonLlmCaller::ShutdownRequested)
  else
    false
  end
end

.unwire!(daemon) ⇒ Object

Tear down integration resources. Safe to call during stop!.



93
94
95
96
97
98
99
100
101
102
# File 'lib/kairos_mcp/daemon/integration.rb', line 93

# Tears down the integration resources attached to +daemon+. Never raises,
# so it is safe to call during stop!.
#
# Each step is best-effort on its own: a failure while stopping the attach
# server must not prevent the current WAL from being closed, and neither
# failure may leave @integration_state attached to the daemon.
#
# @param daemon [Object] a daemon that may carry @integration_state
# @return [Object, nil] the detached state, or nil when there was nothing
#   to tear down or removal failed
def unwire!(daemon)
  state = daemon.instance_variable_get(:@integration_state)
  return unless state

  begin
    state.attach_server&.stop
  rescue StandardError
    nil # keep going: the remaining resources still need releasing
  end

  begin
    wal = state.current_wal
    wal.close if wal.respond_to?(:close)
  rescue StandardError
    nil # keep going: the state must still be detached
  end

  daemon.remove_instance_variable(:@integration_state)
rescue StandardError
  # Teardown must not raise.
  nil
end

.wire!(daemon, chronos:, budget: nil, heartbeat: nil, attach_server: nil, wal_dir: nil, cycle_runner: nil, usage_accumulator: nil, clock: nil, heartbeat_interval: Heartbeat::DEFAULT_INTERVAL) ⇒ Object

Wire components onto a Daemon instance. Returns the daemon.

Required:

daemon    — a KairosMcp::Daemon (or duck-typed equivalent)
chronos   — Chronos instance

Optional:

budget         — Budget instance (optional; budget tracking and the budget gate are skipped if nil)
heartbeat      — Heartbeat instance (also optional)
attach_server  — AttachServer instance; started on wire!
wal_dir        — directory for per-mandate WAL files
cycle_runner   — ->(mandate) { ... } — stubbable cycle executor
clock          — ->{ Time } — injected clock for heartbeat cadence
heartbeat_interval — seconds between heartbeats (default 10)


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/kairos_mcp/daemon/integration.rb', line 57

# Wires the given components onto a daemon instance and returns the daemon.
#
# Builds a State from the components, attaches it to the daemon, installs
# the +chronos_tick+ and +run_one_ooda_cycle+ overrides, and starts the
# attach server when one is given and it does not already look started.
#
# @param daemon [Object] daemon instance to wire
# @param chronos [Object] scheduler (required)
# @param budget [Object, nil]
# @param heartbeat [Object, nil]
# @param attach_server [Object, nil] started here unless already started
# @param wal_dir [String, nil] directory for per-mandate WAL files
# @param cycle_runner [#call, nil] ->(mandate) { ... }
# @param usage_accumulator [Object, nil]
# @param clock [#call, nil] defaults to -> { Time.now.utc }
# @param heartbeat_interval [Numeric] defaults to Heartbeat::DEFAULT_INTERVAL
# @return [Object] the same daemon
def wire!(daemon,
          chronos:,
          budget: nil,
          heartbeat: nil,
          attach_server: nil,
          wal_dir: nil,
          cycle_runner: nil,
          usage_accumulator: nil,
          clock: nil,
          heartbeat_interval: Heartbeat::DEFAULT_INTERVAL)
  components = {
    chronos: chronos,
    budget: budget,
    heartbeat: heartbeat,
    attach_server: attach_server,
    wal_dir: wal_dir,
    cycle_runner: cycle_runner,
    usage_accumulator: usage_accumulator,
    clock: clock || -> { Time.now.utc },
    heartbeat_interval: heartbeat_interval
  }
  state = State.new(**components)

  attach_accessors!(daemon, state)
  override_chronos_tick!(daemon, state)
  override_run_one_ooda_cycle!(daemon, state)

  # Start attach server after the daemon is in :running state.
  # Caller decides when to call wire! — it's expected AFTER daemon.start!
  needs_start = attach_server && !attach_server_started?(attach_server)
  attach_server.start if needs_start

  daemon
end