Module: DebugMcp::NotificationsSubscriber
- Defined in:
- lib/debug_mcp/notifications_subscriber.rb
Constant Summary collapse
- BUFFER_MAX =
1000- VERSION =
Bumped whenever INJECTION_CODE’s structure changes so that an older buffer module already injected into a long-running target process is replaced instead of silently kept. The injected module exposes ‘.version` and the injection guard re-defines the module when the versions differ.
"2"- STORE_SQL_MAX =
Save-time caps applied inside the target process, before JSON encoding. These bound transport size (the debug socket is line-oriented — see parse_json_array) and limit how much raw SQL / path text we retain.
2000- STORE_PATH_MAX =
1000- SUBSCRIBED_EVENTS =
%w[ sql.active_record render_template.action_view render_partial.action_view render_collection.action_view cache_read.active_support cache_write.active_support cache_fetch_hit.active_support cache_generate.active_support cache_delete.active_support enqueue.active_job start_processing.action_controller process_action.action_controller ].freeze
- INJECTION_CODE =
Code injected (via Base64 eval) into the target process. It defines a process-global ::DebugMcpNotificationsBuffer module that subscribes to ActiveSupport::Notifications and buffers recent events.
Lifecycle notes (these fix bugs found by machine verification):
-
Module DEFINITION and subscriber ACTIVATION are separated. The ‘.install` call lives OUTSIDE the `unless defined?` guard and is always invoked, so a module that was defined but left with zero subscriptions (e.g. a previous install attempt raised in trap context) recovers on the next injection. `.install` itself is idempotent via `@subscriptions.any?`.
-
Reads never block: ‘safe_read` uses Mutex#try_lock and falls back to a lockless read. A blocking `synchronize` would deadlock if a debugger- stopped thread is holding the mutex; the process is paused during reads, so no writer is actually running and a lockless read is safe.
-
A version mismatch re-defines the module (after uninstalling the old one).
-
<<~RUBY unless defined?(::DebugMcpNotificationsBuffer) && ::DebugMcpNotificationsBuffer.respond_to?(:version) && ::DebugMcpNotificationsBuffer.version == #{VERSION.inspect} if defined?(::DebugMcpNotificationsBuffer) && ::DebugMcpNotificationsBuffer.respond_to?(:uninstall) ::DebugMcpNotificationsBuffer.uninstall rescue nil end module ::DebugMcpNotificationsBuffer class << self attr_reader :buffer, :subscriptions def version; #{VERSION.inspect}; end def buffer_max; #{BUFFER_MAX}; end def subscribed_events; #{SUBSCRIBED_EVENTS.inspect}.freeze; end def init! @buffer = [] @subscriptions = [] @mutex = Mutex.new @seq = 0 @dropped = 0 @installed_at = nil @buffer_started_at = Time.now.to_f end def push(event) @mutex.synchronize do @seq += 1 event[:seq] = @seq @buffer << event while @buffer.size > buffer_max @buffer.shift @dropped += 1 end end end # Read without ever blocking. If try_lock fails the mutex is held by # a debugger-stopped thread; we read locklessly (safe while paused). def safe_read locked = @mutex.try_lock begin yield ensure @mutex.unlock if locked end end def fetch_by_request_id(request_id) safe_read { @buffer.select { |e| e[:request_id] == request_id } } end def fetch_since(timestamp) safe_read { @buffer.select { |e| e[:timestamp] >= timestamp } } end def fetch_last(n) safe_read { @buffer.last(n) } end def fetch_after_seq(cursor) safe_read { @buffer.select { |e| e[:seq] > cursor } } end def clear @mutex.synchronize { @buffer.clear } end def metadata safe_read do { version: version, installed: @subscriptions.any?, installed_at: @installed_at, buffer_started_at: @buffer_started_at, buffer_size: @buffer.size, buffer_max: buffer_max, dropped_count: @dropped, oldest_seq: (@buffer.first && @buffer.first[:seq]), newest_seq: (@buffer.last && @buffer.last[:seq]), last_seq: @seq, subscriptions_count: @subscriptions.size, } end end def install return if @subscriptions.any? callback = lambda do |name, started, finished, _id, payload| req_id = extract_request_id(name, payload) src = Thread.current[:_debug_mcp_event_source] || :request push({ name: name, timestamp: started.to_f, duration_ms: ((finished.to_f - started.to_f) * 1000).round(2), request_id: req_id, source: src, data: sanitize_payload(name, payload), }) Thread.current[:_debug_mcp_request_id] = nil if name == "process_action.action_controller" rescue StandardError # never raise from subscriber callback end subscribed_events.each do |event_name| @subscriptions << ::ActiveSupport::Notifications.subscribe(event_name, &callback) end @installed_at = Time.now.to_f end def uninstall @subscriptions.each { |s| ::ActiveSupport::Notifications.unsubscribe(s) } @subscriptions.clear end def extract_request_id(name, payload) return nil unless payload.is_a?(Hash) req_id = payload[:request_id] if name == "start_processing.action_controller" && payload[:headers] h = payload[:headers] req_id ||= safe_header_lookup(h, "action_dispatch.request_id") req_id ||= safe_header_lookup(h, "HTTP_X_REQUEST_ID") Thread.current[:_debug_mcp_request_id] = req_id if req_id end req_id || Thread.current[:_debug_mcp_request_id] end def safe_header_lookup(headers, key) headers[key] rescue StandardError nil end def sanitize_payload(name, payload) return {} unless payload.is_a?(Hash) case name when "sql.active_record" { sql: truncate_text(payload[:sql].to_s, #{STORE_SQL_MAX}), query_name: payload[:name].to_s, cached: payload[:cached] ? true : false, binds: safe_binds(payload[:type_casted_binds] || payload[:binds]), } when /\\Arender_/ { identifier: payload[:identifier].to_s, layout: payload[:layout]&.to_s, count: payload[:count], } when /\\Acache_/ { key: payload[:key].to_s[0, 200], hit: payload[:hit], store: payload[:store]&.to_s, } when "enqueue.active_job" job = payload[:job] { job_class: (job.class.name rescue nil), job_id: (job.respond_to?(:job_id) ? job.job_id : nil), queue_name: (job.respond_to?(:queue_name) ? job.queue_name : nil), arguments: (job.respond_to?(:arguments) ? job.arguments : []).map { |a| safe_inspect(a, 100) }, } when "start_processing.action_controller", "process_action.action_controller" { controller: payload[:controller], action: payload[:action], method: payload[:method], path: truncate_text(payload[:path].to_s, #{STORE_PATH_MAX}), format: payload[:format].to_s, status: payload[:status], view_runtime: payload[:view_runtime], db_runtime: payload[:db_runtime], } else {} end rescue StandardError => e { error: "payload_sanitize_failed: \#{e.class}" } end def truncate_text(str, limit) return str if str.length <= limit str[0, limit] + "...[truncated \#{str.length - limit} chars]" rescue StandardError "<untruncatable>" end def safe_binds(binds) return [] unless binds.respond_to?(:each) binds.map { |b| safe_inspect(b, 100) } rescue StandardError [] end def safe_inspect(obj, limit) s = obj.inspect s.length > limit ? s[0, limit] + "..." : s rescue StandardError "<uninspectable>" end end init! end end ::DebugMcpNotificationsBuffer.install RUBY
Class Method Summary collapse
-
.fetch_after_seq(client, cursor, limit = nil) ⇒ Object
Fetch events with seq strictly greater than cursor.
-
.fetch_by_request_id(client, request_id) ⇒ Object
Fetch events for a request_id.
-
.fetch_last(client, n) ⇒ Object
Fetch the last n buffered events.
-
.fetch_since(client, timestamp) ⇒ Object
Fetch events fired at-or-after the given timestamp.
-
.install(client) ⇒ Object
Inject the subscriber into the Rails process and activate it.
-
.metadata(client) ⇒ Object
Fetch subscriber metadata (version, installed_at, buffer_size, dropped_count, seq cursors, …).
-
.uninstall(client) ⇒ Object
Remove the subscriber.
Class Method Details
.fetch_after_seq(client, cursor, limit = nil) ⇒ Object
Fetch events with seq strictly greater than cursor. Clock-independent cursor pagination — pass the previous response’s newest_seq. An optional limit caps the result in the TARGET process (oldest-after-cursor first), so a busy buffer never serializes all 1000 events just to be sliced later.
337 338 339 340 341 342 343 344 345 346 |
# File 'lib/debug_mcp/notifications_subscriber.rb', line 337 def fetch_after_seq(client, cursor, limit = nil) selector = "::DebugMcpNotificationsBuffer.fetch_after_seq(#{cursor.to_i})" selector += ".first(#{limit.to_i})" if limit code = RailsHelper.json_command( "defined?(::DebugMcpNotificationsBuffer) ? #{selector}.to_json : '[]'", ) RailsHelper.decode_json_result(client.send_command(code), []) rescue DebugMcp::Error [] end |
.fetch_by_request_id(client, request_id) ⇒ Object
Fetch events for a request_id. Returns array of event hashes (symbolized keys).
296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/debug_mcp/notifications_subscriber.rb', line 296 def fetch_by_request_id(client, request_id) return [] unless request_id code = RailsHelper.json_command( "defined?(::DebugMcpNotificationsBuffer) ? " \ "::DebugMcpNotificationsBuffer.fetch_by_request_id(#{request_id.inspect}).to_json : '[]'", ) RailsHelper.decode_json_result(client.send_command(code), []) rescue DebugMcp::Error [] end |
.fetch_last(client, n) ⇒ Object
Fetch the last n buffered events. Clock-independent.
323 324 325 326 327 328 329 330 331 |
# File 'lib/debug_mcp/notifications_subscriber.rb', line 323 def fetch_last(client, n) code = RailsHelper.json_command( "defined?(::DebugMcpNotificationsBuffer) ? " \ "::DebugMcpNotificationsBuffer.fetch_last(#{n.to_i}).to_json : '[]'", ) RailsHelper.decode_json_result(client.send_command(code), []) rescue DebugMcp::Error [] end |
.fetch_since(client, timestamp) ⇒ Object
Fetch events fired at-or-after the given timestamp. Prefer fetch_last / fetch_after_seq: those are clock-independent, whereas this compares against a client-supplied timestamp and is sensitive to clock skew between the MCP host and the target process.
312 313 314 315 316 317 318 319 320 |
# File 'lib/debug_mcp/notifications_subscriber.rb', line 312 def fetch_since(client, ) code = RailsHelper.json_command( "defined?(::DebugMcpNotificationsBuffer) ? " \ "::DebugMcpNotificationsBuffer.fetch_since(#{.to_f}).to_json : '[]'", ) RailsHelper.decode_json_result(client.send_command(code), []) rescue DebugMcp::Error [] end |
.install(client) ⇒ Object
Inject the subscriber into the Rails process and activate it. Idempotent. Returns true on success, false otherwise.
Returns false in signal trap context WITHOUT sending the injection: in trap context ActiveSupport::Notifications.subscribe raises ThreadError (Fanout#subscribe takes an internal mutex), and merely defining the module there would leave it with zero subscriptions. We refuse early so the caller can surface RailsHelper::TRAP_CONTEXT_HINT instead.
273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/debug_mcp/notifications_subscriber.rb', line 273 def install(client) return false unless RailsHelper.rails?(client) return false if RailsHelper.trap_context?(client) encoded = Base64.strict_encode64(INJECTION_CODE) cmd = "p begin; require 'base64'; eval(::Base64.decode64('#{encoded}').force_encoding('UTF-8')); " \ ":debug_mcp_subscriber_ok; rescue => __e; \"\#{__e.class}: \#{__e.message}\"; end" result = client.send_command(cmd) result.include?("debug_mcp_subscriber_ok") rescue DebugMcp::Error false end |
.metadata(client) ⇒ Object
Fetch subscriber metadata (version, installed_at, buffer_size, dropped_count, seq cursors, …). Returns {} if not installed.
350 351 352 353 354 355 356 357 |
# File 'lib/debug_mcp/notifications_subscriber.rb', line 350 def (client) code = RailsHelper.json_command( "defined?(::DebugMcpNotificationsBuffer) ? ::DebugMcpNotificationsBuffer.metadata.to_json : '{}'", ) RailsHelper.decode_json_result(client.send_command(code), {}) rescue DebugMcp::Error {} end |
.uninstall(client) ⇒ Object
Remove the subscriber. Best-effort; returns nil on error.
287 288 289 290 291 292 293 |
# File 'lib/debug_mcp/notifications_subscriber.rb', line 287 def uninstall(client) client.send_command( "::DebugMcpNotificationsBuffer.uninstall if defined?(::DebugMcpNotificationsBuffer)", ) rescue DebugMcp::Error nil end |