Class: Dispatch::Rails::HeartbeatAggregator

Inherits:
Object
Defined in:
lib/dispatch/rails/heartbeat_aggregator.rb

Overview

Process-global, thread-safe accumulator of per-transaction request/error counts, bucketed into fixed time windows. A background thread flushes completed windows to Dispatch as a single small POST each — so the wire cost is one request per window, not one per HTTP request. The counts are the raw material for the server-side confound guard.
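
To make "bucketed into fixed time windows" concrete, here is a minimal sketch of one plausible bucketing rule: round the epoch down to a multiple of the window length. The helper name `window_start` and the 60-second default are assumptions for illustration; the library's own `window_for` and `window_seconds` are not shown on this page and may differ.

```ruby
# Hypothetical helper, not part of the library: maps a Time to the epoch
# second at which its fixed-size window starts. The 60-second default is
# an assumed window length.
def window_start(time, window_seconds: 60)
  epoch = time.to_i
  epoch - (epoch % window_seconds)
end

first  = window_start(Time.at(120)) # first second of a window
second = window_start(Time.at(179)) # last second of the same window
third  = window_start(Time.at(180)) # first second of the next window

puts [first, second, third].inspect
```

Under this rule every request in the same window shares one hash key, which is what would let a whole window be shipped as a single payload.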

Constructor Details

#initialize ⇒ HeartbeatAggregator

Returns a new instance of HeartbeatAggregator.



23
24
25
26
27
# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 23

def initialize
  @mutex = Mutex.new
  # { window_start_epoch => { "controller#action" => { requests:, errors: } } }
  @windows = Hash.new { |h, w| h[w] = Hash.new { |t, name| t[name] = { requests: 0, errors: 0 } } }
end
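
The nested default-block hash built in the constructor can be exercised on its own. The sketch below copies that one expression into a local variable; the window keys and transaction names are made-up sample values.

```ruby
# Same structure as @windows in the constructor:
# { window_start_epoch => { "controller#action" => { requests:, errors: } } }
windows = Hash.new { |h, w| h[w] = Hash.new { |t, name| t[name] = { requests: 0, errors: 0 } } }

# No explicit initialization is needed; missing levels are created on first access.
windows[120]["users#show"][:requests] += 1
windows[120]["users#show"][:requests] += 1
windows[120]["users#show"][:errors]   += 1
windows[180]["orders#create"][:requests] += 1

# Caveat: reading a missing key with [] also creates it, so use key? to
# inspect the hash without adding empty windows.
untouched = windows.key?(240)

puts windows.inspect
```

Because a plain `[]` read creates entries, code that only inspects the structure is safer with `key?` or `fetch`.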

Class Method Details

.instance ⇒ Object

Returns the process-wide aggregator, creating it on first call.



# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 12

def instance
  @instance ||= new
end

.reset! ⇒ Object

Test seam.



# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 17

def reset!
  @instance&.stop
  @instance = nil
end

Instance Method Details

#deliver_all(now: Time.now) ⇒ Object

Ship everything, including the still-open window. Called at process exit so a deploy/restart doesn’t drop the final window of traffic counts — the window the confound guard needs most.



# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 62

def deliver_all(now: Time.now)
  flush(now: now, include_current: true).each { |payload| Transport.instance.deliver_heartbeat(payload) }
end

#deliver_ready(now: Time.now) ⇒ Object

Flush and ship — the unit of work the background thread repeats.



# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 55

def deliver_ready(now: Time.now)
  flush(now: now).each { |payload| Transport.instance.deliver_heartbeat(payload) }
end

#flush(now: Time.now, include_current: false) ⇒ Object

Returns payloads for every window whose time has fully elapsed, then drops those windows. Incomplete (current) windows are left to keep accumulating unless include_current is set, which drains everything (the shutdown path, where waiting for the window to elapse means losing it).



# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 44

def flush(now: Time.now, include_current: false)
  @mutex.synchronize do
    ready = @windows.keys
    ready = ready.select { |w| w + window_seconds <= now.to_i } unless include_current
    payloads = ready.map { |w| build_payload(w) }
    ready.each { |w| @windows.delete(w) }
    payloads
  end
end
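
The readiness test in `flush` (`w + window_seconds <= now.to_i`) can be checked in isolation. This is a sketch only: `ready_windows` is a hypothetical free-standing helper, and the 60-second default is an assumed value, since `window_seconds` is not shown on this page.

```ruby
# Hypothetical stand-alone version of the selection step inside #flush.
# A window is ready once its full length has elapsed; include_current
# bypasses the check and returns every window.
def ready_windows(window_starts, now:, include_current: false, window_seconds: 60)
  return window_starts.dup if include_current

  window_starts.select { |w| w + window_seconds <= now.to_i }
end

starts = [120, 180]

one_second_early = ready_windows(starts, now: Time.at(239))
exactly_elapsed  = ready_windows(starts, now: Time.at(240))
shutdown_drain   = ready_windows(starts, now: Time.at(200), include_current: true)

puts one_second_early.inspect
puts exactly_elapsed.inspect
puts shutdown_drain.inspect
```

The comparison is inclusive, so a window becomes ready at the exact second it ends rather than one second later.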

#record(transaction:, errored:, now: Time.now) ⇒ Object

Counts one request for the transaction in the window containing now, plus one error if errored is true. A blank transaction is ignored. Also ensures the background flusher is running.



# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 29

def record(transaction:, errored:, now: Time.now)
  return if transaction.to_s.empty?

  @mutex.synchronize do
    counts = @windows[window_for(now)][transaction]
    counts[:requests] += 1
    counts[:errors] += 1 if errored
  end
  ensure_flusher
end
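
The locking pattern in `record` can be tried under concurrent load with a small stand-in. `TinyCounter` below is a hypothetical class written for this illustration, not the library's aggregator: it drops the time windows and the flusher and keeps only the guarded increment.

```ruby
# Illustrative stand-in, not Dispatch::Rails::HeartbeatAggregator.
class TinyCounter
  def initialize
    @mutex = Mutex.new
    @counts = Hash.new { |h, name| h[name] = { requests: 0, errors: 0 } }
  end

  # Same shape as #record: skip blank names, then update under the mutex.
  def record(transaction:, errored:)
    return if transaction.to_s.empty?

    @mutex.synchronize do
      counts = @counts[transaction]
      counts[:requests] += 1
      counts[:errors] += 1 if errored
    end
  end

  # Copy taken under the lock so callers never see a half-updated entry.
  def snapshot
    @mutex.synchronize { @counts.transform_values(&:dup) }
  end
end

counter = TinyCounter.new

# 8 threads x 1,000 calls each; every tenth call is marked as an error.
threads = 8.times.map do
  Thread.new do
    1_000.times { |n| counter.record(transaction: "users#show", errored: (n % 10).zero?) }
  end
end
threads.each(&:join)

counter.record(transaction: nil, errored: true) # ignored: blank transaction

result = counter.snapshot
puts result.inspect
```

Holding the mutex across the lookup and both increments keeps each update atomic with respect to other writers and to a concurrent snapshot.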

#stop ⇒ Object

Kills the background flusher thread, if one is running, and clears the reference to it.



# File 'lib/dispatch/rails/heartbeat_aggregator.rb', line 66

def stop
  @flusher&.kill
  @flusher = nil
end