Class: Mcpeye::Tracker

Inherits:
Object
Defined in:
lib/mcpeye/tracker.rb

Constant Summary

DEFAULT_FLUSH_THRESHOLD = 20

POST once the buffer reaches this many events (eager flush). When a background flush thread is running, it is woken to drain the buffer off the hot path; otherwise the flush happens inline on the caller’s thread.

DEFAULT_MAX_BUFFER = 10_000

Hard cap on buffered events. While the API is down, the oldest events are dropped past this cap, so a permanently unreachable ingest can never grow memory without bound and OOM the host. Mirrors the TS/Python maxBufferedEvents.

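A minimal sketch of the drop-oldest policy that DEFAULT_MAX_BUFFER describes. `BoundedBuffer` is a hypothetical class written only for illustration; the tracker's own trimming helper is not shown on this page, so the sketch assumes nothing beyond the stated policy (discard the oldest events once the cap is exceeded).

```ruby
# Hypothetical illustration of a drop-oldest cap (not the gem's code).
class BoundedBuffer
  attr_reader :dropped

  def initialize(max)
    @max = max
    @items = []
    @dropped = 0
  end

  # Append one event, then discard from the front (oldest first) until the
  # buffer is back within the cap.
  def push(event)
    @items << event
    overflow = @items.length - @max
    if overflow.positive?
      @items.shift(overflow)
      @dropped += overflow
    end
    self
  end

  def to_a
    @items.dup
  end
end

buf = BoundedBuffer.new(3)
(1..5).each { |i| buf.push(i) }
buf.to_a    # => [3, 4, 5]
buf.dropped # => 2
```

Dropping from the front keeps the most recent telemetry, which is usually the most useful once the API comes back.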
MAX_FIELD_BYTES = 32_768

A single captured field (arguments or result) is capped at this many UTF-8 bytes. One tool returning a multi-MB blob must never blow the ingest body limit (a permanent 413 requeue loop that halts ALL telemetry) or OOM the buffer; past the cap we ship a small marker instead. Mirrors TS/Python MAX_FIELD_BYTES.

REDACT_MARGIN_BYTES = 4_096

Margin past the cap to redact, so a secret straddling the cut is still matched in full before truncation. Mirrors TS/Python REDACT_MARGIN_BYTES.

TRUNCATED_SUFFIX = "…[truncated]"
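How MAX_FIELD_BYTES, REDACT_MARGIN_BYTES, and TRUNCATED_SUFFIX fit together can be sketched as follows. `byte_prefix`, `cap_field`, and the toy regex redactor are hypothetical; the sketch assumes redaction runs over cap + margin bytes and the result is then cut to the cap and suffixed, which may differ from the tracker's actual marker format. The tiny cap and margin in the usage lines are chosen only to make the effect visible.

```ruby
MAX_FIELD_BYTES = 32_768
REDACT_MARGIN_BYTES = 4_096
TRUNCATED_SUFFIX = "…[truncated]"

# Cut +str+ to at most +max_bytes+ bytes; scrub("") removes a multi-byte
# character that the byte cut left incomplete.
def byte_prefix(str, max_bytes)
  str.byteslice(0, max_bytes).scrub("")
end

# Hypothetical helper: redact a window of cap + margin bytes FIRST, so a secret
# straddling the cut is matched whole, then cut to the cap and mark it.
def cap_field(str, redactor, cap: MAX_FIELD_BYTES, margin: REDACT_MARGIN_BYTES)
  return redactor.call(str) if str.bytesize <= cap

  window = redactor.call(byte_prefix(str, cap + margin))
  byte_prefix(window, cap) + TRUNCATED_SUFFIX
end

# Toy redactor: masks "sk-" followed by exactly six lowercase letters/digits.
redact = ->(s) { s.gsub(/sk-[a-z0-9]{6}/, "[REDACTED]") }
field = "xxxxxxxxsk-abc123!!!!"

cap_field(field, redact, cap: 12, margin: 8) # => "xxxxxxxx[RED…[truncated]"
redact.call(byte_prefix(field, 12))          # => "xxxxxxxxsk-a" (naive cut leaks a fragment)
```

Cutting first and redacting second would leave `sk-a` in the payload, because the truncated fragment no longer matches the secret pattern; the margin is what closes that gap.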

Constructor Details

#initialize(project_id, ingest_url: nil, ingest_secret: nil, redact: true, identity: {}, identify: nil, flush_threshold: DEFAULT_FLUSH_THRESHOLD, flush_interval: nil, denylist_fields: [], max_buffer: DEFAULT_MAX_BUFFER, capture_missing_capabilities: true, host_intent_param: true, on_error: nil) ⇒ Tracker

project_id: mcpeye project the data belongs to. Required; raises ArgumentError when nil or empty.

ingest_url: base URL of the self-hosted mcpeye API (without a trailing /ingest). Defaults to ENV["MCPEYE_INGEST_URL"].

ingest_secret: shared secret sent as the x-mcpeye-secret header. Defaults to ENV["MCPEYE_INGEST_SECRET"].

redact: when true (default), scrub arguments, result, intent, and error client-side.

identity: static Hash { userId:, userEmail:, client:, serverVersion: } used as the batch-level fallback.

identify: optional callable for dynamic attribution. It is evaluated PER CALL on the request thread for userId/userEmail (correct per-event attribution on a multi-user server) AND once per flush for the batch identity. Return { userId:, userEmail:, client:, serverVersion: }. An identify that raises yields no identity for that call/flush and never breaks the host. Example:
  identify: -> { { userId: RequestStore.store[:current_user_id] } }

flush_threshold: eager-flush buffer size (default 20).

flush_interval: background flush interval in seconds (default nil, meaning no background thread; the tracker stays zero-thread unless asked).

denylist_fields: extra exact field names whose values are always dropped.

max_buffer: hard buffer cap (default 10_000).

capture_missing_capabilities: when true (default), add the reserved `mcpeye_request_capability` tool so the agent can voice a capability no existing tool covers, and answer that call locally. Set false to keep the extra tool out of the manifest.

host_intent_param: controls harvesting of the host server's own intent field as a fallback when mcpeyeIntent is empty. true (default) enables gated auto-detection of an "intent" field; false disables harvesting; a String names the field explicitly and bypasses the gate (it works even on the no-schema #wrap path).

on_error: diagnostics sink for swallowed errors. The default warns with a `[mcpeye]` prefix. It is wrapped so it can never raise into the host.

Raises:

  • (ArgumentError)
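The per-call identify contract can be illustrated without the gem. The example above reads from RequestStore (a third-party gem); this sketch swaps in a hypothetical Thread.current-backed Hash, and `safe_identify` is a hypothetical helper that only models the stated rule that a raising identify yields no identity. It does not show how the tracker combines that result with the static identity, which this page does not specify.

```ruby
# Hypothetical stand-in for a per-request store: a fiber-local Hash that
# request middleware would fill in before the tool handler runs.
def current_request_store
  Thread.current[:mcpeye_demo_store] ||= {}
end

identify = -> { { userId: current_request_store[:current_user_id] } }

# Models "a raising identify yields no identity": errors never escape, and
# anything that is not a Hash is treated as no identity.
def safe_identify(callable)
  result = callable&.call
  result.is_a?(Hash) ? result : nil
rescue StandardError
  nil
end

# Two concurrent "requests", each evaluating identify on its own thread.
results = %w[alice bob].map do |user|
  Thread.new do
    current_request_store[:current_user_id] = user
    safe_identify(identify)
  end
end.map(&:value)
# results => [{ userId: "alice" }, { userId: "bob" }]
```

Because the callable runs on the request thread, each event picks up that request's user rather than whichever user happened to be current at flush time.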


# File 'lib/mcpeye/tracker.rb', line 249

def initialize(project_id,
               ingest_url: nil,
               ingest_secret: nil,
               redact: true,
               identity: {},
               identify: nil,
               flush_threshold: DEFAULT_FLUSH_THRESHOLD,
               flush_interval: nil,
               denylist_fields: [],
               max_buffer: DEFAULT_MAX_BUFFER,
               capture_missing_capabilities: true,
               host_intent_param: true,
               on_error: nil)
  raise ArgumentError, "project_id is required" if project_id.nil? || project_id.to_s.empty?

  @project_id = project_id.to_s
  @ingest_url = (ingest_url || ENV["MCPEYE_INGEST_URL"]).to_s
  @ingest_secret = ingest_secret || ENV["MCPEYE_INGEST_SECRET"]
  @redact = redact
  @identity = normalize_identity(identity)
  @identify = identify
  @flush_threshold = flush_threshold
  @flush_interval = flush_interval
  @denylist_fields = denylist_fields || []
  @max_buffer = max_buffer
  @capture_missing_capabilities = capture_missing_capabilities
  # Host-intent coexistence config (explicit field name and/or gated auto-detect):
  #   true  => gated auto-detect of "intent"
  #   false => OFF (never harvest)
  #   "str" => explicit field name, BYPASS gate (works even on no-schema paths)
  @host_intent_explicit, @host_intent_detect = parse_host_intent_config(host_intent_param)
  @on_error = wrap_on_error(on_error)

  @buffer = []
  @mutex = Mutex.new
  # Eager-flush signalling for the background thread: a sticky flag (set under
  # @mutex when the threshold is hit) + a ConditionVariable, so a wakeup raised
  # while the thread is mid-POST is never lost (TS/Python parity — Python uses a
  # persistent Event for the same reason).
  @cond = ConditionVariable.new
  @flush_requested = false
  @flush_thread = nil
  @stopped = false
  @drop_warned = false
  @reported_once = {}
  # Names of tools that declare their OWN `mcpeyeIntent` param (a collision with
  # our reserved name). Populated during schema inspection; consulted on the
  # auto-wrap path so we never strip a field the tool legitimately owns.
  @own_intent_tools = {}
  # Names WE injected mcpeyeIntent into, so a second instrument pass doesn't
  # misclassify our own injected param as tool-owned (mirrors Python's
  # injected_tools).
  @injected_tools = {}
  # Per-tool map { tool_name => host_intent_param_name } of fields to harvest as
  # a FALLBACK intent. REBUILT on every tools/list (not grow-only like
  # @own_intent_tools) so a schema change can't leave a stale-eligible field.
  # Consulted at tools/call only when mcpeyeIntent came back empty.
  @host_intent_tools = {}
  # True when the HOST already exposes a tool named mcpeye_request_capability (a
  # collision). Recorded from the official tools/list (mirrors the duck path's
  # reserved-owned handling); when true the official call path forwards the
  # reserved name to the host instead of answering locally.
  @reserved_host_owned = false
  # Latch: emit the one-time loud "harvesting host intent" activation log once.
  @host_intent_activated = false
  # Latch: warn once if an explicit host_intent_param matched no listed tool.
  @host_intent_explicit_warned = false
  # Latch: true once a tools/list ever populated the detection map. On the
  # schema paths (official/duck) this makes harvest strictly map-driven like
  # the TS reference; the explicit-config fallback applies ONLY on a no-schema
  # path (#wrap), where no tools/list ever ran.
  @host_intent_listed = false
  # MCPEYE_DEBUG=intent: log per-tool detection reason codes.
  @debug_intent = intent_debug_enabled?
end
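The sticky-flag comment in the constructor describes a classic pattern: the producer sets a flag under the mutex and signals, and the consumer checks the flag before waiting, so a signal sent while the consumer is busy (for example mid-POST) is not lost. flush_loop itself is not shown on this page; `StickyWakeup` is a hypothetical class that only illustrates the pattern with Ruby's Mutex and ConditionVariable#wait(mutex, timeout).

```ruby
# Hypothetical sketch of a "sticky flag + ConditionVariable" wakeup.
class StickyWakeup
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @requested = false
    @stopped = false
  end

  # Producer side (e.g. record): set the flag under the lock, then signal.
  # If nobody is waiting right now, the flag alone remembers the request.
  def request_flush
    @mutex.synchronize do
      @requested = true
      @cond.signal
    end
  end

  def stop
    @mutex.synchronize do
      @stopped = true
      @cond.broadcast
    end
  end

  # Consumer side (the background loop): wait up to +interval+ seconds unless
  # a request is already pending. Returns :stop, :flush, or :timeout.
  def next_event(interval)
    @mutex.synchronize do
      @cond.wait(@mutex, interval) unless @requested || @stopped
      return :stop if @stopped

      if @requested
        @requested = false
        return :flush
      end
      :timeout
    end
  end
end

w = StickyWakeup.new
w.request_flush    # raised while the consumer is busy, not waiting
w.next_event(5)    # => :flush, immediately, because the flag stuck
w.next_event(0.01) # => :timeout, nothing pending
```

A spurious wakeup in this sketch surfaces as :timeout, which in a flush loop just means an early periodic flush, so it is harmless.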

Instance Attribute Details

#identity ⇒ Object (readonly)

Returns the value of attribute identity.



# File 'lib/mcpeye/tracker.rb', line 221

def identity
  @identity
end

#ingest_url ⇒ Object (readonly)

Returns the value of attribute ingest_url.



# File 'lib/mcpeye/tracker.rb', line 221

def ingest_url
  @ingest_url
end

#project_id ⇒ Object (readonly)

Returns the value of attribute project_id.



# File 'lib/mcpeye/tracker.rb', line 221

def project_id
  @project_id
end

#redact ⇒ Object (readonly)

Returns the value of attribute redact.



# File 'lib/mcpeye/tracker.rb', line 221

def redact
  @redact
end

Instance Method Details

#answer_request_capability_official(_args = {}) ⇒ Object

Record a reserved-capability call and return the canned ack as a plain CallToolResult Hash. The official gem serializes call_tool’s return value directly (it does NOT call #to_h), so a Hash, not a Tool::Response object, is what reaches the client. If recording fails, the error goes to on_error and the same ack is still returned.



# File 'lib/mcpeye/tracker.rb', line 359

def answer_request_capability_official(_args = {})
  handle_request_capability(_args)
  { "content" => [{ "type" => "text", "text" => RequestCapability::ACK }], "isError" => false }
rescue StandardError => e
  @on_error.call(e)
  { "content" => [{ "type" => "text", "text" => RequestCapability::ACK }], "isError" => false }
end

#at_exit_drain ⇒ Object

Invoked from the at_exit hook registered by Mcpeye.track. Flushes unless the tracker was already stopped, so a manual #stop never double-flushes. Public so the auto-drain behavior is testable.



# File 'lib/mcpeye/tracker.rb', line 598

def at_exit_drain
  return if @stopped

  flush
rescue StandardError => e
  @on_error.call(e)
  nil
end
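A hypothetical `DrainDemo` class models the guard in at_exit_drain: once #stop has done its final flush, the exit-time drain becomes a no-op. It assumes nothing about how Mcpeye.track registers its hook beyond what is described above; the at_exit line only shows roughly how such a hook could look.

```ruby
# Hypothetical miniature of the stop / at_exit_drain interplay.
class DrainDemo
  attr_reader :flushes

  def initialize
    @stopped = false
    @flushes = 0
  end

  def flush
    @flushes += 1
  end

  def stop
    @stopped = true
    flush
    nil
  end

  # Skip the exit-time flush when #stop already drained the buffer.
  def at_exit_drain
    return if @stopped

    flush
  end
end

demo = DrainDemo.new
at_exit { demo.at_exit_drain } # an exit hook in the spirit of Mcpeye.track's
demo.stop
demo.at_exit_drain             # no second flush
demo.flushes                   # => 1
```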

#capture_missing_capabilities? ⇒ Boolean

Whether the reserved mcpeye_request_capability tool is enabled. Read by the official-server capture module (which lives outside the Tracker instance).

Returns:

  • (Boolean)


# File 'lib/mcpeye/tracker.rb', line 351

def capture_missing_capabilities?
  @capture_missing_capabilities
end

#flush ⇒ Object

POST the buffered events as a single IngestPayload. Returns nil without sending when the buffer is empty, or when ingest_url is unset (reported once via on_error; the events stay buffered). Returns the Net::HTTPResponse on a 2xx response. NEVER raises: transport failures and non-2xx responses requeue the batch at the front of the buffer (order preserved, subject to the cap) and are reported via on_error; a poison (unserializable) batch is dropped with a report.



# File 'lib/mcpeye/tracker.rb', line 515

def flush
  if blank?(@ingest_url)
    report_once(
      :url_blank,
      RuntimeError.new("ingest_url not configured — events stay buffered, nothing is flowing")
    )
    return nil
  end

  batch = nil
  @mutex.synchronize do
    return nil if @buffer.empty?

    batch = @buffer
    @buffer = []
  end

  warn_secret_once if blank?(@ingest_secret)

  body =
    begin
      JSON.generate(build_payload(batch))
    rescue StandardError => e
      # Structurally un-serializable payload: drop it (not re-buffered, to
      # avoid a poison-batch loop), report, never raise.
      @on_error.call(e)
      return nil
    end

  begin
    response = post_ingest(body)
    code = response.code.to_i
    raise "ingest responded #{code}" unless (200..299).cover?(code)

    response
  rescue StandardError => e
    @on_error.call(e)
    requeue(batch)
    nil
  end
end
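The flush sequence above (swap the buffer under the lock, serialize, POST outside the lock, requeue at the front on failure) can be sketched with a stubbed transport. `FlushDemo`, its `post:` lambda (which returns an Integer status), and the `{ "events" => ... }` payload shape are hypothetical; the real IngestPayload built by build_payload is not shown here.

```ruby
require "json"

# Hypothetical sketch of the flush/requeue pattern; not the gem's implementation.
class FlushDemo
  def initialize(max_buffer:, post:)
    @buffer = []
    @mutex = Mutex.new
    @max_buffer = max_buffer
    @post = post # stand-in transport: takes a JSON String, returns an Integer status
  end

  def add(event)
    @mutex.synchronize do
      @buffer << event
      trim_locked
    end
  end

  def pending
    @mutex.synchronize { @buffer.dup }
  end

  def flush
    batch = nil
    @mutex.synchronize do
      return nil if @buffer.empty?

      batch = @buffer # take the whole batch...
      @buffer = []    # ...so concurrent adds land in a fresh Array
    end

    body =
      begin
        JSON.generate({ "events" => batch }) # payload shape is an assumption
      rescue StandardError
        return nil # poison batch: dropped, never requeued
      end

    begin
      code = @post.call(body)
      raise "ingest responded #{code}" unless (200..299).cover?(code)

      code
    rescue StandardError
      requeue(batch) # network error or non-2xx: keep the events
      nil
    end
  end

  private

  # Put the failed batch back in FRONT of anything recorded meanwhile.
  def requeue(batch)
    @mutex.synchronize do
      @buffer = batch + @buffer
      trim_locked
    end
  end

  # Caller must hold @mutex. Drops the oldest events beyond the cap.
  def trim_locked
    overflow = @buffer.length - @max_buffer
    @buffer.shift(overflow) if overflow.positive?
  end
end

statuses = [503, 200]
demo = FlushDemo.new(max_buffer: 3, post: ->(_body) { statuses.shift })
demo.add(1)
demo.add(2)
demo.flush   # => nil (503, so [1, 2] is requeued)
demo.add(3)
demo.add(4)
demo.pending # => [2, 3, 4] (requeued events stay in front; the oldest is dropped at the cap)
demo.flush   # => 200
demo.pending # => []
```

Holding the lock only for the swap keeps callers of `add` from blocking behind a slow POST.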

#harvest_host_intent(tool_name, args) ⇒ Object

Resolve the eligible host-intent value to promote for a call, or nil. Returns [param_name, value] when the tool has an eligible, non-denylisted host field whose arg is a non-empty String; otherwise nil. Reads String OR Symbol arg keys.

Mirrors TS harvestHostIntent (strictly map-driven once a tools/list has run), plus an explicit-config fallback for the no-schema path (#wrap): there, no tools/list ever populated the map, so an EXPLICIT host_intent_param is honored directly against the call args (the integrator opted in). On the schema paths (official/duck) @host_intent_listed is true, so we stay map-only: a non-string or absent explicit field is NOT harvested from a stray runtime arg, exactly like TS. Gated auto-detect always requires a map entry.



# File 'lib/mcpeye/tracker.rb', line 1365

public def harvest_host_intent(tool_name, args)
  param = host_intent_param_for(tool_name)
  param = @host_intent_explicit if param.nil? && !@host_intent_listed
  return nil if param.nil?
  # PII guard: never promote a denylisted field name into the standalone intent.
  return nil if Intent.denied_field?(param, @denylist_fields)
  return nil unless args.is_a?(Hash)

  v = args[param]
  v = args[param.to_sym] if v.nil? && param.is_a?(String)
  return [param, v] if v.is_a?(String) && !v.strip.empty?

  nil
rescue StandardError
  nil
end
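The final lookup in harvest_host_intent is easy to exercise in isolation. `harvest_from_args` is a hypothetical copy of its last few lines only (the per-tool map lookup, explicit-config fallback, and denylist check are left out), showing the String-or-Symbol key handling and the non-blank String requirement.

```ruby
# Hypothetical stand-alone copy of the argument lookup in harvest_host_intent.
def harvest_from_args(args, param)
  return nil unless args.is_a?(Hash)

  v = args[param]
  v = args[param.to_sym] if v.nil? && param.is_a?(String) # Symbol-keyed args too
  (v.is_a?(String) && !v.strip.empty?) ? [param, v] : nil
end

harvest_from_args({ "intent" => "find overdue invoices" }, "intent")
# => ["intent", "find overdue invoices"]
harvest_from_args({ intent: "find overdue invoices" }, "intent")
# => ["intent", "find overdue invoices"] (the String param name is returned either way)
harvest_from_args({ "intent" => "   " }, "intent") # => nil (blank)
harvest_from_args({ "intent" => 42 }, "intent")    # => nil (not a String)
```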

#host_intent_param_for(tool_name) ⇒ Object

The eligible host-intent param for a tool (call-time lookup), or nil. Public so the duck-wrap proc and the official call hook — which resolve eligibility at CALL time against this mutable tracker — share one source of truth.



# File 'lib/mcpeye/tracker.rb', line 1351

public def host_intent_param_for(tool_name)
  @host_intent_tools[tool_name.to_s]
end

#inject_intent_param(server) ⇒ Object

Inject the optional mcpeyeIntent property into every discoverable tool’s input schema. Returns the server (fail-open). Collision-safe (never clobbers a tool that already declares mcpeyeIntent), never touches `required`, and skips frozen schemas rather than raising FrozenError into boot.



# File 'lib/mcpeye/tracker.rb', line 371

def inject_intent_param(server)
  inject_count(server)
  server
rescue StandardError => e
  @on_error.call(e)
  server
end
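The collision-safe injection described above can be sketched on a plain JSON-Schema-like Hash with String keys. `inject_intent!` is hypothetical (inject_count's body is not shown here), and the property's type and description are placeholder assumptions. Note that running it twice on the same schema reports a collision on the second pass; that is exactly the misclassification the tracker's @injected_tools map exists to avoid.

```ruby
INTENT_PARAM = "mcpeyeIntent"

# Hypothetical sketch of collision-safe schema injection (not the gem's code).
# Returns a Symbol describing what happened, so a caller could count injections.
def inject_intent!(schema, description: "Why you are calling this tool.")
  return :skipped unless schema.is_a?(Hash)
  return :frozen if schema.frozen? # skip rather than raise FrozenError

  props = schema["properties"]
  if props.nil?
    props = schema["properties"] = {}
  elsif !props.is_a?(Hash)
    return :skipped
  end
  return :frozen if props.frozen?
  return :collision if props.key?(INTENT_PARAM) || props.key?(INTENT_PARAM.to_sym)

  # Optional property only: "required" is deliberately left untouched.
  props[INTENT_PARAM] = { "type" => "string", "description" => description }
  :injected
end

schema = {
  "type" => "object",
  "properties" => { "q" => { "type" => "string" } },
  "required" => ["q"]
}
inject_intent!(schema)                          # => :injected
schema["required"]                              # => ["q"]
inject_intent!({ "type" => "object" }.freeze)   # => :frozen
```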

#instrument(server) ⇒ Object

Best-effort instrumentation of an MCP server object. Injects the mcpeyeIntent param into discoverable tool schemas and wraps discoverable handlers so calls are captured automatically. Returns the server unchanged (fail-open) when no shape matches — you can still use #wrap / #record. Reports once if nothing could be introspected.



# File 'lib/mcpeye/tracker.rb', line 330

def instrument(server)
  # Official mcp gem (MCP::Server + MCP::Tool subclasses): dispatch-level,
  # per-server capture. Distinct from the duck-typed path below, whose tools
  # are handler-Hashes the official gem never produces.
  return instrument_official(server) if official_mcp_server?(server)

  injected = inject_count(server)
  # Add the reserved tool AFTER intent injection (so it never gets an mcpeyeIntent
  # param) and BEFORE handler wrapping (so its pre-wrapped handler is skipped, not
  # double-wrapped). A no-op when capture_missing_capabilities is off.
  append_request_capability_tool(server)
  wrapped = wrap_handler_count(server)
  warn_uninstrumentable(server) if injected.zero? && wrapped.zero?
  server
rescue StandardError => e
  @on_error.call(e)
  server
end

#own_intent_tool?(tool_name) ⇒ Boolean

True when a tool declares its OWN mcpeyeIntent param (a collision). Read by the official call hook so it never strips a field the tool legitimately owns and never host-harvests for a collision tool (precedence: collision wins).

Returns:

  • (Boolean)


# File 'lib/mcpeye/tracker.rb', line 1392

public def own_intent_tool?(tool_name)
  @own_intent_tools.key?(tool_name.to_s)
end

#pending ⇒ Object

Number of events waiting to be flushed (useful in tests / Rails shutdown).



# File 'lib/mcpeye/tracker.rb', line 608

def pending
  @mutex.synchronize { @buffer.length }
end

#record(tool_name, arguments = {}, result: nil, is_error: false, error_message: nil, intent: nil, intent_source: nil, duration_ms: nil) ⇒ Object

Capture a single tool call into the buffer (redacting and size-bounding if enabled). Triggers an eager flush once the buffer hits the threshold: the background thread is woken when one is running (off the hot path); otherwise the flush runs SYNCHRONOUSLY on this (the caller’s) thread. To avoid that blocking POST, set flush_interval: and make sure the background thread is running (see #start_flush_thread). Returns the captured event Hash, or nil if the event could not be built (reported via on_error and dropped); never raises into the host.



# File 'lib/mcpeye/tracker.rb', line 474

def record(tool_name, arguments = {},
           result: nil,
           is_error: false,
           error_message: nil,
           intent: nil,
           intent_source: nil,
           duration_ms: nil)
  event = build_event(tool_name, arguments,
                      result: result, is_error: is_error,
                      error_message: error_message, intent: intent,
                      intent_source: intent_source, duration_ms: duration_ms)

  flush_inline = false
  @mutex.synchronize do
    @buffer << event
    trim_locked
    if @buffer.length >= @flush_threshold
      if @flush_thread&.alive?
        # Ask the background thread to drain now, off the hot path. The sticky
        # flag means the request is honored even if it arrives mid-POST.
        @flush_requested = true
        @cond.signal
      else
        flush_inline = true
      end
    end
  end
  flush if flush_inline
  event
rescue StandardError => e
  # A redaction/serialization error must never propagate into the host
  # handler: report it, drop this one event, and return nil.
  @on_error.call(e)
  nil
end
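record computes flush_inline while holding @mutex but calls #flush only after leaving the synchronize block. One reason this ordering matters: #flush takes the same @mutex, and Ruby's Mutex is not reentrant, so flushing from inside the block would raise ThreadError. `InlineFlushDemo` is a hypothetical reduction that keeps only this decide-then-act structure.

```ruby
# Hypothetical reduction of record's "decide under the lock, act after it".
class InlineFlushDemo
  attr_reader :flushed

  def initialize(threshold)
    @threshold = threshold
    @mutex = Mutex.new
    @buffer = []
    @flushed = []
  end

  def record(event)
    flush_inline = false
    @mutex.synchronize do
      @buffer << event
      flush_inline = @buffer.length >= @threshold # decide only
    end
    flush if flush_inline # the lock has been released here
    event
  end

  def flush
    batch = @mutex.synchronize do
      taken = @buffer
      @buffer = []
      taken
    end
    @flushed << batch unless batch.empty?
  end
end

demo = InlineFlushDemo.new(2)
demo.record(:a)
demo.record(:b)
demo.flushed # => [[:a, :b]]

# What flushing inside the block would amount to: re-locking a held Mutex.
m = Mutex.new
recursive_error =
  begin
    m.synchronize { m.synchronize {} }
    nil
  rescue ThreadError => e
    e # ThreadError (recursive locking)
  end
```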

#reserved_host_owned? ⇒ Boolean

Read by the official call hook (which lives outside the Tracker instance) to decide whether to forward a reserved-name call to the host (collision) vs answer it locally.

Returns:

  • (Boolean)


# File 'lib/mcpeye/tracker.rb', line 1385

public def reserved_host_owned?
  @reserved_host_owned
end

#start_flush_thread ⇒ Object

Start the background flush thread (no-op if flush_interval is unset or a live thread already exists). Safe to call after a fork (the inherited thread is dead, so a fresh one is spawned) — call it from on_worker_boot in a forking server (Puma/Unicorn). Returns the thread (or nil when disabled).



# File 'lib/mcpeye/tracker.rb', line 561

def start_flush_thread
  return nil if @flush_interval.nil?
  return @flush_thread if @flush_thread&.alive?

  @stopped = false
  @flush_thread = Thread.new { flush_loop }
end
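The liveness check in start_flush_thread is what makes it safe to call after a fork: a forked child keeps only the thread that called fork, so an inherited Thread object reports alive? as false and a fresh thread is spawned. `WorkerDemo` is a hypothetical reduction; killing the thread stands in for the dead thread a fork leaves behind.

```ruby
# Hypothetical sketch of an idempotent "start, or restart if dead" worker.
class WorkerDemo
  attr_reader :thread

  def initialize(interval)
    @interval = interval
    @thread = nil
  end

  def start
    return nil if @interval.nil?          # disabled: stay zero-thread
    return @thread if @thread&.alive?     # live thread: reuse it

    @thread = Thread.new { loop { sleep @interval } }
  end
end

w = WorkerDemo.new(0.05)
t1 = w.start
w.start.equal?(t1)        # => true (live thread reused)
t1.kill.join              # simulate the dead thread a fork leaves behind
t2 = w.start
t2.equal?(t1)             # => false (fresh thread spawned)
t2.kill.join
WorkerDemo.new(nil).start # => nil
```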

#stop ⇒ Object

Stop the background thread (waiting at most 2 seconds for it to exit) and do a final flush. Idempotent.



# File 'lib/mcpeye/tracker.rb', line 570

def stop
  @mutex.synchronize do
    @stopped = true
    @cond.broadcast
  end
  if @flush_thread
    begin
      @flush_thread.join(2)
    rescue StandardError
      nil
    end
    @flush_thread = nil
  end
  flush
  nil
rescue StandardError => e
  @on_error.call(e)
  nil
end

#stopped? ⇒ Boolean

Whether #stop has been called (so the auto at_exit drain can skip).

Returns:

  • (Boolean)


# File 'lib/mcpeye/tracker.rb', line 591

def stopped?
  @stopped
end

#wrap(tool_name, own_intent: false, &handler) ⇒ Object

Wrap an arbitrary tool handler block so the call is captured. Example:
  handler = tracker.wrap("search_contacts") { |args| do_the_real_work(args) }

The returned proc takes the tool arguments Hash, strips the injected mcpeyeIntent out as `intent` (falling back to an eligible host intent field when mcpeyeIntent is empty), runs the original handler with the cleaned arguments, records the captured call (redacted), and returns the original result unchanged. A handler that raises is recorded as is_error and the identical exception is re-raised; a result-level isError is captured with the result omitted. Capture never raises into the host.

When `own_intent: true`, the tool legitimately declares its OWN `mcpeyeIntent` parameter, so we pass the arguments through verbatim (never stripping it, which would break the tool) and do not claim its value as agent intent. The auto-instrument path sets this for colliding tools; the manual path defaults to stripping.

Raises:

  • (ArgumentError)


# File 'lib/mcpeye/tracker.rb', line 395

def wrap(tool_name, own_intent: false, &handler)
  raise ArgumentError, "a handler block is required" unless block_given?

  wrapped = proc do |args = {}|
    # `intent_source` is the provenance of `intent`: "mcpeye" (our injected
    # param) | "native" (a harvested host field) | nil (no intent).
    # `captured_args` is what we RECORD — it may omit a harvested host field so
    # the worker summary doesn't double-count it; the HANDLER still receives that
    # field via `cleaned` (it may be a required arg). Never mutate `cleaned`.
    intent_source = nil
    if own_intent
      # Collision: the tool owns mcpeyeIntent. Pass args through verbatim, do not
      # claim its value as agent intent, and (per precedence) never host-harvest.
      cleaned = args.is_a?(Hash) ? args : {}
      intent = nil
      captured_args = cleaned
    else
      cleaned, intent = split_intent(args)
      if intent
        # mcpeye's own param wins whenever the agent filled it.
        intent_source = Intent::INTENT_SOURCE_MCPEYE
        captured_args = cleaned
      else
        # FALLBACK: the agent left mcpeyeIntent empty — harvest the host server's
        # own analytics-intent field, if this tool has an eligible one. Resolved
        # at CALL time against the (mutable) tracker, since host eligibility is
        # learned at LIST time, after this wrapper was created.
        harvested = harvest_host_intent(tool_name, cleaned)
        if harvested
          intent = harvested[1]
          intent_source = Intent::INTENT_SOURCE_NATIVE
          # Omit the promoted field from the CAPTURED copy ONLY. The handler still
          # gets it via `cleaned`.
          captured_args = cleaned.dup
          captured_args.delete(harvested[0])
          captured_args.delete(harvested[0].to_sym) if harvested[0].is_a?(String)
        else
          captured_args = cleaned
        end
      end
    end
    started = monotonic_ms
    begin
      result = handler.call(cleaned)
    rescue StandardError => e
      # Host handler raised: record the failure, then re-raise the identical
      # exception so the agent/client sees the real error (never swallowed).
      record(tool_name, captured_args, is_error: true, error_message: e.message,
                                       intent: intent, intent_source: intent_source,
                                       duration_ms: monotonic_ms - started)
      raise
    end

    is_err, err_msg = result_error_info(result)
    if is_err
      record(tool_name, captured_args, is_error: true, error_message: err_msg,
                                       intent: intent, intent_source: intent_source,
                                       duration_ms: monotonic_ms - started)
    else
      record(tool_name, captured_args, result: result,
                                       intent: intent, intent_source: intent_source,
                                       duration_ms: monotonic_ms - started)
    end
    result
  end

  # Mark the proc so wrap_handler_count never double-wraps it (a second
  # instrument, or track + a manual instrument, would otherwise stack wrappers
  # and double-capture every call with distinct callIds the server can't dedup).
  wrapped.define_singleton_method(:mcpeye_wrapped?) { true }
  wrapped
end
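A hypothetical miniature of the wrap pattern: time the handler with a monotonic clock, record success or failure, re-raise the original exception, and tag the proc with a singleton `mcpeye_wrapped?` method so a second instrumentation pass skips it. `demo_wrap`, `wrap_once`, and the log entry shape are illustrative only; the real method also splits intent and records through #record.

```ruby
# Milliseconds from a monotonic clock (immune to wall-clock jumps).
def monotonic_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end

# Hypothetical reduction of #wrap: capture, re-raise, and mark the proc.
def demo_wrap(name, log, &handler)
  wrapped = proc do |args = {}|
    started = monotonic_ms
    begin
      result = handler.call(args)
    rescue StandardError => e
      log << { tool: name, is_error: true, error: e.message, ms: monotonic_ms - started }
      raise # re-raises the same exception object
    end
    log << { tool: name, is_error: false, ms: monotonic_ms - started }
    result
  end
  wrapped.define_singleton_method(:mcpeye_wrapped?) { true }
  wrapped
end

# Skip handlers that are already wrapped, so calls are never captured twice.
def wrap_once(name, handler, log)
  return handler if handler.respond_to?(:mcpeye_wrapped?) && handler.mcpeye_wrapped?

  demo_wrap(name, log, &handler)
end

log = []
h = wrap_once("search_contacts", ->(args) { args.fetch("q").upcase }, log)
h.call({ "q" => "ada" })                       # => "ADA"
wrap_once("search_contacts", h, log).equal?(h) # => true (not re-wrapped)
begin
  h.call({}) # Hash#fetch raises KeyError
rescue KeyError
  # the caller sees the original KeyError
end
log.map { |entry| entry[:is_error] }           # => [false, true]
```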