Class: PgReports::QueryMonitor

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/pg_reports/query_monitor.rb

Constant Summary collapse

CACHE_KEY_ENABLED =
"pg_reports:query_monitor:enabled"
CACHE_KEY_SESSION_ID =
"pg_reports:query_monitor:session_id"
CACHE_TTL =
24.hours

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeQueryMonitor

Returns a new instance of QueryMonitor.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/pg_reports/query_monitor.rb', line 15

def initialize
  @subscriber = nil
  @mutex = Mutex.new
  @queries = []
  @handling_event = false

  # Local state — used by the event handler to avoid cache reads
  # (which generate SQL events and cause infinite recursion with DB-backed caches)
  @enabled = false
  @session_id = nil

  sync_from_cache
  ensure_subscription_if_enabled
end

Instance Attribute Details

#enabledObject (readonly)

Returns the value of attribute enabled.



30
31
32
# File 'lib/pg_reports/query_monitor.rb', line 30

def enabled
  @enabled
end

#session_idObject (readonly)

Returns the value of attribute session_id.



32
33
34
# File 'lib/pg_reports/query_monitor.rb', line 32

def session_id
  @session_id
end

Instance Method Details

#load_from_log(session_id: nil, limit: nil) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/pg_reports/query_monitor.rb', line 129

def load_from_log(session_id: nil, limit: nil)
  return [] unless log_file_enabled?
  return [] unless File.exist?(log_file_path)

  queries = []

  begin
    File.open(log_file_path, "r") do |f|
      f.each_line do |line|
        entry = JSON.parse(line.strip, symbolize_names: true)
        next unless entry[:type] == "query"

        # Filter by session_id if provided
        next if session_id && entry[:session_id] != session_id

        queries << entry
      rescue JSON::ParserError
        # Skip malformed lines
        next
      end
    end

    # Limit results if requested
    queries = queries.last(limit) if limit

    queries
  rescue => e
    Rails.logger.warn("PgReports: Failed to load queries from log: #{e.message}") if defined?(Rails)
    []
  end
end

#queries(limit: nil, session_id: nil) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/pg_reports/query_monitor.rb', line 113

def queries(limit: nil, session_id: nil)
  result = @queries.dup

  # Filter by session_id if provided
  if session_id
    result = result.select { |q| q[:session_id] == session_id }
  end

  # Limit results if requested
  if limit
    result = result.last(limit)
  end

  result
end

#startObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/pg_reports/query_monitor.rb', line 34

def start
  @mutex.synchronize do
    if enabled
      Rails.logger.info("PgReports: Monitoring already active, session_id=#{session_id}") if defined?(Rails)
      return {success: false, message: "Monitoring already active"}
    end

    new_session_id = SecureRandom.uuid
    @queries = []

    # Update local state first (used by event handler — no cache round-trip)
    @enabled = true
    @session_id = new_session_id

    # Store state in cache so other processes can see it
    cache_write(CACHE_KEY_ENABLED, true)
    cache_write(CACHE_KEY_SESSION_ID, new_session_id)

    Rails.logger.info("PgReports: Monitoring started, session_id=#{new_session_id}") if defined?(Rails)

    # Subscribe to sql.active_record events in THIS process
    ensure_subscription

    # Write session start marker to file
    write_session_marker("session_start")

    {success: true, message: "Query monitoring started", session_id: new_session_id}
  end
rescue => e
  @enabled = false
  @session_id = nil
  cache_write(CACHE_KEY_ENABLED, false)
  {success: false, error: e.message}
end

#statusObject



105
106
107
108
109
110
111
# File 'lib/pg_reports/query_monitor.rb', line 105

def status
  {
    enabled: enabled,
    session_id: session_id,
    query_count: @queries.size
  }
end

#stopObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/pg_reports/query_monitor.rb', line 69

def stop
  @mutex.synchronize do
    unless enabled
      return {success: false, message: "Monitoring not active"}
    end

    current_session_id = @session_id

    # Clear local state immediately — stops event handler from processing
    @enabled = false
    @session_id = nil

    # Unsubscribe from notifications in THIS process
    if @subscriber
      ActiveSupport::Notifications.unsubscribe(@subscriber)
      @subscriber = nil
    end

    # Write session end marker to file
    write_session_marker("session_end", current_session_id)

    # Flush queries to file
    flush_to_file

    # Clear state from cache so other processes see it
    cache_delete(CACHE_KEY_ENABLED)
    cache_delete(CACHE_KEY_SESSION_ID)

    @queries = []

    {success: true, message: "Query monitoring stopped", session_id: current_session_id}
  end
rescue => e
  {success: false, error: e.message}
end