Class: Flare::MetricFlusher
- Inherits: Object
  - Object
    - Flare::MetricFlusher
- Defined in: lib/flare/metric_flusher.rb
Overview
Background threads that periodically drain in-memory metrics and submit them via HTTP. Uses concurrent-ruby TimerTask + FixedThreadPool, matching the pattern in Flipper’s telemetry.
Fork-safe: detects forked processes and restarts automatically.
Constant Summary
- DEFAULT_INTERVAL = 60 (seconds)
- DEFAULT_SHUTDOWN_TIMEOUT = 5 (seconds)
Instance Attribute Summary
- #interval ⇒ Object (readonly): Returns the value of attribute interval.
- #shutdown_timeout ⇒ Object (readonly): Returns the value of attribute shutdown_timeout.
Instance Method Summary
- #after_fork ⇒ Object: Re-initialize after fork.
- #flush_now ⇒ Object: Manually trigger a flush (useful for testing or forced flushes).
- #initialize(storage:, submitter:, interval: DEFAULT_INTERVAL, shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT) ⇒ MetricFlusher (constructor): A new instance of MetricFlusher.
- #restart ⇒ Object
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(storage:, submitter:, interval: DEFAULT_INTERVAL, shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT) ⇒ MetricFlusher
Returns a new instance of MetricFlusher.
    # File 'lib/flare/metric_flusher.rb', line 18
    def initialize(storage:, submitter:, interval: DEFAULT_INTERVAL, shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT)
      @storage = storage
      @submitter = submitter
      @interval = interval
      @shutdown_timeout = shutdown_timeout
      @pid = $$
      @stopped = false
    end
Instance Attribute Details
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
    # File 'lib/flare/metric_flusher.rb', line 16
    def interval
      @interval
    end
#shutdown_timeout ⇒ Object (readonly)
Returns the value of attribute shutdown_timeout.
    # File 'lib/flare/metric_flusher.rb', line 16
    def shutdown_timeout
      @shutdown_timeout
    end
Instance Method Details
#after_fork ⇒ Object
Re-initialize after fork. Called automatically by MetricSpanProcessor on first span in the new process, or manually from Puma/Unicorn after_fork hooks.
    # File 'lib/flare/metric_flusher.rb', line 95
    def after_fork
      @pid = $$
      restart
    end
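The PID comparison behind this kind of fork detection can be sketched as follows. This is a minimal illustration, not Flare's implementation: `ForkGuard` and its `pid_source` parameter are hypothetical names, introduced so the check can be exercised without actually forking.

```ruby
# Hypothetical helper illustrating PID-based fork detection.
# It remembers the PID it was created in and reports a fork when the
# current PID no longer matches.
class ForkGuard
  # pid_source is injectable only so the example can simulate a fork.
  def initialize(pid_source: -> { Process.pid })
    @pid_source = pid_source
    @pid = @pid_source.call
  end

  # True when the current process is not the one that recorded @pid.
  def forked?
    @pid != @pid_source.call
  end

  # Record the new PID, then yield so the caller can restart background work.
  def after_fork
    @pid = @pid_source.call
    yield if block_given?
  end
end

current_pid = 100
guard = ForkGuard.new(pid_source: -> { current_pid })
restarts = 0

current_pid = 200 # simulate running in a forked child
guard.after_fork { restarts += 1 } if guard.forked?
```

Recording the new PID before restarting means a second check in the same child does not trigger another restart.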
#flush_now ⇒ Object
Manually trigger a flush (useful for testing or forced flushes).
    # File 'lib/flare/metric_flusher.rb', line 72
    def flush_now
      return 0 unless @storage && @submitter

      drained = @storage.drain
      return 0 if drained.empty?

      count, error = @submitter.submit(drained)
      if error
        warn "[Flare] Metric submission error: #{error.message}"
      end
      count
    rescue => e
      warn "[Flare] Metric flush error: #{e.message}"
      0
    end
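The collaborator contract implied by `flush_now` can be sketched with stand-ins. `FakeStorage`, `FakeSubmitter`, and the free-standing `flush` method below are hypothetical and exist only for illustration. The sketch assumes that `drain` returns a collection responding to `empty?` and that `submit` returns a `[count, error]` pair, as the method body suggests.

```ruby
# Hypothetical in-memory storage: drain returns everything and resets.
class FakeStorage
  def initialize(items = [])
    @items = items
    @lock = Mutex.new
  end

  def drain
    @lock.synchronize do
      drained = @items
      @items = []
      drained
    end
  end
end

# Hypothetical submitter: records each batch and returns [count, error].
class FakeSubmitter
  attr_reader :batches

  def initialize(error: nil)
    @error = error
    @batches = []
  end

  def submit(batch)
    @batches << batch
    [@error ? 0 : batch.size, @error]
  end
end

# Mirrors the flow of flush_now outside the class.
def flush(storage, submitter)
  drained = storage.drain
  return 0 if drained.empty?

  count, error = submitter.submit(drained)
  warn "submission error: #{error.message}" if error
  count
rescue => e
  warn "flush error: #{e.message}"
  0
end

storage = FakeStorage.new([{ name: "requests", value: 3 }, { name: "errors", value: 1 }])
submitter = FakeSubmitter.new
first = flush(storage, submitter)  # drains and submits both entries
second = flush(storage, submitter) # nothing left to drain
```

Because draining empties the storage, a second flush with no new metrics returns 0 without contacting the submitter.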
#restart ⇒ Object
    # File 'lib/flare/metric_flusher.rb', line 65
    def restart
      @stopped = false
      stop
      start
    end
#running? ⇒ Boolean
    # File 'lib/flare/metric_flusher.rb', line 88
    def running?
      @timer&.running? || false
    end
#start ⇒ Object
    # File 'lib/flare/metric_flusher.rb', line 27
    def start
      @stopped = false

      @pool = Concurrent::FixedThreadPool.new(1, {
        max_queue: 20,
        fallback_policy: :discard,
        name: "flare-metrics-submit-pool".freeze,
      })

      @timer = Concurrent::TimerTask.execute({
        execution_interval: @interval,
        name: "flare-metrics-drain-timer".freeze,
      }) { post_to_pool }
    end
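With `max_queue: 20` and `fallback_policy: :discard`, a flush request posted while the queue is full should be dropped rather than block the timer thread. The sketch below imitates that behavior with Ruby's standard `SizedQueue` instead of concurrent-ruby. `BoundedWorker` is a hypothetical name, and the queue size is reduced to 2 to keep the demonstration small.

```ruby
# Hypothetical single-worker queue that drops work when full.
class BoundedWorker
  attr_reader :discarded

  def initialize(max_queue:)
    @queue = SizedQueue.new(max_queue)
    @discarded = 0
    @thread = nil
  end

  # Non-blocking enqueue: returns false and counts the drop when full.
  def post(&job)
    @queue.push(job, true) # true = non_block; raises ThreadError when full
    true
  rescue ThreadError
    @discarded += 1
    false
  end

  # Start the single worker thread; it exits when it sees the :stop marker.
  def start
    @thread = Thread.new do
      while (job = @queue.pop) != :stop
        job.call
      end
    end
  end

  # Let the worker finish the queued jobs, then wait for it to exit.
  def shutdown
    @queue.push(:stop)
    @thread&.join
  end
end

worker = BoundedWorker.new(max_queue: 2)
ran = []
# The worker is not started yet, so the third post finds the queue full.
results = 3.times.map { |i| worker.post { ran << i } }
worker.start
worker.shutdown
```

Dropping a flush is cheap in this design because the metrics stay in storage until the next successful drain picks them up.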
#stop ⇒ Object
    # File 'lib/flare/metric_flusher.rb', line 42
    def stop
      return if @stopped
      @stopped = true

      Flare.log "Shutting down metrics flusher, draining remaining metrics..."

      if @timer
        @timer.shutdown
        @timer.wait_for_termination(1)
        @timer.kill unless @timer.shutdown?
      end

      if @pool
        post_to_pool # one last drain
        @pool.shutdown
        pool_terminated = @pool.wait_for_termination(@shutdown_timeout)
        @pool.kill unless pool_terminated
      end

      Flare.log "Metrics flusher stopped"
    end
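The shutdown sequence in `stop` follows a common pattern: ask the worker to finish, wait up to a timeout, and force-kill only if it is still running. A minimal sketch of that pattern, using a plain `Thread` rather than concurrent-ruby, is shown below. `stop_with_timeout` is a hypothetical helper, not part of Flare.

```ruby
# Hypothetical helper: wait up to `timeout` seconds for a thread to finish,
# killing it only if the wait expires. Returns :finished or :killed.
def stop_with_timeout(thread, timeout)
  # Thread#join returns the thread if it finished, or nil on timeout.
  return :finished if thread.join(timeout)

  thread.kill
  thread.join # wait for the kill to take effect
  :killed
end

quick = Thread.new { :done } # finishes almost immediately
stuck = Thread.new { sleep } # sleeps until killed

quick_result = stop_with_timeout(quick, 1)
stuck_result = stop_with_timeout(stuck, 0.1)
```

The timeout bounds how long process exit can be delayed by a slow submission, at the cost of possibly losing the final batch.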