Class: KairosMcp::Daemon::CommandMailbox
- Inherits: Object
  - Object
  - KairosMcp::Daemon::CommandMailbox
- Defined in: lib/kairos_mcp/daemon/command_mailbox.rb
Overview
Thread-safe command mailbox (CF-2).
Design rationale (design v0.2 §3.1 / CommandMailbox):
- HTTP server threads (producers) enqueue command Hashes.
- The single event-loop thread (consumer) drains them at the top of each tick via `drain`.
- Built on top of ::Queue so enqueue is non-blocking and lock-free from the caller's perspective; drain is bounded and non-blocking.
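The producer/consumer split described above can be sketched as follows. This is a minimal illustration, not the daemon's actual wiring: `SketchMailbox` is a hypothetical, condensed stand-in that mirrors the enqueue/drain behaviour shown in the listings on this page, and the producer threads stand in for HTTP server threads.

```ruby
require 'securerandom'

# Hypothetical stand-in for CommandMailbox (assumption: condensed from the
# listings on this page; the real class lives in the kairos_mcp gem).
class SketchMailbox
  def initialize(max_size: 10_000)
    @queue = ::Queue.new
    @max_size = max_size
  end

  # Returns the command id, or nil when the mailbox is full.
  def enqueue(type, payload = {})
    return nil if @queue.size >= @max_size

    id = SecureRandom.uuid
    @queue << { id: id, type: type.to_sym, payload: payload }
    id
  end

  # Pops at most `max` entries and never blocks.
  def drain(max: 32)
    drained = []
    max.times do
      begin
        drained << @queue.pop(true)
      rescue ThreadError
        break # queue is empty
      end
    end
    drained
  end
end

mailbox = SketchMailbox.new

# Producers: four threads, 25 commands each.
producers = 4.times.map do |n|
  Thread.new do
    25.times { |i| mailbox.enqueue(:custom, { producer: n, seq: i }) }
  end
end

# Consumer: one bounded drain per "tick".
processed = []
loop do
  finished = producers.none?(&:alive?) # sample before draining
  batch = mailbox.drain(max: 32)
  processed.concat(batch)
  break if finished && batch.empty?
  Thread.pass if batch.empty?
end

puts "processed #{processed.size} commands"
```

Sampling `finished` before the drain matters: if the producers were already done when the drain started, an empty batch means the queue is truly empty, so no command is left behind.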
Constant Summary
- COMMAND_TYPES = %i[reload shutdown status_dump custom].freeze
  Sentinel types for well-known commands. SkillSets may introduce new types freely; the mailbox itself is type-agnostic.
- DEFAULT_MAX_SIZE = 10_000
Instance Attribute Summary
- #dropped_count ⇒ Object (readonly)
  Returns the value of attribute dropped_count.
- #max_size ⇒ Object (readonly)
  Returns the value of attribute max_size.
Instance Method Summary
- #clear ⇒ Object
  Drop all queued commands.
- #drain(max: 32) ⇒ Array<Hash>
  Drain up to `max` commands from the queue without blocking.
- #empty? ⇒ Boolean
- #enqueue(type, payload = {}) ⇒ String?
  Enqueue a command.
- #initialize(max_size: DEFAULT_MAX_SIZE) ⇒ CommandMailbox (constructor)
  CF-6 fix: bounded capacity with drop-newest overflow policy.
- #size ⇒ Object
  Number of queued commands not yet drained.
Constructor Details
#initialize(max_size: DEFAULT_MAX_SIZE) ⇒ CommandMailbox
CF-6 fix: bounded capacity with drop-newest overflow policy.
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 25
def initialize(max_size: DEFAULT_MAX_SIZE)
  @queue = ::Queue.new
  @max_size = max_size
  @dropped_count = 0
end
Instance Attribute Details
#dropped_count ⇒ Object (readonly)
Returns the value of attribute dropped_count.
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 22
def dropped_count
  @dropped_count
end
#max_size ⇒ Object (readonly)
Returns the value of attribute max_size.
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 22
def max_size
  @max_size
end
Instance Method Details
#clear ⇒ Object
Drop all queued commands. Used during shutdown to avoid leaking references; not part of the normal operating protocol.
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 86
def clear
  @queue.clear
end
#drain(max: 32) ⇒ Array<Hash>
Drain up to `max` commands from the queue without blocking. Returns an Array of command entries (may be empty).
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 61
def drain(max: 32)
  drained = []
  max.times do
    break if @queue.empty?
    begin
      drained << @queue.pop(true) # non_block
    rescue ThreadError
      # Queue became empty between empty? and pop; done.
      break
    end
  end
  drained
end
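To illustrate what "bounded and non-blocking" means in practice, the sketch below applies the same loop to a plain ::Queue. The helper name `drain_batch` and the backlog of 100 entries are assumptions made for the example; only `Queue#pop(true)` raising `ThreadError` on an empty queue is standard Ruby behaviour.

```ruby
# Hypothetical helper: the same bounded, non-blocking loop as #drain,
# written against a bare ::Queue so it runs without the gem.
def drain_batch(queue, max: 32)
  drained = []
  max.times do
    begin
      drained << queue.pop(true) # non-blocking pop
    rescue ThreadError
      break # nothing left to pop
    end
  end
  drained
end

queue = ::Queue.new
100.times { |i| queue << { type: :custom, payload: { seq: i } } }

# Each iteration models one event-loop tick: at most 32 commands per tick.
batch_sizes = []
batch_sizes << drain_batch(queue, max: 32).size until queue.empty?

p batch_sizes # => [32, 32, 32, 4]

# Draining an empty queue returns immediately with an empty Array.
p drain_batch(queue, max: 32) # => []
```

Capping each batch keeps a large backlog from monopolising a single tick; the remainder is simply picked up on later ticks.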
#empty? ⇒ Boolean
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 80
def empty?
  @queue.empty?
end
#enqueue(type, payload = {}) ⇒ String?
Enqueue a command. Returns the assigned command_id (UUID), or nil if the mailbox is full (drop-newest policy).
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 37
def enqueue(type, payload = {})
  raise ArgumentError, 'type required' if type.nil?

  if @queue.size >= @max_size
    @dropped_count += 1
    return nil
  end

  command_id = SecureRandom.uuid
  entry = {
    id: command_id,
    type: type.to_sym,
    payload: payload || {},
    enqueued_at: Time.now.utc
  }
  @queue << entry
  command_id
end
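A caller has to handle both return values of enqueue. The sketch below shows the drop-newest policy with a deliberately tiny capacity. `TinyMailbox` is a hypothetical stand-in copied from the listing above, and the mapping of results to HTTP-style status codes (202 and 503) is purely an assumption about how a producer might react, not something the source specifies.

```ruby
require 'securerandom'

# Hypothetical stand-in mirroring the enqueue listing above.
class TinyMailbox
  attr_reader :dropped_count

  def initialize(max_size:)
    @queue = ::Queue.new
    @max_size = max_size
    @dropped_count = 0
  end

  def enqueue(type, payload = {})
    raise ArgumentError, 'type required' if type.nil?

    if @queue.size >= @max_size
      @dropped_count += 1
      return nil # drop-newest: the incoming command is rejected
    end

    command_id = SecureRandom.uuid
    @queue << { id: command_id, type: type.to_sym,
                payload: payload || {}, enqueued_at: Time.now.utc }
    command_id
  end
end

mailbox = TinyMailbox.new(max_size: 2)

# String types are accepted and converted with to_sym.
results = %w[reload status_dump shutdown].map { |t| mailbox.enqueue(t) }

# Assumed caller-side handling: accepted vs. rejected.
statuses = results.map { |id| id ? 202 : 503 }

p statuses              # => [202, 202, 503]
p mailbox.dropped_count # => 1
```

Note that the size check and the push are separate steps, so with several concurrent producers the bound is best treated as approximate: two threads may both pass the check before either pushes.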
#size ⇒ Object
Number of queued commands not yet drained.
# File 'lib/kairos_mcp/daemon/command_mailbox.rb', line 76
def size
  @queue.size
end