Class: KairosMcp::Daemon::CommandMailbox

Inherits:
Object
Defined in:
lib/kairos_mcp/daemon/command_mailbox.rb

Overview

Thread-safe command mailbox (CF-2).

Design rationale (design v0.2 §3.1 / CommandMailbox):

  • HTTP server threads (producers) enqueue command Hashes.

  • The single event-loop thread (consumer) drains them at the top of each tick via `drain`.

  • Built on top of ::Queue so enqueue is non-blocking and lock-free from the caller’s perspective; drain is bounded and non-blocking.
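The producer/consumer split described above can be sketched with a bare ::Queue. This is an illustration only: the `tick` helper, the thread counts, and the budget of 8 are assumptions made for the example, not part of KairosMcp.

```ruby
queue = ::Queue.new

# Producers: stand-ins for HTTP server threads, each pushing five command Hashes.
producers = 4.times.map do |i|
  Thread.new do
    5.times { |n| queue << { type: :custom, payload: { producer: i, seq: n } } }
  end
end
producers.each(&:join)

# Hypothetical tick: pop at most `budget` entries and never block.
def tick(queue, budget)
  batch = []
  budget.times do
    begin
      batch << queue.pop(true) # non-blocking pop
    rescue ThreadError
      break # queue is empty, nothing left for this tick
    end
  end
  batch
end

# Consumer: a single thread drains in bounded batches until nothing is left.
batch_sizes = []
batch_sizes << tick(queue, 8).size until queue.empty?
```

With 20 queued commands and a budget of 8, the consumer needs three ticks, so a burst of commands cannot stall any single tick.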

Constant Summary

COMMAND_TYPES =

Sentinel types for well-known commands. SkillSets may introduce new types freely; the mailbox itself is type-agnostic.

%i[reload shutdown status_dump custom].freeze

DEFAULT_MAX_SIZE =

10_000
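Because the mailbox is type-agnostic, interpreting `type` is left to the consumer. A minimal sketch of consumer-side dispatch, assuming entries shaped like the Hashes built by #enqueue; the `dispatch` method, its return strings, and the `:skillset_refresh` type are hypothetical and not part of KairosMcp.

```ruby
# Hypothetical consumer-side dispatch; not part of KairosMcp.
def dispatch(entry)
  case entry[:type]
  when :reload      then 'reload requested'
  when :shutdown    then 'shutdown requested'
  when :status_dump then 'status dump requested'
  else
    # Any other type still reaches the consumer; the mailbox never filters by type.
    "passed through: #{entry[:type]}"
  end
end

entries = [
  { type: :reload, payload: {} },
  { type: :skillset_refresh, payload: { name: 'example' } } # made-up type
]
results = entries.map { |entry| dispatch(entry) }
```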

Constructor Details

#initialize(max_size: DEFAULT_MAX_SIZE) ⇒ CommandMailbox

CF-6 fix: bounded capacity with drop-newest overflow policy.



# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 25

def initialize(max_size: DEFAULT_MAX_SIZE)
  @queue = ::Queue.new
  @max_size = max_size
  @dropped_count = 0
end
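The drop-newest policy can be illustrated with a small stand-in. `TinyMailbox` is a hypothetical class that copies only the constructor and the capacity check from the listing on this page; it stores bare symbols instead of full command entries.

```ruby
# Hypothetical stand-in mirroring the constructor and the capacity check in #enqueue.
class TinyMailbox
  attr_reader :max_size, :dropped_count

  def initialize(max_size: 10_000)
    @queue = ::Queue.new
    @max_size = max_size
    @dropped_count = 0
  end

  def enqueue(type)
    if @queue.size >= @max_size
      @dropped_count += 1 # the incoming command is discarded; queued ones stay
      return nil
    end
    @queue << type
    type
  end

  def size
    @queue.size
  end
end

box = TinyMailbox.new(max_size: 2)
results = %i[reload status_dump shutdown].map { |type| box.enqueue(type) }
```

The third command is rejected and counted, while the two older commands remain queued. In the listing the capacity check and the push are separate steps, so with several concurrent producers the limit should probably be read as approximate rather than strict.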

Instance Attribute Details

#dropped_count ⇒ Object (readonly)

Returns the number of commands rejected so far because the mailbox was full.



# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 22

def dropped_count
  @dropped_count
end

#max_size ⇒ Object (readonly)

Returns the capacity limit given at construction.



# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 22

def max_size
  @max_size
end

Instance Method Details

#clear ⇒ Object

Drop all queued commands. Used during shutdown to avoid leaking references; not part of the normal operating protocol.



# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 86

def clear
  @queue.clear
end

#drain(max: 32) ⇒ Array<Hash>

Drain up to `max` commands from the queue without blocking. Returns an Array of command entries (may be empty).

Parameters:

  • max (Integer) (defaults to: 32)

    maximum number of commands to pop this call

Returns:

  • (Array<Hash>)


# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 61

def drain(max: 32)
  drained = []
  max.times do
    break if @queue.empty?
    begin
      drained << @queue.pop(true) # non_block
    rescue ThreadError
      # Queue became empty between empty? and pop — done.
      break
    end
  end
  drained
end
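The `rescue ThreadError` in the listing relies on how ::Queue behaves when popped in non-blocking mode. A short sketch of that behavior on a bare queue (the variable names are illustrative):

```ruby
queue = ::Queue.new
%i[reload status_dump shutdown].each { |type| queue << type }

popped = []
error_class = nil
begin
  # Four pops against three items: the fourth raises instead of blocking.
  4.times { popped << queue.pop(true) }
rescue ThreadError => e
  error_class = e.class
end
```

Items come out in FIFO order, and an empty queue raises ThreadError rather than suspending the caller. That is why #drain can treat the exception as "nothing left" even if the queue empties between the `empty?` check and the pop.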

#empty? ⇒ Boolean

Returns:

  • (Boolean)

    true if no commands are waiting to be drained


# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 80

def empty?
  @queue.empty?
end

#enqueue(type, payload = {}) ⇒ String?

Enqueue a command. Returns the assigned command_id (UUID), or nil if the mailbox is full (drop-newest policy).

Parameters:

  • type (Symbol, String)

    command kind (e.g. :reload, :shutdown)

  • payload (Hash) (defaults to: {})

    arbitrary structured payload for the consumer

Returns:

  • (String, nil)

    command_id or nil if dropped

Raises:

  • (ArgumentError)

    if type is nil


# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 37

def enqueue(type, payload = {})
  raise ArgumentError, 'type required' if type.nil?

  if @queue.size >= @max_size
    @dropped_count += 1
    return nil
  end

  command_id = SecureRandom.uuid
  entry = {
    id: command_id,
    type: type.to_sym,
    payload: payload || {},
    enqueued_at: Time.now.utc
  }
  @queue << entry
  command_id
end
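The shape of a queued entry can be seen by isolating the Hash construction. `build_entry` below is a hypothetical helper that repeats the validation and normalization steps from the listing without touching a queue.

```ruby
require 'securerandom'

# Hypothetical helper: same validation and entry layout as the listing, minus the queue.
def build_entry(type, payload = {})
  raise ArgumentError, 'type required' if type.nil?

  {
    id: SecureRandom.uuid,
    type: type.to_sym,      # String types are normalized to Symbols
    payload: payload || {}, # an explicit nil payload becomes an empty Hash
    enqueued_at: Time.now.utc
  }
end

entry = build_entry('reload', nil)

nil_type_error =
  begin
    build_entry(nil)
    nil
  rescue ArgumentError => e
    e.message
  end
```

Consumers can therefore rely on `entry[:type]` being a Symbol and `entry[:payload]` being a Hash whenever producers pass a Hash or nil. On the producer side, a nil return from #enqueue signals a dropped command, so callers that need delivery guarantees would have to check for it.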

#size ⇒ Object

Number of queued commands not yet drained.



# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 76

def size
  @queue.size
end