Class: Dispatch::Rails::HeartbeatAggregator
- Inherits: Object
  - Object
  - Dispatch::Rails::HeartbeatAggregator
- Defined in: lib/dispatch/rails/heartbeat_aggregator.rb
Overview
Process-global, thread-safe accumulator of per-transaction request/error counts, bucketed into fixed time windows. A background thread flushes completed windows to Dispatch as a single small POST each — so the wire cost is one request per window, not one per HTTP request. The counts are the raw material for the server-side confound guard.
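The bucketing described above can be sketched as follows. This is a minimal illustration, not the library's code: the window length of 60 seconds and the helper names window_start and window_complete? are assumptions, since the private window_seconds and window_for helpers are not shown on this page. Only the completeness check is taken from #flush.

```ruby
# Assumed window length; the real value comes from the private
# window_seconds helper, which this page does not show.
WINDOW_SECONDS = 60

# Hypothetical helper: round an epoch timestamp down to the start of its window.
def window_start(epoch, window_seconds = WINDOW_SECONDS)
  epoch - (epoch % window_seconds)
end

# A window is complete once its end is at or before "now", mirroring the
# `w + window_seconds <= now.to_i` check in #flush.
def window_complete?(start, now_epoch, window_seconds = WINDOW_SECONDS)
  start + window_seconds <= now_epoch
end
```

Under these assumptions, every request arriving between epoch 120 and 179 lands in the window starting at 120, and that window becomes eligible for flushing at epoch 180.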
Class Method Summary
- .instance ⇒ Object
- .reset! ⇒ Object
  Test seam.
Instance Method Summary
- #deliver_all(now: Time.now) ⇒ Object
  Ship everything, including the still-open window.
- #deliver_ready(now: Time.now) ⇒ Object
  Flush and ship: the unit of work the background thread repeats.
- #flush(now: Time.now, include_current: false) ⇒ Object
  Returns payloads for every window whose time has fully elapsed, then drops those windows.
- #initialize ⇒ HeartbeatAggregator (constructor)
  Returns a new instance of HeartbeatAggregator.
- #record(transaction:, errored:, now: Time.now) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ HeartbeatAggregator
Returns a new instance of HeartbeatAggregator.
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 23
    def initialize
      @mutex = Mutex.new
      # { window_start_epoch => { "controller#action" => { requests:, errors: } } }
      @windows = Hash.new { |h, w| h[w] = Hash.new { |t, name| t[name] = { requests: 0, errors: 0 } } }
    end
Class Method Details
.instance ⇒ Object
Returns the process-global aggregator, creating it on the first call.
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 12
    def instance
      @instance ||= new
    end
.reset! ⇒ Object
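Test seam. Stops the current instance's flusher thread, if there is one, and clears the memoized instance so the next call to .instance builds a fresh aggregator.
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 17
    def reset!
      @instance&.stop
      @instance = nil
    end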
Instance Method Details
#deliver_all(now: Time.now) ⇒ Object
Ship everything, including the still-open window. Called at process exit so a deploy/restart doesn’t drop the final window of traffic counts — the window the confound guard needs most.
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 62
    def deliver_all(now: Time.now)
      flush(now: now, include_current: true).each { |payload| Transport.instance.deliver_heartbeat(payload) }
    end
#deliver_ready(now: Time.now) ⇒ Object
Flush and ship — the unit of work the background thread repeats.
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 55
    def deliver_ready(now: Time.now)
      flush(now: now).each { |payload| Transport.instance.deliver_heartbeat(payload) }
    end
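The loop that repeats this unit of work is started by the private ensure_flusher helper, which is not shown on this page. One way such a loop could look is sketched below. The helper name start_flusher, the interval parameter, and the choice to rescue and log errors are all assumptions made for illustration; the real flusher may behave differently.

```ruby
# Hypothetical helper (not part of the library): run `work` every `interval`
# seconds on a background thread until the thread is killed.
def start_flusher(interval:, &work)
  Thread.new do
    loop do
      sleep interval
      begin
        work.call
      rescue StandardError => e
        # Assumption: a failed delivery should not end the loop.
        warn "heartbeat flush failed: #{e.message}"
      end
    end
  end
end

# Usage with a stand-in unit of work; in the real class the block would be
# something like `{ deliver_ready }`.
ticks = Queue.new
flusher = start_flusher(interval: 0.01) { ticks << :tick }
first_tick = ticks.pop # blocks until the loop has run once
flusher.kill           # the same mechanism #stop uses
flusher.join
```

Killing the thread, as #stop does, ends the loop without running one last flush, which is why a separate shutdown path such as #deliver_all is needed.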
#flush(now: Time.now, include_current: false) ⇒ Object
Returns payloads for every window whose time has fully elapsed; those windows are then dropped. Incomplete (current) windows are left to keep accumulating, unless include_current is true, in which case everything is drained (the shutdown path, where waiting for the window to elapse would mean losing it).
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 44
    def flush(now: Time.now, include_current: false)
      @mutex.synchronize do
        ready = @windows.keys
        ready = ready.select { |w| w + window_seconds <= now.to_i } unless include_current
        payloads = ready.map { |w| build_payload(w) }
        ready.each { |w| @windows.delete(w) }
        payloads
      end
    end
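To make the difference between the two flush modes concrete, here is a self-contained stand-in class that copies the record and flush logic from this page. It is a sketch under stated assumptions: MiniAggregator is not part of the library, the 60-second window and the window_for rounding are guesses at the private helpers, and the payload hash shape stands in for the unseen build_payload.

```ruby
# Stand-in for illustration only; not Dispatch::Rails::HeartbeatAggregator.
class MiniAggregator
  WINDOW_SECONDS = 60 # assumed window length

  def initialize
    @mutex = Mutex.new
    @windows = Hash.new { |h, w| h[w] = Hash.new { |t, name| t[name] = { requests: 0, errors: 0 } } }
  end

  def record(transaction:, errored:, now: Time.now)
    return if transaction.to_s.empty?

    @mutex.synchronize do
      counts = @windows[window_for(now)][transaction]
      counts[:requests] += 1
      counts[:errors] += 1 if errored
    end
  end

  def flush(now: Time.now, include_current: false)
    @mutex.synchronize do
      ready = @windows.keys
      ready = ready.select { |w| w + WINDOW_SECONDS <= now.to_i } unless include_current
      # Hypothetical payload shape; the real one comes from build_payload.
      payloads = ready.map { |w| { window_start: w, transactions: @windows[w] } }
      ready.each { |w| @windows.delete(w) }
      payloads
    end
  end

  private

  # Assumed rounding: start of the window that contains `now`.
  def window_for(now)
    now.to_i - (now.to_i % WINDOW_SECONDS)
  end
end

agg = MiniAggregator.new
t0 = Time.at(120)
agg.record(transaction: "orders#show", errored: false, now: t0)
agg.record(transaction: "orders#show", errored: true,  now: t0 + 10)
agg.record(transaction: "orders#show", errored: false, now: t0 + 70) # falls in window 180

# At epoch 185 only window 120 has fully elapsed (120 + 60 <= 185).
closed = agg.flush(now: Time.at(185))
# The shutdown path drains the still-open window 180 as well.
rest = agg.flush(now: Time.at(185), include_current: true)
```

In this run, closed holds one payload for window 120 with 2 requests and 1 error, and rest holds one payload for window 180 with 1 request and no errors. Because flushed windows are deleted, a further flush returns an empty array.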
#record(transaction:, errored:, now: Time.now) ⇒ Object
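Increments the request count for transaction in the window containing now, and the error count as well when errored is truthy, then calls the private ensure_flusher helper. Blank or nil transaction names are ignored.
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 29
    def record(transaction:, errored:, now: Time.now)
      return if transaction.to_s.empty?

      @mutex.synchronize do
        counts = @windows[window_for(now)][transaction]
        counts[:requests] += 1
        counts[:errors] += 1 if errored
      end
      ensure_flusher
    end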
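This page does not show where #record is called from. As one possibility, a Rack-style middleware could call it once per request, as sketched below. Everything here beyond the record(transaction:, errored:) keywords is an assumption: the HeartbeatMiddleware class, the "dispatch.transaction" env key, and the rule that a raised exception or a 5xx status counts as an error are all hypothetical.

```ruby
# Hypothetical middleware, for illustration only.
class HeartbeatMiddleware
  # `recorder` is any object responding to record(transaction:, errored:),
  # e.g. Dispatch::Rails::HeartbeatAggregator.instance in a real app.
  def initialize(app, recorder:)
    @app = app
    @recorder = recorder
  end

  def call(env)
    errored = false
    status, headers, body = @app.call(env)
    errored = status.to_i >= 500 # assumption: 5xx responses count as errors
    [status, headers, body]
  rescue StandardError
    errored = true # assumption: unhandled exceptions count as errors
    raise
  ensure
    # Assumed env key; the real integration may derive the name differently.
    @recorder.record(transaction: env["dispatch.transaction"], errored: errored)
  end
end
```

Recording in the ensure clause means a request is counted exactly once whether it returns normally or raises. Requests without a transaction name are silently skipped by #record itself.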
#stop ⇒ Object
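Kills the background flusher thread, if one is running, and clears the reference to it. Pending windows are not flushed.
    # File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 66
    def stop
      @flusher&.kill
      @flusher = nil
    end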