Class: Igniter::Store::ServerMetrics

Inherits:
Object
  • Object
show all
Defined in:
lib/igniter/store/server_metrics.rb

Overview

Thread-safe metrics and telemetry collector for StoreServer.

Tracks counters (requests, errors, bytes, facts), per-connection records, subscription counts, and fires in-process alerts when configurable thresholds are exceeded.

Usage:

metrics = ServerMetrics.new(thresholds: { max_connections: 200 })
id = metrics.record_connection_accepted(remote_addr: "10.0.0.1")
metrics.record_request(connection_id: id, op: "write_fact", bytes_in: 64, bytes_out: 16)
metrics.record_connection_closed(id: id)
snap = metrics.snapshot

Defined Under Namespace

Classes: Alert, ConnectionRecord

Constant Summary collapse

# Default alert thresholds. The constructor merges caller-supplied overrides
# on top of these, so any key may be replaced per instance.
# NOTE(review): how a value is compared against its threshold is decided by
# fire_alert, which is not visible here — confirm before relying on > vs >=.
DEFAULT_THRESHOLDS =
{
  max_connections:          500,            # checked against the number of active connections
  error_rate:               0.1,            # checked against total errors / total requests
  replay_size:              10_000,         # not referenced by the visible check_alerts — TODO confirm where it is used
  quarantine_receipt_count: 10,             # checked against the sum of quarantine_receipt_count over all stores
  storage_byte_size:        1_073_741_824,  # 2**30; checked against the sum of byte_size over all stores
  slow_op_count:            nil   # nil = disabled; set to an integer to enable
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(thresholds: {}) ⇒ ServerMetrics

Returns a new instance of ServerMetrics.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/igniter/store/server_metrics.rb', line 42

# Builds an empty collector.
#
# @param thresholds [Hash] per-instance overrides merged on top of
#   DEFAULT_THRESHOLDS (caller-supplied keys win).
def initialize(thresholds: {})
  # Synchronisation and configuration.
  @mutex = Mutex.new
  @thresholds = DEFAULT_THRESHOLDS.merge(thresholds)

  # Monotonic start time, used as the reference point for uptime.
  @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Scalar counters.
  @facts_written = @facts_replayed = 0
  @bytes_in = @bytes_out = 0

  # Keyed counters; unknown keys read as zero.
  @requests_total = Hash.new(0)
  @errors_total = Hash.new(0)
  @slow_ops_total = Hash.new(0)

  # Connection bookkeeping.
  @accepted_total = @closed_total = @rejected_total = 0
  @active_conns = {}

  # Subscriptions and fired alerts.
  @subscription_counts = Hash.new(0)
  @alerts = []
end

Instance Method Details

#alerts ⇒ Object



199
200
201
# File 'lib/igniter/store/server_metrics.rb', line 199

# Returns a shallow copy of the alerts list, taken while holding the lock,
# so callers can iterate it without racing against concurrent writers.
def alerts
  copy = nil
  @mutex.synchronize { copy = @alerts.dup }
  copy
end

#check_alerts(backend: nil) ⇒ Object

Evaluates alert thresholds and fires new alerts when breached. Already-fired alerts are not re-fired (no alert storms). Returns the current alerts Array.



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/igniter/store/server_metrics.rb', line 171

# Evaluates the alert thresholds against the current counters and returns a
# copy of the alerts list.
#
# Storage totals are gathered from the backend BEFORE the metrics lock is
# taken. The mutex is not reentrant, so calling into the backend while
# holding it would (a) stall every recording thread for as long as the
# backend takes and (b) turn any backend call back into this object into a
# ThreadError that the best-effort rescue would silently swallow.
#
# NOTE(review): threshold comparison and de-duplication of already-fired
# alerts are presumably handled by fire_alert, which is not visible here.
#
# @param backend [#storage_stats, nil] optional; when it responds to
#   storage_stats, the per-store "quarantine_receipt_count" and "byte_size"
#   values under the "stores" key are summed and checked.
# @return [Array] a copy of the current alerts.
def check_alerts(backend: nil)
  storage_totals = nil
  if backend.respond_to?(:storage_stats)
    begin
      stats = backend.storage_stats
      if stats
        stores = stats["stores"]&.values || []
        # Insertion order matters: it is the order the alerts are evaluated in.
        storage_totals = {
          quarantine_receipt_count: stores.sum { |s| s["quarantine_receipt_count"].to_i },
          storage_byte_size:        stores.sum { |s| s["byte_size"].to_i }
        }
      end
    rescue StandardError
      # Storage stats are best-effort: a failing backend must not prevent
      # the in-memory thresholds from being evaluated.
      storage_totals = nil
    end
  end

  @mutex.synchronize do
    fire_alert(:max_connections, @active_conns.size)

    total_req = @requests_total.values.sum
    if total_req.positive?
      total_err = @errors_total.values.sum
      fire_alert(:error_rate, total_err.to_f / total_req)
    end

    storage_totals&.each { |name, value| fire_alert(name, value) }

    total_slow = @slow_ops_total.values.sum
    fire_alert(:slow_op_count, total_slow) if total_slow.positive?

    # Copy inside the same critical section so the result reflects exactly
    # this evaluation.
    @alerts.dup
  end
end

#record_connection_accepted(remote_addr:) ⇒ Object

Records a new connection. Returns the connection_id string.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/igniter/store/server_metrics.rb', line 62

# Registers a newly accepted connection and bumps the accepted counter.
#
# @param remote_addr [#to_s] peer address; stored as a String.
# @return [String] the generated connection id (16 hex characters).
def record_connection_accepted(remote_addr:)
  conn_id = SecureRandom.hex(8)
  attributes = {
    connection_id: conn_id,
    accepted_at:   Time.now,
    closed_at:     nil,
    remote_addr:   remote_addr.to_s,
    ops_count:     0,
    bytes_in:      0,
    bytes_out:     0,
    last_op:       nil,
    close_reason:  nil
  }
  record = ConnectionRecord.new(**attributes)

  # Only the shared-state update happens under the lock.
  @mutex.synchronize do
    @active_conns[conn_id] = record
    @accepted_total += 1
  end

  conn_id
end

#record_connection_closed(id:, reason: nil) ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/igniter/store/server_metrics.rb', line 79

# Removes a connection from the active set and stamps its record with the
# close time and reason.
#
# The closed counter is incremented on every call, including calls for ids
# that are not (or no longer) tracked.
#
# @param id [String] connection id returned by record_connection_accepted.
# @param reason [Object, nil] stored on the record as-is.
# @return [Integer] the updated closed-connections total.
def record_connection_closed(id:, reason: nil)
  @mutex.synchronize do
    if (conn = @active_conns.delete(id))
      conn.closed_at = Time.now
      conn.close_reason = reason
    end

    @closed_total += 1
  end
end

#record_connection_rejected ⇒ Object



90
91
92
# File 'lib/igniter/store/server_metrics.rb', line 90

# Counts one rejected connection attempt.
#
# @return [Integer] the updated rejected-connections total.
def record_connection_rejected
  @mutex.synchronize do
    @rejected_total = @rejected_total + 1
  end
end

#record_error(op:, error_class:) ⇒ Object



111
112
113
114
# File 'lib/igniter/store/server_metrics.rb', line 111

# Counts one error, bucketed by "<error_class>/<op>".
#
# @param op [#to_s] operation during which the error occurred.
# @param error_class [#to_s] class (or name) of the error.
# @return [Integer] the updated count for that bucket.
def record_error(op:, error_class:)
  bucket = format("%s/%s", error_class, op)
  @mutex.synchronize do
    @errors_total[bucket] += 1
  end
end

#record_facts_replayed(count:) ⇒ Object



124
125
126
# File 'lib/igniter/store/server_metrics.rb', line 124

# Adds +count+ to the replayed-facts counter.
#
# @param count [Integer] number of facts replayed.
# @return [Integer] the updated total.
def record_facts_replayed(count:)
  @mutex.synchronize do
    @facts_replayed = @facts_replayed + count
  end
end

#record_facts_written(count: 1) ⇒ Object



120
121
122
# File 'lib/igniter/store/server_metrics.rb', line 120

# Adds +count+ to the written-facts counter.
#
# @param count [Integer] number of facts written; defaults to 1.
# @return [Integer] the updated total.
def record_facts_written(count: 1)
  @mutex.synchronize do
    @facts_written = @facts_written + count
  end
end

#record_request(connection_id:, op:, bytes_in: 0, bytes_out: 0) ⇒ Object

Records one request dispatched on a connection.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/igniter/store/server_metrics.rb', line 95

# Records one request dispatched on a connection.
#
# All arguments are coerced once, before any counter is touched, so an
# argument that cannot be converted raises without leaving the global
# counters partially updated.
#
# @param connection_id [String] id returned by record_connection_accepted;
#   ids that are not currently active only update the global counters.
# @param op [#to_s] operation name, used as the requests_total key.
# @param bytes_in [#to_i] bytes received for this request.
# @param bytes_out [#to_i] bytes sent for this request.
# @return [String, nil] the operation name when the connection is tracked,
#   nil otherwise.
def record_request(connection_id:, op:, bytes_in: 0, bytes_out: 0)
  op_name  = op.to_s
  received = bytes_in.to_i
  sent     = bytes_out.to_i

  @mutex.synchronize do
    @requests_total[op_name] += 1
    @bytes_in  += received
    @bytes_out += sent

    conn = @active_conns[connection_id]
    if conn
      conn.ops_count += 1
      conn.bytes_in  += received
      conn.bytes_out += sent
      conn.last_op    = op_name
    end
  end
end

#record_slow_op(op:) ⇒ Object



116
117
118
# File 'lib/igniter/store/server_metrics.rb', line 116

# Counts one slow operation, keyed by the operation name.
#
# @param op [#to_s] operation name.
# @return [Integer] the updated count for that operation.
def record_slow_op(op:)
  op_name = op.to_s
  @mutex.synchronize do
    @slow_ops_total[op_name] += 1
  end
end

#record_subscription_closed(store:) ⇒ Object



132
133
134
135
136
137
# File 'lib/igniter/store/server_metrics.rb', line 132

# Decrements the subscription count for a store, never going below zero.
#
# @param store [#to_s] store name.
# @return [Integer] the updated count for that store.
def record_subscription_closed(store:)
  @mutex.synchronize do
    store_name = store.to_s
    remaining = @subscription_counts[store_name] - 1
    # Floor at zero so an unmatched close cannot drive the count negative.
    @subscription_counts[store_name] = remaining.negative? ? 0 : remaining
  end
end

#record_subscription_opened(store:) ⇒ Object



128
129
130
# File 'lib/igniter/store/server_metrics.rb', line 128

# Increments the subscription count for a store.
#
# @param store [#to_s] store name.
# @return [Integer] the updated count for that store.
def record_subscription_opened(store:)
  store_name = store.to_s
  @mutex.synchronize do
    @subscription_counts[store_name] += 1
  end
end

#snapshot(backend: nil) ⇒ Object

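Returns a snapshot Hash of all current metrics. A new Hash is built on every call and the counter Hashes inside it are copies, but the returned Hash itself is not frozen. backend: is optional — if the backend responds to storage_stats, its result is included under :storage_stats (nil if the call raises).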



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/igniter/store/server_metrics.rb', line 141

# Builds a Hash describing all current metrics.
#
# Backend storage stats are fetched BEFORE the metrics lock is taken. The
# mutex is not reentrant, so calling the backend while holding it would
# block every recording thread for the duration of the call, and a backend
# that reports back into this object would fail with a ThreadError that
# the best-effort rescue would silently turn into storage_stats: nil.
#
# @param backend [#storage_stats, nil] optional; when it responds to
#   storage_stats its result is included under :storage_stats (nil when the
#   call raises a StandardError).
# @return [Hash] a new Hash on every call; the keyed counters in it are
#   copies of the internal Hashes.
def snapshot(backend: nil)
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  storage =
    if backend.respond_to?(:storage_stats)
      begin
        backend.storage_stats
      rescue StandardError
        # Best-effort: a failing backend must not break the snapshot.
        nil
      end
    end

  @mutex.synchronize do
    {
      schema_version:             1,
      generated_at:               Time.now.iso8601(3),
      # Uptime is derived from the monotonic clock, rounded up to whole ms.
      uptime_ms:                  ((now - @started_at) * 1000).ceil,
      facts_written:              @facts_written,
      facts_replayed:             @facts_replayed,
      bytes_in:                   @bytes_in,
      bytes_out:                  @bytes_out,
      requests_total:             @requests_total.dup,
      errors_total:               @errors_total.dup,
      slow_ops_total:             @slow_ops_total.dup,
      active_connections:         @active_conns.size,
      accepted_connections_total: @accepted_total,
      closed_connections_total:   @closed_total,
      rejected_connections_total: @rejected_total,
      subscription_count:         @subscription_counts.values.sum,
      subscriptions_by_store:     @subscription_counts.dup,
      storage_stats:              storage,
      alerts:                     @alerts.map(&:to_h)
    }
  end
end