Class: Igniter::Store::ServerMetrics
- Inherits:
-
Object
- Object
- Igniter::Store::ServerMetrics
- Defined in:
- lib/igniter/store/server_metrics.rb
Overview
Thread-safe metrics and telemetry collector for StoreServer.
Tracks counters (requests, errors, bytes, facts), per-connection records, subscription counts, and fires in-process alerts when configurable thresholds are exceeded.
Usage:
metrics = ServerMetrics.new(thresholds: { max_connections: 200 })
id = metrics.record_connection_accepted(remote_addr: "10.0.0.1")
metrics.record_request(connection_id: id, op: "write_fact", bytes_in: 64, bytes_out: 16)
metrics.record_connection_closed(id: id)
snap = metrics.snapshot
Defined Under Namespace
Classes: Alert, ConnectionRecord
Constant Summary collapse
- DEFAULT_THRESHOLDS =
{
  max_connections: 500,
  error_rate: 0.1,
  replay_size: 10_000,
  quarantine_receipt_count: 10,
  storage_byte_size: 1_073_741_824,
  slow_op_count: nil # nil = disabled; set to an integer to enable
}.freeze
Instance Method Summary collapse
- #alerts ⇒ Object
-
#check_alerts(backend: nil) ⇒ Object
Evaluates alert thresholds and fires new alerts when breached.
-
#initialize(thresholds: {}) ⇒ ServerMetrics
constructor
A new instance of ServerMetrics.
-
#record_connection_accepted(remote_addr:) ⇒ Object
Records a new connection.
- #record_connection_closed(id:, reason: nil) ⇒ Object
- #record_connection_rejected ⇒ Object
- #record_error(op:, error_class:) ⇒ Object
- #record_facts_replayed(count:) ⇒ Object
- #record_facts_written(count: 1) ⇒ Object
-
#record_request(connection_id:, op:, bytes_in: 0, bytes_out: 0) ⇒ Object
Records one request dispatched on a connection.
- #record_slow_op(op:) ⇒ Object
- #record_subscription_closed(store:) ⇒ Object
- #record_subscription_opened(store:) ⇒ Object
-
#snapshot(backend: nil) ⇒ Object
Returns a frozen snapshot Hash of all current metrics.
Constructor Details
#initialize(thresholds: {}) ⇒ ServerMetrics
Returns a new instance of ServerMetrics.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/igniter/store/server_metrics.rb', line 42

# Builds an empty collector: every counter starts at zero and no connections,
# subscriptions or alerts are recorded yet.
#
# @param thresholds [Hash, nil] overrides merged on top of DEFAULT_THRESHOLDS.
#   Keys may be Symbols or Strings; Strings are symbolized so that overrides
#   loaded from string-keyed configuration are not silently ignored. nil is
#   treated as "no overrides".
def initialize(thresholds: {})
  overrides = (thresholds || {}).each_with_object({}) do |(name, limit), acc|
    acc[name.to_sym] = limit
  end

  @mutex = Mutex.new
  @thresholds = DEFAULT_THRESHOLDS.merge(overrides)
  # Monotonic clock reading taken at construction time.
  @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Fact and byte counters.
  @facts_written = 0
  @facts_replayed = 0
  @bytes_in = 0
  @bytes_out = 0

  # Keyed counters; missing keys read as 0.
  @requests_total = Hash.new(0)
  @errors_total = Hash.new(0)
  @slow_ops_total = Hash.new(0)

  # Connection lifecycle counters and the live connection table.
  @accepted_total = 0
  @closed_total = 0
  @rejected_total = 0
  @active_conns = {}

  @subscription_counts = Hash.new(0)
  @alerts = []
end
Instance Method Details
#alerts ⇒ Object
199 200 201 |
# File 'lib/igniter/store/server_metrics.rb', line 199

# Returns a copy of the alerts fired so far.
#
# The copy is taken while holding the mutex; the caller receives a new Array,
# not the internal one.
#
# @return [Array] shallow copy of the fired alerts
def alerts
  @mutex.synchronize do
    fired = @alerts
    fired.dup
  end
end
#check_alerts(backend: nil) ⇒ Object
Evaluates alert thresholds and fires new alerts when breached. Already-fired alerts are not re-fired (no alert storms). Returns the current alerts Array.
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/igniter/store/server_metrics.rb', line 171

# Evaluates alert thresholds and fires new alerts when breached.
#
# Values handed to fire_alert, in order:
#   1. :max_connections          - number of active connections
#   2. :error_rate               - errors / requests (skipped until at least
#                                  one request has been recorded)
#   3. :quarantine_receipt_count - summed over the backend's stores
#   4. :storage_byte_size        - summed over the backend's stores
#   5. :slow_op_count            - total slow ops (skipped while it is zero)
#
# The backend is queried BEFORE the mutex is taken, so a slow backend cannot
# stall the record_* methods, and a backend that calls back into this object
# does not hit a recursive-lock error. The returned copy of the alerts is made
# inside the same critical section as the checks.
#
# @param backend [#storage_stats, nil] optional; consulted only when it
#   responds to +storage_stats+. Errors it raises are swallowed (best effort).
# @return [Array] copy of the alerts fired so far
def check_alerts(backend: nil)
  stats =
    if backend.respond_to?(:storage_stats)
      begin
        backend.storage_stats
      rescue StandardError
        nil # best effort: storage alerts are skipped when the backend fails
      end
    end

  @mutex.synchronize do
    fire_alert(:max_connections, @active_conns.size)

    total_req = @requests_total.values.sum
    if total_req.positive?
      total_err = @errors_total.values.sum
      fire_alert(:error_rate, total_err.to_f / total_req)
    end

    if stats
      begin
        qc = stats["stores"]&.values&.sum { |s| s["quarantine_receipt_count"].to_i } || 0
        fire_alert(:quarantine_receipt_count, qc)
        bs = stats["stores"]&.values&.sum { |s| s["byte_size"].to_i } || 0
        fire_alert(:storage_byte_size, bs)
      rescue StandardError
        nil # malformed stats must not abort the remaining checks
      end
    end

    total_slow = @slow_ops_total.values.sum
    fire_alert(:slow_op_count, total_slow) if total_slow.positive?

    @alerts.dup
  end
end
#record_connection_accepted(remote_addr:) ⇒ Object
Records a new connection. Returns the connection_id string.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/igniter/store/server_metrics.rb', line 62

# Registers a newly accepted connection.
#
# A random 16-hex-character id is generated, a fresh ConnectionRecord with
# zeroed counters is stored under it in the active-connection table, and the
# accepted counter is incremented.
#
# @param remote_addr [#to_s] peer address; stored as a String
# @return [String] the generated connection id
def record_connection_accepted(remote_addr:)
  connection_id = SecureRandom.hex(8)

  record = ConnectionRecord.new(
    connection_id: connection_id,
    accepted_at: Time.now,
    closed_at: nil,
    remote_addr: remote_addr.to_s,
    ops_count: 0,
    bytes_in: 0,
    bytes_out: 0,
    last_op: nil,
    close_reason: nil
  )

  @mutex.synchronize do
    @active_conns[connection_id] = record
    @accepted_total += 1
  end

  connection_id
end
#record_connection_closed(id:, reason: nil) ⇒ Object
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/igniter/store/server_metrics.rb', line 79

# Marks a connection as closed.
#
# The record is removed from the active-connection table and, when one was
# found, stamped with the close time and reason. The closed counter is
# incremented on every call, including calls with an id that is not (or is no
# longer) in the table.
#
# @param id [String] connection id returned by record_connection_accepted
# @param reason [Object, nil] stored as the record's close_reason
# @return [Integer] the closed-connection total after this call
def record_connection_closed(id:, reason: nil)
  @mutex.synchronize do
    if (removed = @active_conns.delete(id))
      removed.closed_at = Time.now
      removed.close_reason = reason
    end

    @closed_total += 1
  end
end
#record_connection_rejected ⇒ Object
90 91 92 |
# File 'lib/igniter/store/server_metrics.rb', line 90

# Counts one rejected connection.
#
# @return [Integer] the rejected-connection total after this call
def record_connection_rejected
  @mutex.synchronize do
    @rejected_total = @rejected_total + 1
  end
end
#record_error(op:, error_class:) ⇒ Object
111 112 113 114 |
# File 'lib/igniter/store/server_metrics.rb', line 111

# Counts one error, bucketed by error class and operation.
#
# The bucket key is "<error_class>/<op>", built from the String form of both
# arguments.
#
# @param op [#to_s] operation during which the error occurred
# @param error_class [#to_s] class (or class name) of the error
# @return [Integer] the bucket's count after this call
def record_error(op:, error_class:)
  bucket = format("%s/%s", error_class, op)

  @mutex.synchronize do
    @errors_total[bucket] += 1
  end
end
#record_facts_replayed(count:) ⇒ Object
124 125 126 |
# File 'lib/igniter/store/server_metrics.rb', line 124

# Adds to the replayed-facts counter.
#
# @param count [Integer] number of facts replayed (added as given)
# @return [Integer] the replayed-facts total after this call
def record_facts_replayed(count:)
  @mutex.synchronize do
    @facts_replayed = @facts_replayed + count
  end
end
#record_facts_written(count: 1) ⇒ Object
120 121 122 |
# File 'lib/igniter/store/server_metrics.rb', line 120

# Adds to the written-facts counter.
#
# @param count [Integer] number of facts written; defaults to 1
# @return [Integer] the written-facts total after this call
def record_facts_written(count: 1)
  @mutex.synchronize do
    @facts_written = @facts_written + count
  end
end
#record_request(connection_id:, op:, bytes_in: 0, bytes_out: 0) ⇒ Object
Records one request dispatched on a connection.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/igniter/store/server_metrics.rb', line 95

# Records one request dispatched on a connection.
#
# Always updates the per-op request counter and the global byte counters.
# When the connection is still in the active table, its record's op count,
# byte counters and last op are updated as well; an unknown connection id is
# ignored for the per-connection part.
#
# @param connection_id [String] id returned by record_connection_accepted
# @param op [#to_s] operation name; counted under its String form
# @param bytes_in [#to_i] request size, defaults to 0
# @param bytes_out [#to_i] response size, defaults to 0
# @return [String, nil] the op name when a connection record was updated,
#   nil otherwise
def record_request(connection_id:, op:, bytes_in: 0, bytes_out: 0)
  op_name = op.to_s

  @mutex.synchronize do
    @requests_total[op_name] += 1

    received = bytes_in.to_i
    @bytes_in += received
    sent = bytes_out.to_i
    @bytes_out += sent

    conn = @active_conns[connection_id]
    next unless conn

    conn.ops_count += 1
    conn.bytes_in += received
    conn.bytes_out += sent
    conn.last_op = op_name
  end
end
#record_slow_op(op:) ⇒ Object
116 117 118 |
# File 'lib/igniter/store/server_metrics.rb', line 116

# Counts one slow operation under the op's String form.
#
# @param op [#to_s] operation name
# @return [Integer] the op's slow count after this call
def record_slow_op(op:)
  @mutex.synchronize do
    op_name = op.to_s
    @slow_ops_total[op_name] += 1
  end
end
#record_subscription_closed(store:) ⇒ Object
132 133 134 135 136 137 |
# File 'lib/igniter/store/server_metrics.rb', line 132

# Decrements the subscription count for a store, never going below zero.
#
# The store's entry is always written, so closing a subscription for a store
# that has none leaves an explicit 0 entry.
#
# @param store [#to_s] store name; counted under its String form
# @return [Integer] the store's subscription count after this call
def record_subscription_closed(store:)
  @mutex.synchronize do
    name = store.to_s
    remaining = @subscription_counts[name] - 1
    @subscription_counts[name] = remaining.negative? ? 0 : remaining
  end
end
#record_subscription_opened(store:) ⇒ Object
128 129 130 |
# File 'lib/igniter/store/server_metrics.rb', line 128

# Increments the subscription count for a store.
#
# @param store [#to_s] store name; counted under its String form
# @return [Integer] the store's subscription count after this call
def record_subscription_opened(store:)
  @mutex.synchronize do
    name = store.to_s
    @subscription_counts[name] += 1
  end
end
#snapshot(backend: nil) ⇒ Object
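Returns a frozen snapshot Hash of all current metrics. backend: is optional — if the backend responds to storage_stats, the value it returns is included under the :storage_stats key; otherwise (or if that call raises) :storage_stats is nil.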
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/igniter/store/server_metrics.rb', line 141

# Returns a frozen snapshot Hash of all current metrics.
#
# The Hash itself is frozen; the per-key counter Hashes inside it are copies
# of the internal tables, so neither side can affect the other.
#
# The backend is queried BEFORE the mutex is taken, so a slow backend cannot
# stall the record_* methods, and a backend that calls back into this object
# does not hit a recursive-lock error.
#
# @param backend [#storage_stats, nil] optional; when it responds to
#   +storage_stats+ the returned value is exposed as :storage_stats. Errors it
#   raises are swallowed and reported as nil (best effort).
# @return [Hash] frozen Hash with Symbol keys (schema_version 1)
def snapshot(backend: nil)
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  storage =
    if backend.respond_to?(:storage_stats)
      begin
        backend.storage_stats
      rescue StandardError
        nil # best effort: a failing backend must not break the snapshot
      end
    end

  @mutex.synchronize do
    {
      schema_version: 1,
      generated_at: Time.now.iso8601(3),
      # Elapsed monotonic time since construction, rounded up to whole ms.
      uptime_ms: ((now - @started_at) * 1000).ceil,
      facts_written: @facts_written,
      facts_replayed: @facts_replayed,
      bytes_in: @bytes_in,
      bytes_out: @bytes_out,
      requests_total: @requests_total.dup,
      errors_total: @errors_total.dup,
      slow_ops_total: @slow_ops_total.dup,
      active_connections: @active_conns.size,
      accepted_connections_total: @accepted_total,
      closed_connections_total: @closed_total,
      rejected_connections_total: @rejected_total,
      subscription_count: @subscription_counts.values.sum,
      subscriptions_by_store: @subscription_counts.dup,
      storage_stats: storage,
      alerts: @alerts.map(&:to_h)
    }.freeze
  end
end