Class: Takagi::Middleware::Metrics

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/middleware/metrics.rb

Overview

Request/response metrics collector with cross-process aggregation.

Each Metrics instance keeps per-process counters and periodically flushes a JSON snapshot to `<output_dir>/<pid>.json`. The class method `Metrics.aggregate(output_dir:)` reads every pid file in the directory and merges them into a single view. This works across `fork`-based worker pools: each worker writes its own file, and the devtools HTTP endpoint in the parent reads them all.

All times are stored as Float seconds (monotonic) internally and exposed as milliseconds in snapshots.

Constant Summary collapse

LATENCY_WINDOW =
200
RATE_BUCKETS =
60

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(output_dir: self.class.default_output_dir, flush_interval: 0.5, forward_to: nil) ⇒ Metrics

Returns a new instance of Metrics.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/takagi/middleware/metrics.rb', line 29

# Build a collector for the current process and make sure the snapshot
# directory exists.
#
# @param output_dir [String] directory that receives this process's snapshot file
# @param flush_interval [Numeric] stored as the flush interval (default 0.5)
# @param forward_to [Object, nil] forwarding target, resolved later by `forward_url`
def initialize(output_dir: self.class.default_output_dir, flush_interval: 0.5, forward_to: nil)
  # Counter store first, exactly as before, so helper call order is unchanged.
  @metrics = default_metrics
  @mutex = Mutex.new

  # Caller-supplied configuration.
  @output_dir, @flush_interval, @forward_to = output_dir, flush_interval, forward_to

  # Identity of the owning process and the time reference for the next flush.
  @pid = Process.pid
  @last_flush = monotonic

  # Forwarding state: a queue is allocated up front, no thread is started here.
  @forward_queue = Queue.new
  @forward_thread = nil
  @forward_thread_url = nil

  FileUtils.mkdir_p(@output_dir) unless File.directory?(@output_dir)
end

Instance Attribute Details

#forward_to ⇒ Object

Returns the value of attribute forward_to.



27
28
29
# File 'lib/takagi/middleware/metrics.rb', line 27

# Raw forwarding target as stored by the constructor from its `forward_to:`
# keyword (nil by default). The value is returned untouched; `forward_url`
# is the method that resolves it to an effective URL.
#
# @return [Object, nil] the configured forwarding target
def forward_to
  @forward_to
end

#metrics ⇒ Object (readonly)

Returns the value of attribute metrics.



26
27
28
# File 'lib/takagi/middleware/metrics.rb', line 26

# Per-process metrics store created by `default_metrics` in the constructor
# and updated under the instance mutex by `record`.
#
# Returns the live object itself, not a copy, and without taking the mutex.
#
# @return [Object] the metrics store held by this instance
def metrics
  @metrics
end

Class Method Details

.aggregate(output_dir: default_output_dir) ⇒ Hash

Read every <output_dir>/*.json file, parse, and merge into a single aggregate snapshot. Stale or malformed files are skipped.

Returns:

  • (Hash)

    aggregated snapshot



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/takagi/middleware/metrics.rb', line 140

# Read every `<output_dir>/*.json` snapshot, parse it, and fold it into a
# single aggregate via `merge_into!`.
#
# A file is skipped when it cannot be read or parsed, when the parsed
# document is not a Hash, or when it carries a `pid` that is absent from the
# list returned by `alive_process_pids` (stale snapshot). When that list is
# empty the set of live processes could not be determined, so no snapshot is
# treated as stale; previously every snapshot was discarded in that case.
#
# @param output_dir [String] directory holding the per-PID snapshot files
# @return [Hash] aggregated snapshot; the empty aggregate when the directory
#   does not exist
def self.aggregate(output_dir: default_output_dir)
  return empty_aggregate unless File.directory?(output_dir)

  totals = empty_aggregate
  alive_pids = alive_process_pids
  # On a system with /proc the calling process is always listed, so an empty
  # list means "could not enumerate", not "every worker is dead".
  liveness_known = !alive_pids.empty?

  Dir["#{output_dir}/*.json"].each do |file|
    data = JSON.parse(File.read(file))
    next unless data.is_a?(Hash)

    # Skip snapshots from PIDs that no longer exist (stale).
    pid = data['pid']
    next if liveness_known && pid && !alive_pids.include?(pid)

    merge_into!(totals, data)
  rescue StandardError
    # Best effort: an unreadable or malformed file must not break the whole
    # aggregate. JSON::ParserError is a StandardError, so it is covered here.
    next
  end
  totals
end

.alive_process_pids ⇒ Object



160
161
162
163
164
165
# File 'lib/takagi/middleware/metrics.rb', line 160

# List the numeric entries found under /proc as Integers.
#
# Returns an empty Array when no such entries exist or when listing raises.
#
# @return [Array<Integer>] PIDs derived from the /proc directory names
def self.alive_process_pids
  # NOTE(review): nothing in this method references Etc; the require is kept
  # only because loading it is an observable side effect.
  require 'etc'
  Dir.glob('/proc/[0-9]*').map { |entry| File.basename(entry).to_i }
rescue StandardError
  []
end

.default_output_dir ⇒ Object



133
134
135
# File 'lib/takagi/middleware/metrics.rb', line 133

# Directory used for snapshots when the caller does not supply one.
#
# @return [String] the value of the TAKAGI_METRICS_DIR environment variable
#   when set, otherwise `takagi-metrics` under the system temp directory
def self.default_output_dir
  fallback = File.join(Dir.tmpdir, 'takagi-metrics')
  ENV.fetch('TAKAGI_METRICS_DIR', fallback)
end

.empty_aggregate ⇒ Object

Builds the zeroed aggregate that `aggregate` starts from (one of the class-level helpers for aggregation).



363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/takagi/middleware/metrics.rb', line 363

# Build a fresh, zeroed aggregate snapshot.
#
# A new Hash (with new nested containers) is returned on every call, so the
# result can be mutated freely by the caller.
#
# @return [Hash] aggregate with Symbol keys and all counters at zero
def self.empty_aggregate
  zeroed = { requests: 0, latency_ms: 0.0, last_code: nil }
  zeroed[:responses] = %w[2xx 4xx 5xx other].map { |klass| [klass, 0] }.to_h
  zeroed.merge(
    obs_events: 0,
    latency_avg_ms: 0.0,
    latency_series: [],
    current_rps: 0.0,
    routes: [],
    pids: []
  )
end

.merge_into!(totals, data) ⇒ Object



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# File 'lib/takagi/middleware/metrics.rb', line 378

# Merge one parsed per-process snapshot into the running aggregate, in place.
#
# Counters (`requests`, `responses`, `obs_events`, `current_rps`, per-route
# `count` and `codes`) are summed. `latency_ms` keeps the largest value seen,
# `last_code` keeps the first non-nil value seen, the latency series is
# re-sorted by `x` and trimmed to the newest LATENCY_WINDOW points, and the
# per-route `avg_ms` is recomputed as a count-weighted mean.
#
# The snapshot passed as `data` is never mutated.
#
# @param totals [Hash] aggregate with Symbol keys (shape of `empty_aggregate`); mutated
# @param data [Hash] snapshot with String keys, as produced by JSON.parse
# @return [Hash] `totals`
def self.merge_into!(totals, data)
  totals[:requests] += data['requests'].to_i

  # Largest value wins: keep the highest latency reported by any process.
  latency = data['latency_ms']
  totals[:latency_ms] = latency.to_f if latency && latency.to_f > totals[:latency_ms]

  # The first snapshot that carries a code wins.
  totals[:last_code] = data['last_code'] if data['last_code'] && !totals[:last_code]

  if data['responses'].is_a?(Hash)
    data['responses'].each do |klass, count|
      totals[:responses][klass] = (totals[:responses][klass] || 0) + count.to_i
    end
  end

  totals[:obs_events] += data['obs_events'].to_i

  if data['latency_series'].is_a?(Array)
    timestamp = ->(point) { point['x'] || point[:x] }
    # Drop malformed points (non-Hash, or without an x) so one bad entry
    # cannot abort the merge after the counters above were already added.
    usable = data['latency_series'].select { |point| point.is_a?(Hash) && timestamp.call(point) }
    combined = totals[:latency_series] + usable
    combined.sort_by! { |point| timestamp.call(point) }
    totals[:latency_series] = combined.last(LATENCY_WINDOW)
  end

  totals[:current_rps] += data['current_rps'].to_f

  if data['routes'].is_a?(Array)
    data['routes'].each do |route|
      next unless route.is_a?(Hash)

      existing = totals[:routes].find { |r| r['route'] == route['route'] }
      unless existing
        # Copy the nested codes Hash as well; a shallow dup would let later
        # merges write into the caller's snapshot.
        fresh = route.dup
        fresh['codes'] = route['codes'].dup if route['codes'].is_a?(Hash)
        totals[:routes] << fresh
        next
      end

      added = route['count'].to_i
      previous = existing['count'] || 0
      merged_count = previous + added
      existing['count'] = merged_count

      if previous.positive?
        weighted = (existing['avg_ms'] || 0) * previous + (route['avg_ms'] || 0) * added
        # to_f: Integer averages must not be truncated by integer division.
        existing['avg_ms'] = (weighted.to_f / merged_count).round(3)
      else
        existing['avg_ms'] = route['avg_ms']
      end
      existing['last_ms'] = route['last_ms']

      # The first-seen entry may have been stored without a codes Hash.
      codes = (existing['codes'] ||= {})
      if route['codes'].is_a?(Hash)
        route['codes'].each { |klass, count| codes[klass] = (codes[klass] || 0) + count.to_i }
      end

      errors = (codes['4xx'] || 0) + (codes['5xx'] || 0)
      # Guard the zero-count case, which would otherwise produce NaN.
      existing['error_rate_pct'] =
        merged_count.zero? ? 0.0 : (errors.to_f / merged_count * 100).round(1)
    end
  end

  if data['pid']
    totals[:pids] << data['pid'] unless totals[:pids].include?(data['pid'])
  end

  totals
end

Instance Method Details

#call(request) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/takagi/middleware/metrics.rb', line 55

# Middleware entry point: count the request, time the downstream handler,
# then record, flush (if due) and forward the measurement.
#
# @param request [Object] the incoming request, passed unchanged to the block
# @yieldparam request [Object] the same request
# @yieldreturn [Object] the downstream response
# @return [Object] the response produced by the block
def call(request)
  @mutex.synchronize { @metrics[:requests] += 1 }

  started_at = monotonic
  (yield request).tap do |response|
    elapsed = monotonic - started_at

    record(request, response, elapsed)
    flush_if_due
    notify_forward(request, response, elapsed)
  end
end

#flush! ⇒ Object

Flush the snapshot to <output_dir>/<pid>.json regardless of interval.



87
88
89
90
91
92
93
94
# File 'lib/takagi/middleware/metrics.rb', line 87

# Write the current snapshot out immediately, regardless of the flush interval.
#
# The snapshot is built and the last-flush time is reset while holding the
# mutex; the write itself happens after the mutex has been released.
#
# @return [Object] whatever `write_snapshot` returns
def flush!
  captured = @mutex.synchronize do
    build_snapshot.tap { @last_flush = monotonic }
  end

  write_snapshot(captured)
end

#forward_url ⇒ Object

Resolve the effective forward URL — either the literal value (String/URI) passed at construction or a fresh value from a callable. Lets a Metrics instance defer URL discovery until the first request is being processed (e.g. devtools’ ingest endpoint is bound by the sinatra plugin after Metrics is installed).



51
52
53
# File 'lib/takagi/middleware/metrics.rb', line 51

# Resolve the effective forward URL.
#
# When the configured target responds to `call` (a Proc, lambda, Method
# object or any other callable) it is invoked on every access and its result
# is returned, so URL discovery can be deferred. Any other value, including
# nil, is returned as-is.
#
# @return [Object, nil] the literal target or the callable's current result
def forward_url
  @forward_to.respond_to?(:call) ? @forward_to.call : @forward_to
end

#increment_request_count ⇒ Object

Bump the top-level request counter without going through `call` (which would also start a timer and re-enter the forward chain). Used by external aggregators like the devtools ingest endpoint that build metric records out-of-band.



72
73
74
# File 'lib/takagi/middleware/metrics.rb', line 72

# Add one to the top-level request counter while holding the mutex.
#
# @return [Integer] the counter value after the increment
def increment_request_count
  @mutex.synchronize do
    @metrics[:requests] += 1
  end
end

#output_dir ⇒ String

Returns the directory where per-PID snapshots are written.

Returns:

  • (String)

    the directory where per-PID snapshots are written.



124
125
126
# File 'lib/takagi/middleware/metrics.rb', line 124

# Directory given to the constructor (or the class default) where this
# instance's per-PID snapshot is written.
#
# @return [String] the snapshot directory
def output_dir
  @output_dir
end

#pid ⇒ Integer

Returns the PID that owns this Metrics instance.

Returns:

  • (Integer)

    the PID that owns this Metrics instance.



129
130
131
# File 'lib/takagi/middleware/metrics.rb', line 129

# PID captured from `Process.pid` when this instance was constructed.
# It is not refreshed here, so it reflects the constructing process.
#
# @return [Integer] the PID that owns this Metrics instance
def pid
  @pid
end

#record(request, response, latency) ⇒ Object

Record a request/response/latency into the local metrics. Public so external aggregators (e.g. devtools’ HTTP ingest endpoint) can inject pre-aggregated data without going through the middleware chain. Thread-safe.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/takagi/middleware/metrics.rb', line 100

# Fold one request/response/latency observation into the local metrics.
#
# The response code and its class are derived before the mutex is taken;
# every write to the metrics store happens while holding it.
#
# @param request [Object] used only to derive the route key
# @param response [Object] used only to derive the response code
# @param latency [Float] elapsed time in seconds
# @return [Integer, nil] the route's updated count for the code class, or nil
#   when no code class could be derived
def record(request, response, latency)
  status = extract_code(response)
  bucket = code_class_for(status)

  @mutex.synchronize do
    @metrics[:responses][bucket] += 1 if bucket
    @metrics[:last_code] = code_name(status) || @metrics[:last_code]
    @metrics[:latency] = latency

    # Series points pair a wall-clock timestamp (ms) with the latency (ms).
    series = @metrics[:latency_series]
    series << { x: (Time.now.to_f * 1000).round, y: (latency * 1000).round(3) }
    overflow = series.size - LATENCY_WINDOW
    series.shift(overflow) if overflow.positive?

    path = route_key(request)
    stat = (@metrics[:routes][path] ||= new_route_stat)
    stat[:count] += 1
    stat[:lat_sum] += latency
    stat[:last] = latency
    stat[:codes][bucket] += 1 if bucket
  end
end

#record_observer_notification ⇒ Object



76
77
78
79
# File 'lib/takagi/middleware/metrics.rb', line 76

# Count one observer notification, then give the periodic flush a chance to run.
#
# The counter is bumped under the mutex; `flush_if_due` is called after the
# mutex has been released.
#
# @return [Object] whatever `flush_if_due` returns
def record_observer_notification
  @mutex.synchronize do
    @metrics[:observer_notifications] += 1
  end

  flush_if_due
end

#snapshot ⇒ Object

Snapshot for the LOCAL process only (used by tests / single-process mode).



82
83
84
# File 'lib/takagi/middleware/metrics.rb', line 82

# Snapshot of the local process only, built while holding the mutex.
# Nothing is written to disk by this method.
#
# @return [Object] whatever `build_snapshot` returns
def snapshot
  @mutex.synchronize do
    build_snapshot
  end
end