Class: Flare::MetricFlusher

Inherits:
Object
  • Object
show all
Defined in:
lib/flare/metric_flusher.rb

Overview

Background threads that periodically drain in-memory metrics and submit them via HTTP. Uses concurrent-ruby TimerTask + FixedThreadPool, matching the pattern in Flipper’s telemetry.

Fork-safe: detects forked processes and restarts automatically.

Constant Summary collapse

DEFAULT_INTERVAL =

seconds

60
DEFAULT_SHUTDOWN_TIMEOUT =

seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(storage:, submitter:, interval: DEFAULT_INTERVAL, shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT) ⇒ MetricFlusher

Returns a new instance of MetricFlusher.



18
19
20
21
22
23
24
25
# File 'lib/flare/metric_flusher.rb', line 18

def initialize(storage:, submitter:, interval: DEFAULT_INTERVAL, shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT)
  @storage = storage
  @submitter = submitter
  @interval = interval
  @shutdown_timeout = shutdown_timeout
  @pid = $$
  @stopped = false
end

Instance Attribute Details

#intervalObject (readonly)

Returns the value of attribute interval.



16
17
18
# File 'lib/flare/metric_flusher.rb', line 16

def interval
  @interval
end

#shutdown_timeoutObject (readonly)

Returns the value of attribute shutdown_timeout.



16
17
18
# File 'lib/flare/metric_flusher.rb', line 16

def shutdown_timeout
  @shutdown_timeout
end

Instance Method Details

#after_forkObject

Re-initialize after fork. Called automatically by MetricSpanProcessor on first span in the new process, or manually from Puma/Unicorn after_fork hooks.



95
96
97
98
# File 'lib/flare/metric_flusher.rb', line 95

def after_fork
  @pid = $$
  restart
end

#flush_nowObject

Manually trigger a flush (useful for testing or forced flushes).



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/flare/metric_flusher.rb', line 72

def flush_now
  return 0 unless @storage && @submitter

  drained = @storage.drain
  return 0 if drained.empty?

  count, error = @submitter.submit(drained)
  if error
    warn "[Flare] Metric submission error: #{error.message}"
  end
  count
rescue => e
  warn "[Flare] Metric flush error: #{e.message}"
  0
end

#restartObject



65
66
67
68
69
# File 'lib/flare/metric_flusher.rb', line 65

def restart
  @stopped = false
  stop
  start
end

#running?Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/flare/metric_flusher.rb', line 88

def running?
  @timer&.running? || false
end

#startObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/flare/metric_flusher.rb', line 27

def start
  @stopped = false

  @pool = Concurrent::FixedThreadPool.new(1, {
    max_queue: 20,
    fallback_policy: :discard,
    name: "flare-metrics-submit-pool".freeze,
  })

  @timer = Concurrent::TimerTask.execute({
    execution_interval: @interval,
    name: "flare-metrics-drain-timer".freeze,
  }) { post_to_pool }
end

#stopObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/flare/metric_flusher.rb', line 42

def stop
  return if @stopped

  @stopped = true

  Flare.log "Shutting down metrics flusher, draining remaining metrics..."

  if @timer
    @timer.shutdown
    @timer.wait_for_termination(1)
    @timer.kill unless @timer.shutdown?
  end

  if @pool
    post_to_pool # one last drain
    @pool.shutdown
    pool_terminated = @pool.wait_for_termination(@shutdown_timeout)
    @pool.kill unless pool_terminated
  end

  Flare.log "Metrics flusher stopped"
end