Class: RubynCode::Teams::Mailbox
- Inherits:
-
Object
- Object
- RubynCode::Teams::Mailbox
- Defined in:
- lib/rubyn_code/teams/mailbox.rb
Overview
JSONL-based mailbox for inter-agent messaging backed by SQLite.
Messages are stored in the ‘mailbox_messages` table with structured JSON content. Each message tracks read/unread state per recipient. Supports structured data payloads and correlation IDs for request/response tracking.
Instance Method Summary collapse
-
#broadcast(from:, content:, all_names:) ⇒ Array<String>
Broadcasts a message from one agent to all other agents.
-
#find_by_correlation_id(correlation_id) ⇒ Array<Hash>
Finds all messages matching a correlation ID.
-
#initialize(db) ⇒ Mailbox
constructor
A new instance of Mailbox.
-
#pending_for(name) ⇒ Array<Hash>
Returns unread messages for the given agent WITHOUT marking them as read.
-
#read_inbox(name) ⇒ Array<Hash>
Reads all unread messages for the given agent and marks them as read.
-
#send(from:, to:, content:, message_type: 'message', correlation_id: nil, data: nil) ⇒ String
Sends a message from one agent to another.
-
#send_structured(from:, to:, type:, data:, content: nil, correlation_id: nil) ⇒ String
Sends a structured message with typed data payload.
-
#unread_count(name) ⇒ Integer
Returns the count of unread messages for the given agent.
Constructor Details
#initialize(db) ⇒ Mailbox
Returns a new instance of Mailbox.
16 17 18 19 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 16 def initialize(db) @db = db ensure_table! end |
Instance Method Details
#broadcast(from:, content:, all_names:) ⇒ Array<String>
Broadcasts a message from one agent to all other agents.
137 138 139 140 141 142 143 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 137 def broadcast(from:, content:, all_names:) recipients = all_names.reject { |n| n == from } recipients.map do |recipient| send(from: from, to: recipient, content: content, message_type: 'broadcast') end end |
#find_by_correlation_id(correlation_id) ⇒ Array<Hash>
Finds all messages matching a correlation ID. Useful for tracking request/response chains.
118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 118 def find_by_correlation_id(correlation_id) rows = @db.query( <<~SQL, SELECT id, payload, correlation_id, data FROM mailbox_messages WHERE correlation_id = ? ORDER BY created_at ASC SQL [correlation_id] ).to_a rows.map { |r| (r) } end |
#pending_for(name) ⇒ Array<Hash>
Returns unread messages for the given agent WITHOUT marking them as read. Used by IdlePoller to check for pending work without consuming messages.
150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 150 def pending_for(name) rows = @db.query( <<~SQL, SELECT id, payload, correlation_id, data FROM mailbox_messages WHERE recipient = ? AND read = 0 ORDER BY created_at ASC SQL [name] ).to_a rows.map { |r| (r) } end |
#read_inbox(name) ⇒ Array<Hash>
Reads all unread messages for the given agent and marks them as read.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 88 def read_inbox(name) rows = @db.query( <<~SQL, SELECT id, payload, correlation_id, data FROM mailbox_messages WHERE recipient = ? AND read = 0 ORDER BY created_at ASC SQL [name] ).to_a return [] if rows.empty? ids = rows.map { |r| r['id'] } = rows.map { |r| (r) } # Mark all fetched messages as read in a single statement placeholders = ids.map { '?' }.join(', ') @db.execute( "UPDATE mailbox_messages SET read = 1 WHERE id IN (#{placeholders})", ids ) end |
#send(from:, to:, content:, message_type: 'message', correlation_id: nil, data: nil) ⇒ String
Sends a message from one agent to another.
rubocop:disable Metrics/ParameterLists
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 31 def send(from:, to:, content:, message_type: 'message', correlation_id: nil, data: nil) id = SecureRandom.uuid now = Time.now.utc.iso8601 payload = JSON.generate({ id: id, from: from, to: to, content: content, message_type: , timestamp: now }) data_json = data ? JSON.generate(data) : nil @db.execute( <<~SQL, INSERT INTO mailbox_messages (id, sender, recipient, message_type, payload, correlation_id, data, read, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?) SQL [id, from, to, , payload, correlation_id, data_json, now] ) id end |
#send_structured(from:, to:, type:, data:, content: nil, correlation_id: nil) ⇒ String
Sends a structured message with typed data payload. Convenience wrapper around #send for machine-to-machine communication.
rubocop:disable Metrics/ParameterLists
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 69 def send_structured(from:, to:, type:, data:, content: nil, correlation_id: nil) content ||= "#{type}: #{data.inspect}"[0, 200] correlation_id ||= SecureRandom.uuid send( from: from, to: to, content: content, message_type: type, correlation_id: correlation_id, data: data ) end |
#unread_count(name) ⇒ Integer
Returns the count of unread messages for the given agent.
167 168 169 170 171 172 173 |
# File 'lib/rubyn_code/teams/mailbox.rb', line 167 def unread_count(name) rows = @db.query( 'SELECT COUNT(*) AS cnt FROM mailbox_messages WHERE recipient = ? AND read = 0', [name] ).to_a rows.first&.fetch('cnt', 0) || 0 end |