Class: Harbor::Events::Emitter

Inherits:
Object
  • Object
show all
Defined in:
lib/harbor/events/emitter.rb

Overview

Append-only event stream for every Harbor::Tools kernel invocation. Emits start + completed events per tool call. Events power:

- audit log (who did what when)
- agent observability pane (live tool-call feed in Harbor-web)
- future: metrics, failure analysis

Storage: same audit.db SQLite file as AuditLog (single DB for v1 per user decision to keep SQLite everywhere).

Schema:

events(event_id TEXT PK, parent_id TEXT, tool TEXT, actor TEXT,
       origin TEXT, app TEXT, destination TEXT, args TEXT,
       phase TEXT, result TEXT, error TEXT, duration_ms INTEGER,
       ts TEXT)

Indexed on (app, ts), (actor, ts), (origin, ts) for the three most common filter-by axes on the future agent-observability pane.

Callers MUST treat emission as fire-and-forget. Emitter failures are logged to stderr and swallowed — they never propagate into the caller. This is wired at the Harbor::Tools#call boundary; don’t add rescue blocks here that would hide real bugs.

Instance Method Summary collapse

Constructor Details

#initialize(db_path = nil) ⇒ Emitter

Returns a new instance of Emitter.



34
35
36
37
38
39
40
41
42
# File 'lib/harbor/events/emitter.rb', line 34

def initialize(db_path = nil)
  db_path ||= DEFAULT_AUDIT_DB
  FileUtils.mkdir_p(File.dirname(db_path))
  @db = SQLite3::Database.new(db_path)
  @db.execute("PRAGMA journal_mode=WAL")
  @db.execute("PRAGMA busy_timeout=5000")
  @db.results_as_hash = true
  create_table
end

Instance Method Details

#closeObject



89
90
91
# File 'lib/harbor/events/emitter.rb', line 89

def close
  @db.close
end

#emit_complete(event_id, result:, error: nil, duration_ms: nil) ⇒ Object



58
59
60
61
62
63
64
65
66
67
# File 'lib/harbor/events/emitter.rb', line 58

def emit_complete(event_id, result:, error: nil, duration_ms: nil)
  return unless event_id

  @db.execute(
    "INSERT INTO events (event_id, parent_id, tool, actor, origin, app, destination, args, phase, result, error, duration_ms, ts) " \
    "SELECT ?, parent_id, tool, actor, origin, app, destination, args, 'completed', ?, ?, ?, ? " \
    "FROM events WHERE event_id = ? AND phase = 'started' LIMIT 1",
    [SecureRandom.uuid, result.to_s, error, duration_ms, Time.now.utc.iso8601, event_id]
  )
end

#emit_start(tool:, actor:, origin:, app: nil, destination: nil, args: nil, parent_id: nil) ⇒ Object

Returns event_id so the caller can pair it with the completed event.



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/harbor/events/emitter.rb', line 45

def emit_start(tool:, actor:, origin:, app: nil, destination: nil, args: nil, parent_id: nil)
  event_id = SecureRandom.uuid
  @db.execute(
    "INSERT INTO events (event_id, parent_id, tool, actor, origin, app, destination, args, phase, ts) " \
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [event_id, parent_id, tool.to_s, actor&.to_s, origin.to_s,
     app, destination,
     args ? Redactor.redact(args).to_json : nil,
     "started", Time.now.utc.iso8601]
  )
  event_id
end

#query(app: nil, actor: nil, origin: nil, limit: 100) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/harbor/events/emitter.rb', line 69

def query(app: nil, actor: nil, origin: nil, limit: 100)
  conditions = []
  bindings = []
  if app
    conditions << "app = ?"
    bindings << app
  end
  if actor
    conditions << "actor = ?"
    bindings << actor
  end
  if origin
    conditions << "origin = ?"
    bindings << origin.to_s
  end
  where_clause = conditions.empty? ? "" : "WHERE #{conditions.join(' AND ')}"
  sql = "SELECT * FROM events #{where_clause} ORDER BY ts DESC, id DESC LIMIT ?"
  @db.execute(sql, bindings + [limit])
end