Class: Harbor::Events::Emitter
- Inherits: Object
  - Object
    - Harbor::Events::Emitter
- Defined in: lib/harbor/events/emitter.rb
Overview
Append-only event stream for every Harbor::Tools kernel invocation. Emits one started event and one completed event per tool call. Events power:
- audit log (who did what when)
- agent observability pane (live tool-call feed in Harbor-web)
- future: metrics, failure analysis
Storage: same audit.db SQLite file as AuditLog (single DB for v1 per user decision to keep SQLite everywhere).
Schema:
events(event_id TEXT PK, parent_id TEXT, tool TEXT, actor TEXT,
origin TEXT, app TEXT, destination TEXT, args TEXT,
phase TEXT, result TEXT, error TEXT, duration_ms INTEGER,
ts TEXT)
Indexed on (app, ts), (actor, ts), (origin, ts) for the three most common filter-by axes on the future agent-observability pane.
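The schema and index list above can be written out as DDL. This is a minimal sketch, assuming standard SQLite syntax: the column list is copied from the Overview, but the index names (`idx_events_*`), the `IF NOT EXISTS` guards, and the constant names are illustrative assumptions, because the body of `create_table` is not shown on this page. Note that `#query` orders by an `id` column that the listed schema does not mention, so the real table may differ from this sketch.

```ruby
# Sketch only: DDL reconstructed from the Overview, not copied from create_table.
EVENTS_TABLE_DDL = <<~SQL
  CREATE TABLE IF NOT EXISTS events (
    event_id    TEXT PRIMARY KEY,
    parent_id   TEXT,
    tool        TEXT,
    actor       TEXT,
    origin      TEXT,
    app         TEXT,
    destination TEXT,
    args        TEXT,
    phase       TEXT,
    result      TEXT,
    error       TEXT,
    duration_ms INTEGER,
    ts          TEXT
  )
SQL

# One composite index per filter axis; the index names are hypothetical.
EVENTS_INDEX_DDL = %w[app actor origin].map do |column|
  "CREATE INDEX IF NOT EXISTS idx_events_#{column}_ts ON events (#{column}, ts)"
end.freeze

puts EVENTS_TABLE_DDL
puts EVENTS_INDEX_DDL
```

Placing `ts` second in each index means a query that filters on one axis and sorts by time can generally be served from the index alone, which matches the filter-then-sort shape of `#query`.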
Callers MUST treat emission as fire-and-forget. Emitter failures are logged to stderr and swallowed — they never propagate into the caller. This is wired at the Harbor::Tools#call boundary; don’t add rescue blocks here that would hide real bugs.
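The fire-and-forget contract can be sketched as a wrapper at the calling boundary. This is an illustration under stated assumptions, not the actual Harbor::Tools#call code: `safe_emit`, `call_tool`, and `BrokenEmitter` are hypothetical names introduced here, and only the `emit_start` / `emit_complete` signatures follow this page.

```ruby
# Hypothetical helper: run one emitter call, log any failure to stderr,
# and swallow it so the tool call itself is never affected.
def safe_emit
  yield
rescue StandardError => e
  warn "[events] emit failed: #{e.class}: #{e.message}"
  nil
end

# Stand-in emitter with broken storage, used to exercise the failure path.
class BrokenEmitter
  def emit_start(**_fields)
    raise IOError, "disk full"
  end

  def emit_complete(event_id, **_fields)
    return unless event_id
    raise IOError, "disk full"
  end
end

# Hypothetical boundary method: emits around the tool body, returns its result.
def call_tool(emitter, tool:, actor:, origin:)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  event_id = safe_emit { emitter.emit_start(tool: tool, actor: actor, origin: origin) }
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  safe_emit { emitter.emit_complete(event_id, result: result, duration_ms: (elapsed * 1000).round) }
  result
end

outcome = call_tool(BrokenEmitter.new, tool: "deploy", actor: "alice", origin: "cli") { :deployed }
puts outcome.inspect
```

When `emit_start` fails, `event_id` is nil, and the `return unless event_id` guard in `emit_complete` turns the second call into a no-op. Keeping the rescue in the wrapper rather than inside the Emitter leaves genuine Emitter bugs visible in its own tests.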
Instance Method Summary
- #close ⇒ Object
- #emit_complete(event_id, result:, error: nil, duration_ms: nil) ⇒ Object
- #emit_start(tool:, actor:, origin:, app: nil, destination: nil, args: nil, parent_id: nil) ⇒ Object
  Returns event_id so the caller can pair it with the completed event.
- #initialize(db_path = nil) ⇒ Emitter (constructor)
  A new instance of Emitter.
- #query(app: nil, actor: nil, origin: nil, limit: 100) ⇒ Object
Constructor Details
#initialize(db_path = nil) ⇒ Emitter
Returns a new instance of Emitter.
# File 'lib/harbor/events/emitter.rb', line 34
def initialize(db_path = nil)
  db_path ||= DEFAULT_AUDIT_DB
  FileUtils.mkdir_p(File.dirname(db_path))
  @db = SQLite3::Database.new(db_path)
  @db.execute("PRAGMA journal_mode=WAL")
  @db.execute("PRAGMA busy_timeout=5000")
  @db.results_as_hash = true
  create_table
end
Instance Method Details
#close ⇒ Object
# File 'lib/harbor/events/emitter.rb', line 89
def close
  @db.close
end
#emit_complete(event_id, result:, error: nil, duration_ms: nil) ⇒ Object
# File 'lib/harbor/events/emitter.rb', line 58
def emit_complete(event_id, result:, error: nil, duration_ms: nil)
  return unless event_id

  @db.execute(
    "INSERT INTO events (event_id, parent_id, tool, actor, origin, app, destination, args, phase, result, error, duration_ms, ts) " \
    "SELECT ?, parent_id, tool, actor, origin, app, destination, args, 'completed', ?, ?, ?, ? " \
    "FROM events WHERE event_id = ? AND phase = 'started' LIMIT 1",
    [SecureRandom.uuid, result.to_s, error, duration_ms, Time.now.utc.iso8601, event_id]
  )
end
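The INSERT ... SELECT above copies the descriptive columns from the matching started row, assigns the completed row a fresh event_id, and inserts nothing when no started row matches. A minimal in-memory model of that behaviour, with no SQLite involved: `InMemoryEvents` is a hypothetical stand-in written for this illustration, and it omits `app`, `destination`, and `args` for brevity.

```ruby
require "securerandom"
require "time"

# Hypothetical in-memory model of the append-only events table.
class InMemoryEvents
  attr_reader :rows

  def initialize
    @rows = []
  end

  # Appends a "started" row and returns its event_id.
  def emit_start(tool:, actor:, origin:, parent_id: nil)
    event_id = SecureRandom.uuid
    @rows << { event_id: event_id, parent_id: parent_id, tool: tool.to_s,
               actor: actor&.to_s, origin: origin.to_s,
               phase: "started", ts: Time.now.utc.iso8601 }
    event_id
  end

  # Appends a "completed" row derived from the matching "started" row.
  def emit_complete(event_id, result:, error: nil, duration_ms: nil)
    return unless event_id

    started = @rows.find { |row| row[:event_id] == event_id && row[:phase] == "started" }
    return unless started # mirrors INSERT ... SELECT matching zero rows

    @rows << started.merge(event_id: SecureRandom.uuid, phase: "completed",
                           result: result.to_s, error: error,
                           duration_ms: duration_ms, ts: Time.now.utc.iso8601)
    nil
  end
end

events = InMemoryEvents.new
id = events.emit_start(tool: :deploy, actor: "alice", origin: :cli)
events.emit_complete(id, result: "ok", duration_ms: 12)
events.rows.each { |row| puts row.values_at(:phase, :tool, :actor).join(" ") }
```

Because the completed row gets its own event_id, the started row is never updated in place, which keeps the stream append-only and avoids a collision on the event_id primary key.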
#emit_start(tool:, actor:, origin:, app: nil, destination: nil, args: nil, parent_id: nil) ⇒ Object
Returns event_id so the caller can pair it with the completed event.
# File 'lib/harbor/events/emitter.rb', line 45
def emit_start(tool:, actor:, origin:, app: nil, destination: nil, args: nil, parent_id: nil)
  event_id = SecureRandom.uuid
  @db.execute(
    "INSERT INTO events (event_id, parent_id, tool, actor, origin, app, destination, args, phase, ts) " \
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [event_id, parent_id, tool.to_s, actor&.to_s, origin.to_s,
     app, destination, args ? Redactor.redact(args).to_json : nil,
     "started", Time.now.utc.iso8601]
  )
  event_id
end
#query(app: nil, actor: nil, origin: nil, limit: 100) ⇒ Object
# File 'lib/harbor/events/emitter.rb', line 69
def query(app: nil, actor: nil, origin: nil, limit: 100)
  conditions = []
  bindings = []

  if app
    conditions << "app = ?"
    bindings << app
  end
  if actor
    conditions << "actor = ?"
    bindings << actor
  end
  if origin
    conditions << "origin = ?"
    bindings << origin.to_s
  end

  where_clause = conditions.empty? ? "" : "WHERE #{conditions.join(' AND ')}"
  sql = "SELECT * FROM events #{where_clause} ORDER BY ts DESC, id DESC LIMIT ?"
  @db.execute(sql, bindings + [limit])
end
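To see what SQL and bindings `#query` produces for a given set of filters, the clause-building logic can be isolated as a pure function. This is a sketch: `build_events_query` is a hypothetical helper, not part of the Emitter, and it returns the statement instead of executing it. The `id` column in the ORDER BY is copied from the source as-is; it is not listed in the Overview schema, so its definition is assumed to live in `create_table`.

```ruby
# Hypothetical helper that mirrors the filter logic of Emitter#query
# and returns [sql, bindings] without touching a database.
def build_events_query(app: nil, actor: nil, origin: nil, limit: 100)
  conditions = []
  bindings = []

  filters = { "app" => app, "actor" => actor, "origin" => (origin ? origin.to_s : nil) }
  filters.each do |column, value|
    next unless value # nil filters are skipped, as in the original

    conditions << "#{column} = ?"
    bindings << value
  end

  where_clause = conditions.empty? ? "" : "WHERE #{conditions.join(' AND ')}"
  sql = "SELECT * FROM events #{where_clause} ORDER BY ts DESC, id DESC LIMIT ?"
  [sql, bindings + [limit]]
end

sql, bindings = build_events_query(app: "web", origin: :cli, limit: 10)
puts sql
puts bindings.inspect
```

Filter values are always passed as bound parameters and the column names are fixed strings, so caller-supplied values never end up interpolated into the SQL text.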