Class: Mcpeye::Tracker
- Inherits:
-
Object
- Object
- Mcpeye::Tracker
- Defined in:
- lib/mcpeye/tracker.rb
Constant Summary collapse
- DEFAULT_FLUSH_THRESHOLD =
POST once the buffer reaches this many events (eager flush). When a background flush thread is running it is woken to drain off the hot path; otherwise the flush happens inline on the caller’s thread.
20- DEFAULT_MAX_BUFFER =
Hard cap on buffered events; oldest are dropped past this while the API is down, so a permanently-unreachable ingest can never grow memory without bound and OOM the host. Mirrors the TS/Python maxBufferedEvents.
10_000- MAX_FIELD_BYTES =
A single captured field (arguments or result) is capped at this many UTF-8 bytes. One tool returning a multi-MB blob must never blow the ingest body limit (a permanent 413 requeue loop that halts ALL telemetry) or OOM the buffer — past the cap we ship a small marker. Mirrors TS/Python MAX_FIELD_BYTES.
32_768- REDACT_MARGIN_BYTES =
Margin past the cap to redact, so a secret straddling the cut is still matched in full before truncation. Mirrors TS/Python REDACT_MARGIN_BYTES.
4_096- TRUNCATED_SUFFIX =
"…[truncated]"
Instance Attribute Summary collapse
-
#identity ⇒ Object
readonly
Returns the value of attribute identity.
-
#ingest_url ⇒ Object
readonly
Returns the value of attribute ingest_url.
-
#project_id ⇒ Object
readonly
Returns the value of attribute project_id.
-
#redact ⇒ Object
readonly
Returns the value of attribute redact.
Instance Method Summary collapse
-
#answer_request_capability_official(_args = {}) ⇒ Object
Record a reserved-capability call and return the canned ack as a plain CallToolResult Hash.
-
#at_exit_drain ⇒ Object
Invoked from the at_exit hook registered by Mcpeye.track.
-
#capture_missing_capabilities? ⇒ Boolean
Whether the reserved mcpeye_request_capability tool is enabled.
-
#flush ⇒ Object
POST the buffered events as a single IngestPayload.
-
#harvest_host_intent(tool_name, args) ⇒ Object
Resolve the eligible host-intent value to promote for a call, or nil.
-
#host_intent_param_for(tool_name) ⇒ Object
The eligible host-intent param for a tool (call-time lookup), or nil.
-
#initialize(project_id, ingest_url: nil, ingest_secret: nil, redact: true, identity: {}, identify: nil, flush_threshold: DEFAULT_FLUSH_THRESHOLD, flush_interval: nil, denylist_fields: [], max_buffer: DEFAULT_MAX_BUFFER, capture_missing_capabilities: true, host_intent_param: true, on_error: nil) ⇒ Tracker
constructor
project_id — mcpeye project the data belongs to (required; raises on empty).
-
#inject_intent_param(server) ⇒ Object
Inject the optional mcpeyeIntent property into every discoverable tool’s input schema.
-
#instrument(server) ⇒ Object
Best-effort instrumentation of an MCP server object.
-
#own_intent_tool?(tool_name) ⇒ Boolean
True when a tool declares its OWN mcpeyeIntent param (a collision).
-
#pending ⇒ Object
Number of events waiting to be flushed (useful in tests / Rails shutdown).
-
#record(tool_name, arguments = {}, result: nil, is_error: false, error_message: nil, intent: nil, intent_source: nil, duration_ms: nil) ⇒ Object
Capture a single tool call into the buffer (redacting + size-bounding if enabled).
-
#reserved_host_owned? ⇒ Boolean
Read by the official call hook (which lives outside the Tracker instance) to decide whether to forward a reserved-name call to the host (collision) vs answer it locally.
-
#start_flush_thread ⇒ Object
Start the background flush thread (no-op if flush_interval is unset or a live thread already exists).
-
#stop ⇒ Object
Stop the background thread (bounded wait) and do a final flush.
-
#stopped? ⇒ Boolean
Whether #stop has been called (so the auto at_exit drain can skip).
-
#wrap(tool_name, own_intent: false, &handler) ⇒ Object
Wrap an arbitrary tool handler block so the call is captured.
Constructor Details
#initialize(project_id, ingest_url: nil, ingest_secret: nil, redact: true, identity: {}, identify: nil, flush_threshold: DEFAULT_FLUSH_THRESHOLD, flush_interval: nil, denylist_fields: [], max_buffer: DEFAULT_MAX_BUFFER, capture_missing_capabilities: true, host_intent_param: true, on_error: nil) ⇒ Tracker
project_id — mcpeye project the data belongs to (required; raises on empty). ingest_url — base URL of the self-hosted mcpeye API (no trailing /ingest).
Defaults to ENV["MCPEYE_INGEST_URL"].
ingest_secret — shared secret sent as x-mcpeye-secret.
Defaults to ENV["MCPEYE_INGEST_SECRET"].
redact — when true (default) scrub arguments/result/intent/error client-side. identity — static Hash { userId:, userEmail:, client:, serverVersion: }
(batch-level fallback).
identify — optional callable for dynamic attribution. Evaluated PER CALL
on the request thread for userId/userEmail (correct per-event
attribution on a multi-user server) AND once per flush for the
batch identity. Return { userId:, userEmail:, client:,
serverVersion: }. A raising identify yields no identity for that
call/flush, never breaks the host. Example:
identify: -> { { userId: RequestStore.store[:current_user_id] } }
flush_threshold — eager-flush buffer size (default 20). flush_interval — background flush interval in seconds (default nil = no
background thread; stay zero-thread unless asked).
denylist_fields — extra exact field names whose values are always dropped. max_buffer — hard buffer cap (default 10_000). capture_missing_capabilities — when true (default) add the reserved
`mcpeye_request_capability` tool so the agent can voice a
capability no existing tool covers, and answer that call
locally. Set false to keep the extra tool out of the manifest.
on_error — diagnostics sink for swallowed errors. Default warns with a
`[mcpeye]` prefix. Wrapped so it can never throw into the host.
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 |
# File 'lib/mcpeye/tracker.rb', line 249 def initialize(project_id, ingest_url: nil, ingest_secret: nil, redact: true, identity: {}, identify: nil, flush_threshold: DEFAULT_FLUSH_THRESHOLD, flush_interval: nil, denylist_fields: [], max_buffer: DEFAULT_MAX_BUFFER, capture_missing_capabilities: true, host_intent_param: true, on_error: nil) raise ArgumentError, "project_id is required" if project_id.nil? || project_id.to_s.empty? @project_id = project_id.to_s @ingest_url = (ingest_url || ENV["MCPEYE_INGEST_URL"]).to_s @ingest_secret = ingest_secret || ENV["MCPEYE_INGEST_SECRET"] @redact = redact @identity = normalize_identity(identity) @identify = identify @flush_threshold = flush_threshold @flush_interval = flush_interval @denylist_fields = denylist_fields || [] @max_buffer = max_buffer @capture_missing_capabilities = capture_missing_capabilities # Host-intent coexistence config (explicit field name and/or gated auto-detect): # true => gated auto-detect of "intent" # false => OFF (never harvest) # "str" => explicit field name, BYPASS gate (works even on no-schema paths) @host_intent_explicit, @host_intent_detect = parse_host_intent_config(host_intent_param) @on_error = wrap_on_error(on_error) @buffer = [] @mutex = Mutex.new # Eager-flush signalling for the background thread: a sticky flag (set under # @mutex when the threshold is hit) + a ConditionVariable, so a wakeup raised # while the thread is mid-POST is never lost (TS/Python parity — Python uses a # persistent Event for the same reason). @cond = ConditionVariable.new @flush_requested = false @flush_thread = nil @stopped = false @drop_warned = false @reported_once = {} # Names of tools that declare their OWN `mcpeyeIntent` param (a collision with # our reserved name). Populated during schema inspection; consulted on the # auto-wrap path so we never strip a field the tool legitimately owns. @own_intent_tools = {} # Names WE injected mcpeyeIntent into, so a second instrument pass doesn't # misclassify our own injected param as tool-owned (mirrors Python's # injected_tools). @injected_tools = {} # Per-tool map { tool_name => host_intent_param_name } of fields to harvest as # a FALLBACK intent. REBUILT on every tools/list (not grow-only like # @own_intent_tools) so a schema change can't leave a stale-eligible field. # Consulted at tools/call only when mcpeyeIntent came back empty. @host_intent_tools = {} # True when the HOST already exposes a tool named mcpeye_request_capability (a # collision). Recorded from the official tools/list (mirrors the duck path's # reserved-owned handling); when true the official call path forwards the # reserved name to the host instead of answering locally. @reserved_host_owned = false # Latch: emit the one-time loud "harvesting host intent" activation log once. @host_intent_activated = false # Latch: warn once if an explicit host_intent_param matched no listed tool. @host_intent_explicit_warned = false # Latch: true once a tools/list ever populated the detection map. On the # schema paths (official/duck) this makes harvest strictly map-driven like # the TS reference; the explicit-config fallback applies ONLY on a no-schema # path (#wrap), where no tools/list ever ran. @host_intent_listed = false # MCPEYE_DEBUG=intent: log per-tool detection reason codes. @debug_intent = intent_debug_enabled? end |
Instance Attribute Details
#identity ⇒ Object (readonly)
Returns the value of attribute identity.
221 222 223 |
# File 'lib/mcpeye/tracker.rb', line 221 def identity @identity end |
#ingest_url ⇒ Object (readonly)
Returns the value of attribute ingest_url.
221 222 223 |
# File 'lib/mcpeye/tracker.rb', line 221 def ingest_url @ingest_url end |
#project_id ⇒ Object (readonly)
Returns the value of attribute project_id.
221 222 223 |
# File 'lib/mcpeye/tracker.rb', line 221 def project_id @project_id end |
#redact ⇒ Object (readonly)
Returns the value of attribute redact.
221 222 223 |
# File 'lib/mcpeye/tracker.rb', line 221 def redact @redact end |
Instance Method Details
#answer_request_capability_official(_args = {}) ⇒ Object
Record a reserved-capability call and return the canned ack as a plain CallToolResult Hash. The official gem serializes call_tool’s return value directly (it does NOT call #to_h), so a Hash — not a Tool::Response object —is what reaches the client.
359 360 361 362 363 364 365 |
# File 'lib/mcpeye/tracker.rb', line 359 def answer_request_capability_official(_args = {}) handle_request_capability(_args) { "content" => [{ "type" => "text", "text" => RequestCapability::ACK }], "isError" => false } rescue StandardError => e @on_error.call(e) { "content" => [{ "type" => "text", "text" => RequestCapability::ACK }], "isError" => false } end |
#at_exit_drain ⇒ Object
Invoked from the at_exit hook registered by Mcpeye.track. Flushes unless the tracker was already stopped, so a manual #stop never double-flushes. Public so the auto-drain behavior is testable.
598 599 600 601 602 603 604 605 |
# File 'lib/mcpeye/tracker.rb', line 598 def at_exit_drain return if @stopped flush rescue StandardError => e @on_error.call(e) nil end |
#capture_missing_capabilities? ⇒ Boolean
Whether the reserved mcpeye_request_capability tool is enabled. Read by the official-server capture module (which lives outside the Tracker instance).
351 352 353 |
# File 'lib/mcpeye/tracker.rb', line 351 def capture_missing_capabilities? @capture_missing_capabilities end |
#flush ⇒ Object
POST the buffered events as a single IngestPayload. No-op (returns nil) when the buffer is empty or ingest_url is unset. Returns the Net::HTTPResponse on success. NEVER raises: transport failures requeue the batch at the front (order-preserved, subject to the cap) and report via on_error; a poison (unserializable) batch is dropped with a report.
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 |
# File 'lib/mcpeye/tracker.rb', line 515 def flush if blank?(@ingest_url) report_once( :url_blank, RuntimeError.new("ingest_url not configured — events stay buffered, nothing is flowing") ) return nil end batch = nil @mutex.synchronize do return nil if @buffer.empty? batch = @buffer @buffer = [] end warn_secret_once if blank?(@ingest_secret) body = begin JSON.generate(build_payload(batch)) rescue StandardError => e # Structurally un-serializable payload: drop it (not re-buffered, to # avoid a poison-batch loop), report, never raise. @on_error.call(e) return nil end begin response = post_ingest(body) code = response.code.to_i raise "ingest responded #{code}" unless (200..299).cover?(code) response rescue StandardError => e @on_error.call(e) requeue(batch) nil end end |
#harvest_host_intent(tool_name, args) ⇒ Object
Resolve the eligible host-intent value to promote for a call, or nil. Returns
- param_name, value
-
when the tool has an eligible, non-denylisted host field
whose arg is a non-empty string; else nil. Reads string OR symbol arg keys. Mirrors TS harvestHostIntent (strictly map-driven once a tools/list has run), plus an explicit-config fallback for the no-schema path (#wrap): there no tools/list ever populated the map, so an EXPLICIT host_intent_param is honored directly against the call args (the integrator opted in). On the schema paths (official/duck) @host_intent_listed is true, so we stay map-only — a non-string or absent explicit field is NOT harvested from a stray runtime arg, exactly like TS. Gated auto-detect always requires a map entry.
1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 |
# File 'lib/mcpeye/tracker.rb', line 1365 public def harvest_host_intent(tool_name, args) param = host_intent_param_for(tool_name) param = @host_intent_explicit if param.nil? && !@host_intent_listed return nil if param.nil? # PII guard: never promote a denylisted field name into the standalone intent. return nil if Intent.denied_field?(param, @denylist_fields) return nil unless args.is_a?(Hash) v = args[param] v = args[param.to_sym] if v.nil? && param.is_a?(String) return [param, v] if v.is_a?(String) && !v.strip.empty? nil rescue StandardError nil end |
#host_intent_param_for(tool_name) ⇒ Object
The eligible host-intent param for a tool (call-time lookup), or nil. Public so the duck-wrap proc and the official call hook — which resolve eligibility at CALL time against this mutable tracker — share one source of truth.
1351 1352 1353 |
# File 'lib/mcpeye/tracker.rb', line 1351 public def host_intent_param_for(tool_name) @host_intent_tools[tool_name.to_s] end |
#inject_intent_param(server) ⇒ Object
Inject the optional mcpeyeIntent property into every discoverable tool’s input schema. Returns the server (fail-open). Collision-safe (never clobbers a tool that already declares mcpeyeIntent), never touches ‘required`, and skips frozen schemas rather than raising FrozenError into boot.
371 372 373 374 375 376 377 |
# File 'lib/mcpeye/tracker.rb', line 371 def inject_intent_param(server) inject_count(server) server rescue StandardError => e @on_error.call(e) server end |
#instrument(server) ⇒ Object
Best-effort instrumentation of an MCP server object. Injects the mcpeyeIntent param into discoverable tool schemas and wraps discoverable handlers so calls are captured automatically. Returns the server unchanged (fail-open) when no shape matches — you can still use #wrap / #record. Reports once if nothing could be introspected.
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 |
# File 'lib/mcpeye/tracker.rb', line 330 def instrument(server) # Official mcp gem (MCP::Server + MCP::Tool subclasses): dispatch-level, # per-server capture. Distinct from the duck-typed path below, whose tools # are handler-Hashes the official gem never produces. return instrument_official(server) if official_mcp_server?(server) injected = inject_count(server) # Add the reserved tool AFTER intent injection (so it never gets an mcpeyeIntent # param) and BEFORE handler wrapping (so its pre-wrapped handler is skipped, not # double-wrapped). A no-op when capture_missing_capabilities is off. append_request_capability_tool(server) wrapped = wrap_handler_count(server) warn_uninstrumentable(server) if injected.zero? && wrapped.zero? server rescue StandardError => e @on_error.call(e) server end |
#own_intent_tool?(tool_name) ⇒ Boolean
True when a tool declares its OWN mcpeyeIntent param (a collision). Read by the official call hook so it never strips a field the tool legitimately owns and never host-harvests for a collision tool (precedence: collision wins).
1392 1393 1394 |
# File 'lib/mcpeye/tracker.rb', line 1392 public def own_intent_tool?(tool_name) @own_intent_tools.key?(tool_name.to_s) end |
#pending ⇒ Object
Number of events waiting to be flushed (useful in tests / Rails shutdown).
608 609 610 |
# File 'lib/mcpeye/tracker.rb', line 608 def pending @mutex.synchronize { @buffer.length } end |
#record(tool_name, arguments = {}, result: nil, is_error: false, error_message: nil, intent: nil, intent_source: nil, duration_ms: nil) ⇒ Object
Capture a single tool call into the buffer (redacting + size-bounding if enabled). Triggers an eager flush once the buffer hits the threshold: woken on the background thread when one is running (off the hot path), otherwise flushed SYNCHRONOUSLY on this (the caller’s) thread — set flush_interval: to avoid that blocking POST. Returns the captured event Hash, or nil if the event could not be built (reported via on_error, dropped) — never raises into the host.
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 |
# File 'lib/mcpeye/tracker.rb', line 474 def record(tool_name, arguments = {}, result: nil, is_error: false, error_message: nil, intent: nil, intent_source: nil, duration_ms: nil) event = build_event(tool_name, arguments, result: result, is_error: is_error, error_message: , intent: intent, intent_source: intent_source, duration_ms: duration_ms) flush_inline = false @mutex.synchronize do @buffer << event trim_locked if @buffer.length >= @flush_threshold if @flush_thread&.alive? # Ask the background thread to drain now, off the hot path. The sticky # flag means the request is honored even if it arrives mid-POST. @flush_requested = true @cond.signal else flush_inline = true end end end flush if flush_inline event rescue StandardError => e # A redaction/serialization error must never propagate into the host # handler: report it, drop this one event, and return nil. @on_error.call(e) nil end |
#reserved_host_owned? ⇒ Boolean
Read by the official call hook (which lives outside the Tracker instance) to decide whether to forward a reserved-name call to the host (collision) vs answer it locally.
1385 1386 1387 |
# File 'lib/mcpeye/tracker.rb', line 1385 public def reserved_host_owned? @reserved_host_owned end |
#start_flush_thread ⇒ Object
Start the background flush thread (no-op if flush_interval is unset or a live thread already exists). Safe to call after a fork (the inherited thread is dead, so a fresh one is spawned) — call it from on_worker_boot in a forking server (Puma/Unicorn). Returns the thread (or nil when disabled).
561 562 563 564 565 566 567 |
# File 'lib/mcpeye/tracker.rb', line 561 def start_flush_thread return nil if @flush_interval.nil? return @flush_thread if @flush_thread&.alive? @stopped = false @flush_thread = Thread.new { flush_loop } end |
#stop ⇒ Object
Stop the background thread (bounded wait) and do a final flush. Idempotent.
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/mcpeye/tracker.rb', line 570 def stop @mutex.synchronize do @stopped = true @cond.broadcast end if @flush_thread begin @flush_thread.join(2) rescue StandardError nil end @flush_thread = nil end flush nil rescue StandardError => e @on_error.call(e) nil end |
#stopped? ⇒ Boolean
Whether #stop has been called (so the auto at_exit drain can skip).
591 592 593 |
# File 'lib/mcpeye/tracker.rb', line 591 def stopped? @stopped end |
#wrap(tool_name, own_intent: false, &handler) ⇒ Object
Wrap an arbitrary tool handler block so the call is captured.
handler = tracker.wrap("search_contacts") { |args| do_the_real_work(args) }
The returned proc takes the tool arguments Hash, strips the injected mcpeyeIntent out as ‘intent`, runs the original handler with the cleaned arguments, records the captured call (redacted), and returns the original result unchanged. A handler that raises is recorded as is_error and the identical exception is re-raised; a result-level isError is captured with the result omitted. Capture never raises into the host.
When ‘own_intent: true` the tool legitimately declares its OWN `mcpeyeIntent` parameter, so we pass the arguments through verbatim (never strip it — that would break the tool) and do not claim its value as agent intent. The auto-instrument path sets this for colliding tools; the manual path defaults to stripping.
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 |
# File 'lib/mcpeye/tracker.rb', line 395 def wrap(tool_name, own_intent: false, &handler) raise ArgumentError, "a handler block is required" unless block_given? wrapped = proc do |args = {}| # `intent_source` is the provenance of `intent`: "mcpeye" (our injected # param) | "native" (a harvested host field) | nil (no intent). # `captured_args` is what we RECORD — it may omit a harvested host field so # the worker summary doesn't double-count it; the HANDLER still receives that # field via `cleaned` (it may be a required arg). Never mutate `cleaned`. intent_source = nil if own_intent # Collision: the tool owns mcpeyeIntent. Pass args through verbatim, do not # claim its value as agent intent, and (per precedence) never host-harvest. cleaned = args.is_a?(Hash) ? args : {} intent = nil captured_args = cleaned else cleaned, intent = split_intent(args) if intent # mcpeye's own param wins whenever the agent filled it. intent_source = Intent::INTENT_SOURCE_MCPEYE captured_args = cleaned else # FALLBACK: the agent left mcpeyeIntent empty — harvest the host server's # own analytics-intent field, if this tool has an eligible one. Resolved # at CALL time against the (mutable) tracker, since host eligibility is # learned at LIST time, after this wrapper was created. harvested = harvest_host_intent(tool_name, cleaned) if harvested intent = harvested[1] intent_source = Intent::INTENT_SOURCE_NATIVE # Omit the promoted field from the CAPTURED copy ONLY. The handler still # gets it via `cleaned`. captured_args = cleaned.dup captured_args.delete(harvested[0]) captured_args.delete(harvested[0].to_sym) if harvested[0].is_a?(String) else captured_args = cleaned end end end started = monotonic_ms begin result = handler.call(cleaned) rescue StandardError => e # Host handler raised: record the failure, then re-raise the identical # exception so the agent/client sees the real error (never swallowed). record(tool_name, captured_args, is_error: true, error_message: e., intent: intent, intent_source: intent_source, duration_ms: monotonic_ms - started) raise end is_err, err_msg = result_error_info(result) if is_err record(tool_name, captured_args, is_error: true, error_message: err_msg, intent: intent, intent_source: intent_source, duration_ms: monotonic_ms - started) else record(tool_name, captured_args, result: result, intent: intent, intent_source: intent_source, duration_ms: monotonic_ms - started) end result end # Mark the proc so wrap_handler_count never double-wraps it (a second # instrument, or track + a manual instrument, would otherwise stack wrappers # and double-capture every call with distinct callIds the server can't dedup). wrapped.define_singleton_method(:mcpeye_wrapped?) { true } wrapped end |