Class: Mcpeye::Tracker
- Inherits: Object
  - Object
  - Mcpeye::Tracker
- Defined in: lib/mcpeye/tracker.rb
Overview
Tracker instruments a Ruby/Rails MCP server:
1. Injects the optional `mcpeyeIntent` parameter into each tool's input
schema (so the agent self-reports intent at near-zero cost).
2. Adds the reserved `mcpeye_request_capability` tool (active missing-
capability capture) with a handler that records the ask and returns a
canned acknowledgement. Toggle with `capture_missing_capabilities:`.
3. Wraps each tool handler to capture the call (name, arguments, result,
error, duration, the self-reported intent), redacting client-side.
4. Buffers CapturedToolCall events and POSTs the IngestPayload JSON to
"#{ingest_url}/ingest" with the x-mcpeye-secret header via Net::HTTP.
The wire shape matches @mcpeye/core’s IngestPayload exactly (and is byte-compatible with the TS and Python SDKs):
{ projectId, identity: { userId?, client?, serverVersion? }, events: [...] }
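As a rough illustration of that top-level shape, the sketch below assembles a payload Hash and serializes it. `build_ingest_payload` is a hypothetical helper, not the gem's internal builder, and the event field shown is a placeholder: this page documents only `projectId`, `identity` and `events`.

```ruby
require "json"

# Hypothetical helper (not part of the gem): builds the documented top-level
# wire shape. Only projectId, identity and events come from the docs above.
def build_ingest_payload(project_id, identity, events)
  allowed = %i[userId client serverVersion]
  # Every identity field is optional, so keep only documented keys and drop
  # nil values instead of sending explicit nulls.
  clean_identity = identity.select { |k, v| allowed.include?(k.to_sym) && !v.nil? }
  { projectId: project_id.to_s, identity: clean_identity, events: events }
end

payload = build_ingest_payload(
  "proj_123",
  { userId: "u_42", client: nil, ignored: "x" },
  [{ "toolName" => "search_contacts" }] # placeholder event shape (assumption)
)
body = JSON.generate(payload)
```

The resulting `body` string is what would be sent as the POST body; the real event fields are defined by CapturedToolCall in @mcpeye/core.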
Prime directive: this NEVER raises into, or alters, the host MCP server. The only intentional raise is an empty `project_id` at construction (fail loud before any traffic). Every other failure (bad server shape, redaction error, transport failure, a buggy `identify`/`on_error`) is swallowed and routed to the single `on_error` sink (default: a `[mcpeye]` warning). A host handler’s OWN exception is the one thing re-raised, unchanged.
Latency: capturing a call is O(1) (redact + buffer append under a mutex). Shipping happens OFF the tool-call thread when a background flush is running (`flush_interval:`). With the default `flush_interval: nil` (zero-thread), the eager flush at `flush_threshold` runs SYNCHRONOUSLY on the caller’s thread (one `Net::HTTP` POST bounded by the 5s/10s open/read timeouts), so set `flush_interval:` for fully non-blocking capture on a busy server.
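The two flush modes can be sketched with a small stand-in class. `EagerBuffer` and its `ship` block are hypothetical names for illustration; the sketch assumes only a mutex, a sticky flag and a ConditionVariable, and is not the gem's actual flush loop.

```ruby
# Hypothetical stand-in for the buffering logic, for illustration only.
class EagerBuffer
  def initialize(threshold:, &ship)
    @threshold = threshold
    @ship = ship                 # called with each drained batch
    @buffer = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @flush_requested = false     # sticky: survives a wakeup sent mid-flush
    @stopped = false
    @thread = nil
  end

  # Background mode: flush every `interval` seconds, or sooner when signalled.
  def start(interval)
    @thread = Thread.new do
      loop do
        @mutex.synchronize do
          @cond.wait(@mutex, interval) unless @flush_requested || @stopped
          @flush_requested = false
        end
        flush
        break if @mutex.synchronize { @stopped }
      end
    end
  end

  def push(event)
    inline = false
    @mutex.synchronize do
      @buffer << event
      if @buffer.length >= @threshold
        if @thread&.alive?
          @flush_requested = true # wake the background thread
          @cond.signal
        else
          inline = true           # zero-thread mode: caller pays for the flush
        end
      end
    end
    flush if inline
  end

  def flush
    batch = @mutex.synchronize do
      drained = @buffer
      @buffer = []
      drained
    end
    @ship.call(batch) unless batch.empty?
  end

  def stop
    @mutex.synchronize do
      @stopped = true
      @cond.broadcast
    end
    @thread&.join(2)
    flush
  end
end
```

Without `start`, the push that reaches the threshold runs `ship` on the calling thread; after `start`, the same push only sets the flag and signals, so the caller returns immediately.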
The exact surface of a Ruby MCP server object varies (mcp gem, fast-mcp, a custom Rack handler, …), so #instrument duck-types the common shapes and degrades gracefully when it cannot introspect a server — you can always capture calls manually with #wrap / #record.
Constant Summary
- DEFAULT_FLUSH_THRESHOLD = 20
  POST once the buffer reaches this many events (eager flush). When a background flush thread is running it is woken to drain off the hot path; otherwise the flush happens inline on the caller’s thread.
- DEFAULT_MAX_BUFFER = 10_000
  Hard cap on buffered events; oldest are dropped past this while the API is down, so a permanently-unreachable ingest can never grow memory without bound and OOM the host. Mirrors the TS/Python maxBufferedEvents.
- MAX_FIELD_BYTES = 32_768
  A single captured field (arguments or result) is capped at this many UTF-8 bytes. One tool returning a multi-MB blob must never blow the ingest body limit (a permanent 413 requeue loop that halts ALL telemetry) or OOM the buffer; past the cap we ship a small marker. Mirrors TS/Python MAX_FIELD_BYTES.
- REDACT_MARGIN_BYTES = 4_096
  Margin past the cap to redact, so a secret straddling the cut is still matched in full before truncation. Mirrors TS/Python REDACT_MARGIN_BYTES.
- TRUNCATED_SUFFIX = "…[truncated]"
Instance Attribute Summary
- #identity ⇒ Object (readonly)
  Returns the value of attribute identity.
- #ingest_url ⇒ Object (readonly)
  Returns the value of attribute ingest_url.
- #project_id ⇒ Object (readonly)
  Returns the value of attribute project_id.
- #redact ⇒ Object (readonly)
  Returns the value of attribute redact.
Instance Method Summary
- #at_exit_drain ⇒ Object
  Invoked from the at_exit hook registered by Mcpeye.track.
- #flush ⇒ Object
  POST the buffered events as a single IngestPayload.
- #initialize(project_id, ingest_url: nil, ingest_secret: nil, redact: true, identity: {}, identify: nil, flush_threshold: DEFAULT_FLUSH_THRESHOLD, flush_interval: nil, denylist_fields: [], max_buffer: DEFAULT_MAX_BUFFER, capture_missing_capabilities: true, on_error: nil) ⇒ Tracker (constructor)
  project_id - mcpeye project the data belongs to (required; raises on empty).
- #inject_intent_param(server) ⇒ Object
  Inject the optional mcpeyeIntent property into every discoverable tool’s input schema.
- #instrument(server) ⇒ Object
  Best-effort instrumentation of an MCP server object.
- #pending ⇒ Object
  Number of events waiting to be flushed (useful in tests / Rails shutdown).
- #record(tool_name, arguments = {}, result: nil, is_error: false, error_message: nil, intent: nil, duration_ms: nil) ⇒ Object
  Capture a single tool call into the buffer (redacting + size-bounding if enabled).
- #start_flush_thread ⇒ Object
  Start the background flush thread (no-op if flush_interval is unset or a live thread already exists).
- #stop ⇒ Object
  Stop the background thread (bounded wait) and do a final flush.
- #stopped? ⇒ Boolean
  Whether #stop has been called (so the auto at_exit drain can skip).
- #wrap(tool_name, own_intent: false, &handler) ⇒ Object
  Wrap an arbitrary tool handler block so the call is captured.
Constructor Details
#initialize(project_id, ingest_url: nil, ingest_secret: nil, redact: true, identity: {}, identify: nil, flush_threshold: DEFAULT_FLUSH_THRESHOLD, flush_interval: nil, denylist_fields: [], max_buffer: DEFAULT_MAX_BUFFER, capture_missing_capabilities: true, on_error: nil) ⇒ Tracker
project_id - mcpeye project the data belongs to (required; raises on empty).
ingest_url - base URL of the self-hosted mcpeye API (no trailing /ingest). Defaults to ENV["MCPEYE_INGEST_URL"].
ingest_secret - shared secret sent as x-mcpeye-secret. Defaults to ENV["MCPEYE_INGEST_SECRET"].
redact - when true (default) scrub arguments/result/intent/error client-side.
identity - static Hash { userId:, client:, serverVersion: }.
identify - optional callable evaluated once per flush, returning the identity Hash (per-request/thread-local attribution). A raising identify yields {} for that flush, never breaks it.
flush_threshold - eager-flush buffer size (default 20).
flush_interval - background flush interval in seconds (default nil = no background thread; stay zero-thread unless asked).
denylist_fields - extra exact field names whose values are always dropped.
max_buffer - hard buffer cap (default 10_000).
capture_missing_capabilities - when true (default) add the reserved `mcpeye_request_capability` tool so the agent can voice a capability no existing tool covers, and answer that call locally. Set false to keep the extra tool out of the manifest.
on_error - diagnostics sink for swallowed errors. Default warns with a `[mcpeye]` prefix. Wrapped so it can never throw into the host.
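The "wrapped so it can never throw" guarantee for `on_error` can be pictured as follows. `safe_error_sink` is a hypothetical stand-in for the private `wrap_on_error` helper, whose body is not shown on this page, and the default warning format is an assumption.

```ruby
# Hypothetical stand-in for the private wrap_on_error helper.
# Returns a lambda that forwards errors to the sink but never raises itself.
def safe_error_sink(on_error = nil)
  # Assumed default: a warning carrying the documented [mcpeye] prefix.
  sink = on_error || ->(e) { warn "[mcpeye] #{e.class}: #{e.message}" }
  lambda do |error|
    begin
      sink.call(error)
    rescue StandardError
      nil # a buggy sink is silenced so it cannot break the host
    end
    nil
  end
end
```

Every internal `rescue` can then call the wrapped sink without a second layer of error handling.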
    # File 'lib/mcpeye/tracker.rb', line 91

    def initialize(project_id, ingest_url: nil, ingest_secret: nil, redact: true,
                   identity: {}, identify: nil, flush_threshold: DEFAULT_FLUSH_THRESHOLD,
                   flush_interval: nil, denylist_fields: [], max_buffer: DEFAULT_MAX_BUFFER,
                   capture_missing_capabilities: true, on_error: nil)
      raise ArgumentError, "project_id is required" if project_id.nil? || project_id.to_s.empty?

      @project_id = project_id.to_s
      @ingest_url = (ingest_url || ENV["MCPEYE_INGEST_URL"]).to_s
      @ingest_secret = ingest_secret || ENV["MCPEYE_INGEST_SECRET"]
      @redact = redact
      @identity = normalize_identity(identity)
      @identify = identify
      @flush_threshold = flush_threshold
      @flush_interval = flush_interval
      @denylist_fields = denylist_fields || []
      @max_buffer = max_buffer
      @capture_missing_capabilities = capture_missing_capabilities
      @on_error = wrap_on_error(on_error)

      @buffer = []
      @mutex = Mutex.new
      # Eager-flush signalling for the background thread: a sticky flag (set under
      # @mutex when the threshold is hit) + a ConditionVariable, so a wakeup raised
      # while the thread is mid-POST is never lost (TS/Python parity: Python uses a
      # persistent Event for the same reason).
      @cond = ConditionVariable.new
      @flush_requested = false
      @flush_thread = nil
      @stopped = false
      @drop_warned = false
      @reported_once = {}
      # Names of tools that declare their OWN `mcpeyeIntent` param (a collision with
      # our reserved name). Populated during schema inspection; consulted on the
      # auto-wrap path so we never strip a field the tool legitimately owns.
      @own_intent_tools = {}
      # Names WE injected mcpeyeIntent into, so a second instrument pass doesn't
      # misclassify our own injected param as tool-owned (mirrors Python's
      # injected_tools).
      @injected_tools = {}
    end
Instance Attribute Details
#identity ⇒ Object (readonly)
Returns the value of attribute identity.
    # File 'lib/mcpeye/tracker.rb', line 68

    def identity
      @identity
    end
#ingest_url ⇒ Object (readonly)
Returns the value of attribute ingest_url.
    # File 'lib/mcpeye/tracker.rb', line 68

    def ingest_url
      @ingest_url
    end
#project_id ⇒ Object (readonly)
Returns the value of attribute project_id.
    # File 'lib/mcpeye/tracker.rb', line 68

    def project_id
      @project_id
    end
#redact ⇒ Object (readonly)
Returns the value of attribute redact.
    # File 'lib/mcpeye/tracker.rb', line 68

    def redact
      @redact
    end
Instance Method Details
#at_exit_drain ⇒ Object
Invoked from the at_exit hook registered by Mcpeye.track. Flushes unless the tracker was already stopped, so a manual #stop never double-flushes. Public so the auto-drain behavior is testable.
    # File 'lib/mcpeye/tracker.rb', line 362

    def at_exit_drain
      return if @stopped

      flush
    rescue StandardError => e
      @on_error.call(e)
      nil
    end
#flush ⇒ Object
POST the buffered events as a single IngestPayload. No-op (returns nil) when the buffer is empty or ingest_url is unset. Returns the Net::HTTPResponse on success. NEVER raises: transport failures requeue the batch at the front (order-preserved, subject to the cap) and report via on_error; a poison (unserializable) batch is dropped with a report.
    # File 'lib/mcpeye/tracker.rb', line 279

    def flush
      if blank?(@ingest_url)
        report_once(
          :url_blank,
          RuntimeError.new("ingest_url not configured — events stay buffered, nothing is flowing")
        )
        return nil
      end

      batch = nil
      @mutex.synchronize do
        return nil if @buffer.empty?

        batch = @buffer
        @buffer = []
      end

      warn_secret_once if blank?(@ingest_secret)

      body = begin
        JSON.generate(build_payload(batch))
      rescue StandardError => e
        # Structurally un-serializable payload: drop it (not re-buffered, to
        # avoid a poison-batch loop), report, never raise.
        @on_error.call(e)
        return nil
      end

      begin
        response = post_ingest(body)
        code = response.code.to_i
        raise "ingest responded #{code}" unless (200..299).cover?(code)

        response
      rescue StandardError => e
        @on_error.call(e)
        requeue(batch)
        nil
      end
    end
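The private `requeue` helper is not listed on this page. The sketch below shows one way "requeue at the front, order-preserved, subject to the cap" could work, assuming the oldest events are the ones dropped on overflow, as the DEFAULT_MAX_BUFFER description states. `requeue_front` is a hypothetical name.

```ruby
# Hypothetical sketch of front-requeueing a failed batch under a hard cap.
# buffer:       events captured while the POST was in flight (newer)
# failed_batch: events whose POST failed (older, so they go first)
def requeue_front(buffer, failed_batch, max_buffer)
  merged = failed_batch + buffer
  overflow = merged.length - max_buffer
  # Past the cap, drop from the front: the oldest events are sacrificed.
  overflow.positive? ? merged.drop(overflow) : merged
end

requeue_front([4, 5], [1, 2, 3], 10) # keeps all five events in original order
requeue_front([4, 5], [1, 2, 3], 3)  # cap of 3 keeps only the newest three
```

Because the merged buffer never exceeds `max_buffer`, repeated transport failures cannot grow memory without bound.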
#inject_intent_param(server) ⇒ Object
Inject the optional mcpeyeIntent property into every discoverable tool’s input schema. Returns the server (fail-open). Collision-safe (never clobbers a tool that already declares mcpeyeIntent), never touches `required`, and skips frozen schemas rather than raising FrozenError into boot.
    # File 'lib/mcpeye/tracker.rb', line 170

    def inject_intent_param(server)
      inject_count(server)
      server
    rescue StandardError => e
      @on_error.call(e)
      server
    end
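The per-schema rules above (collision-safe, `required` untouched, frozen schemas skipped) can be sketched for a single tool. This assumes the input schema is a JSON-Schema-style Hash with string keys; `inject_intent` and the description text are hypothetical, and the gem's private `inject_count` may handle more shapes.

```ruby
INTENT_KEY = "mcpeyeIntent"

# Hypothetical per-tool injection. Returns a symbol describing what happened.
def inject_intent(schema)
  return :skipped unless schema.is_a?(Hash)

  props = schema["properties"]
  if props.nil?
    return :frozen if schema.frozen? # cannot add a properties Hash
    props = schema["properties"] = {}
  end
  return :skipped unless props.is_a?(Hash)
  return :own if props.key?(INTENT_KEY) # the tool owns this name: leave it
  return :frozen if props.frozen?       # skip instead of raising FrozenError

  # Optional by construction: the "required" list is never modified.
  props[INTENT_KEY] = {
    "type" => "string",
    "description" => "Why you are calling this tool (optional)." # assumed wording
  }
  :injected
end
```

Returning a status instead of raising keeps the caller free to count successes and record collisions, which matches the fail-open behavior described above.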
#instrument(server) ⇒ Object
Best-effort instrumentation of an MCP server object. Injects the mcpeyeIntent param into discoverable tool schemas and wraps discoverable handlers so calls are captured automatically. Returns the server unchanged (fail-open) when no shape matches — you can still use #wrap / #record. Reports once if nothing could be introspected.
    # File 'lib/mcpeye/tracker.rb', line 145

    def instrument(server)
      injected = inject_count(server)
      # Add the reserved tool AFTER intent injection (so it never gets an mcpeyeIntent
      # param) and BEFORE handler wrapping (so its pre-wrapped handler is skipped, not
      # double-wrapped). A no-op when capture_missing_capabilities is off.
      append_request_capability_tool(server)
      wrapped = wrap_handler_count(server)
      if injected.zero? && wrapped.zero?
        report_once(
          :no_introspect,
          RuntimeError.new(
            "could not introspect server tools; instrument is a no-op — capture calls manually with #wrap/#record"
          )
        )
      end
      server
    rescue StandardError => e
      @on_error.call(e)
      server
    end
#pending ⇒ Object
Number of events waiting to be flushed (useful in tests / Rails shutdown).
    # File 'lib/mcpeye/tracker.rb', line 372

    def pending
      @mutex.synchronize { @buffer.length }
    end
#record(tool_name, arguments = {}, result: nil, is_error: false, error_message: nil, intent: nil, duration_ms: nil) ⇒ Object
Capture a single tool call into the buffer (redacting + size-bounding if enabled). Triggers an eager flush once the buffer hits the threshold: woken on the background thread when one is running (off the hot path), otherwise flushed SYNCHRONOUSLY on this (the caller’s) thread — set flush_interval: to avoid that blocking POST. Returns the captured event Hash, or nil if the event could not be built (reported via on_error, dropped) — never raises into the host.
    # File 'lib/mcpeye/tracker.rb', line 239

    def record(tool_name, arguments = {}, result: nil, is_error: false, error_message: nil,
               intent: nil, duration_ms: nil)
      event = build_event(tool_name, arguments,
                          result: result, is_error: is_error, error_message: error_message,
                          intent: intent, duration_ms: duration_ms)

      flush_inline = false
      @mutex.synchronize do
        @buffer << event
        trim_locked
        if @buffer.length >= @flush_threshold
          if @flush_thread&.alive?
            # Ask the background thread to drain now, off the hot path. The sticky
            # flag means the request is honored even if it arrives mid-POST.
            @flush_requested = true
            @cond.signal
          else
            flush_inline = true
          end
        end
      end

      flush if flush_inline
      event
    rescue StandardError => e
      # A redaction/serialization error must never propagate into the host
      # handler: report it, drop this one event, and return nil.
      @on_error.call(e)
      nil
    end
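The size-bounding step inside `build_event` is private, so the following is only a sketch of how MAX_FIELD_BYTES, REDACT_MARGIN_BYTES and TRUNCATED_SUFFIX could interact. The `REDACT` lambda and `bound_field` are hypothetical, and the exact marker the gem ships past the cap is an assumption.

```ruby
MAX_FIELD_BYTES = 32_768
REDACT_MARGIN_BYTES = 4_096
TRUNCATED_SUFFIX = "…[truncated]"

# Hypothetical redactor for the sketch: masks tokens shaped like sk_<alnum>.
REDACT = ->(text) { text.gsub(/sk_[A-Za-z0-9]+/, "[redacted]") }

def bound_field(text, cap: MAX_FIELD_BYTES, margin: REDACT_MARGIN_BYTES)
  return REDACT.call(text) if text.bytesize <= cap

  # Redact a window slightly larger than the cap, so a secret that straddles
  # the cut is matched in full before anything is thrown away.
  window = text.byteslice(0, cap + margin).scrub("")
  redacted = REDACT.call(window)
  # Cut to the cap; scrub removes a multi-byte character split by the cut.
  redacted.byteslice(0, cap).scrub("") + TRUNCATED_SUFFIX
end

# Small caps make the effect visible: the secret crosses the 10-byte cut.
bound_field("aaaaaaask_SECRET123zzzz", cap: 10, margin: 20)
```

With `margin: 0` the window would end at `sk_`, the pattern would not match, and a fragment of the secret would survive truncation; the margin exists to prevent that.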
#start_flush_thread ⇒ Object
Start the background flush thread (no-op if flush_interval is unset or a live thread already exists). Safe to call after a fork (the inherited thread is dead, so a fresh one is spawned) — call it from on_worker_boot in a forking server (Puma/Unicorn). Returns the thread (or nil when disabled).
    # File 'lib/mcpeye/tracker.rb', line 325

    def start_flush_thread
      return nil if @flush_interval.nil?
      return @flush_thread if @flush_thread&.alive?

      @stopped = false
      @flush_thread = Thread.new { flush_loop }
    end
#stop ⇒ Object
Stop the background thread (bounded wait) and do a final flush. Idempotent.
    # File 'lib/mcpeye/tracker.rb', line 334

    def stop
      @mutex.synchronize do
        @stopped = true
        @cond.broadcast
      end
      if @flush_thread
        begin
          @flush_thread.join(2)
        rescue StandardError
          nil
        end
        @flush_thread = nil
      end
      flush
      nil
    rescue StandardError => e
      @on_error.call(e)
      nil
    end
#stopped? ⇒ Boolean
Whether #stop has been called (so the auto at_exit drain can skip).
    # File 'lib/mcpeye/tracker.rb', line 355

    def stopped?
      @stopped
    end
#wrap(tool_name, own_intent: false, &handler) ⇒ Object
Wrap an arbitrary tool handler block so the call is captured.
handler = tracker.wrap("search_contacts") { |args| do_the_real_work(args) }
The returned proc takes the tool arguments Hash, strips the injected mcpeyeIntent out as `intent`, runs the original handler with the cleaned arguments, records the captured call (redacted), and returns the original result unchanged. A handler that raises is recorded as is_error and the identical exception is re-raised; a result-level isError is captured with the result omitted. Capture never raises into the host.
When `own_intent: true` the tool legitimately declares its OWN `mcpeyeIntent` parameter, so we pass the arguments through verbatim (never strip it, since that would break the tool) and do not claim its value as agent intent. The auto-instrument path sets this for colliding tools; the manual path defaults to stripping.
    # File 'lib/mcpeye/tracker.rb', line 194

    def wrap(tool_name, own_intent: false, &handler)
      raise ArgumentError, "a handler block is required" unless block_given?

      wrapped = proc do |args = {}|
        if own_intent
          cleaned = args.is_a?(Hash) ? args : {}
          intent = nil
        else
          cleaned, intent = split_intent(args)
        end
        started = monotonic_ms
        begin
          result = handler.call(cleaned)
        rescue StandardError => e
          # Host handler raised: record the failure, then re-raise the identical
          # exception so the agent/client sees the real error (never swallowed).
          record(tool_name, cleaned,
                 is_error: true, error_message: e.message, intent: intent,
                 duration_ms: monotonic_ms - started)
          raise
        end

        is_err, err_msg = result_error_info(result)
        if is_err
          record(tool_name, cleaned,
                 is_error: true, error_message: err_msg, intent: intent,
                 duration_ms: monotonic_ms - started)
        else
          record(tool_name, cleaned,
                 result: result, intent: intent, duration_ms: monotonic_ms - started)
        end
        result
      end
      # Mark the proc so wrap_handler_count never double-wraps it (a second
      # instrument, or track + a manual instrument, would otherwise stack wrappers
      # and double-capture every call with distinct callIds the server can't dedup).
      wrapped.define_singleton_method(:mcpeye_wrapped?) { true }
      wrapped
    end
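To see the wrapper contract in isolation, here is a stripped-down stand-in that runs without the gem. `capture` and its `log` Array are hypothetical: the real `#wrap` records through `#record` with redaction and also handles result-level isError, both of which this sketch omits.

```ruby
# Hypothetical stand-in for Tracker#wrap: strips the intent, times the call,
# logs the outcome, and re-raises the handler's own exception unchanged.
def capture(tool_name, log, &handler)
  raise ArgumentError, "a handler block is required" unless block_given?

  proc do |args = {}|
    args = {} unless args.is_a?(Hash)
    intent = args["mcpeyeIntent"] || args[:mcpeyeIntent]
    cleaned = args.reject { |key, _| key.to_s == "mcpeyeIntent" }
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
    elapsed = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - started }
    begin
      result = handler.call(cleaned)
    rescue StandardError => e
      log << { tool: tool_name, is_error: true, error_message: e.message,
               intent: intent, duration_ms: elapsed.call }
      raise # bare raise re-raises the very same exception object
    end
    log << { tool: tool_name, is_error: false, result: result,
             intent: intent, duration_ms: elapsed.call }
    result
  end
end

log = []
handler = capture("search_contacts", log) { |args| { "matches" => [args["q"]] } }
handler.call({ "q" => "ada", "mcpeyeIntent" => "find a contact" })
```

The handler only ever sees `{ "q" => "ada" }`, while the intent string ends up in the logged event.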