Class: Speedshop::Cloudwatch::Reporter
- Inherits:
-
Object
- Object
- Speedshop::Cloudwatch::Reporter
- Includes:
- Singleton
- Defined in:
- lib/speedshop/cloudwatch/reporter.rb
Class Method Summary collapse
Instance Method Summary collapse
- #clear_all ⇒ Object
- #enqueue(datum) ⇒ Object
-
#flush_now! ⇒ Object
Force immediate metrics collection and flush (for testing). This bypasses the normal interval-based flushing.
-
#initialize ⇒ Reporter
constructor
A new instance of Reporter.
- #start! ⇒ Object
- #started? ⇒ Boolean
- #stop! ⇒ Object
Constructor Details
#initialize ⇒ Reporter
Returns a new instance of Reporter.
10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 10

# Builds an idle reporter: not running, no worker thread, an empty queue and
# no collectors.
#
# @return [Reporter] a new instance of Reporter
def initialize
  # Lock and condition variable shared by the reporter's methods.
  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new

  # Pending data and the collectors list, both initially empty.
  @queue = []
  @collectors = []

  # Worker lifecycle state, plus the id of the process that built this object.
  @running = false
  @thread = nil
  @pid = Process.pid

  # Drop counter (incremented by #enqueue when the queue overflows) and the
  # last-overflow-log marker, which starts unset.
  @dropped_since_last_flush = 0
  @last_overflow_log = nil
end
Class Method Details
.reset ⇒ Object
99 100 101 102 103 104 105 106 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 99

# Discards the current singleton reporter. If an instance is stored, it is
# stopped (only when it reports itself as started) and then cleared; the
# slot is set to nil afterwards in every case.
#
# Works directly on +@singleton__instance__+, the instance variable this
# method reads and writes on the class itself.
#
# @return [nil]
def self.reset
  if instance_variable_defined?(:@singleton__instance__)
    existing = instance_variable_get(:@singleton__instance__)
    unless existing.nil?
      existing.stop! if existing.started?
      existing.clear_all
    end
  end

  instance_variable_set(:@singleton__instance__, nil)
end
Instance Method Details
#clear_all ⇒ Object
83 84 85 86 87 88 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 83

# Empties the pending queue and the collectors list while holding the mutex.
#
# @return [Array] the (now empty) collectors array
def clear_all
  @mutex.synchronize do
    # Queue first, then collectors; +last+ keeps the collectors array as the
    # value of the block.
    [@queue, @collectors].each(&:clear).last
  end
end
#enqueue(datum) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 67

# Appends a datum to the pending queue and makes sure the reporter is started.
#
# Does nothing when +config.environment_enabled?+ is false. When the queue is
# already at +config.queue_max_size+, the oldest entry is dropped and the
# drop counter is incremented before the new datum is appended.
#
# @param datum [Object] the entry to queue
# @return [Object, nil] nil when disabled or already started, otherwise
#   whatever #start! returns
def enqueue(datum)
  return unless config.environment_enabled?

  @mutex.synchronize do
    # NOTE(review): forked? / reset_after_fork! are defined outside this
    # listing; presumably they detect and recover from a process fork.
    reset_after_fork! if forked?

    at_capacity = @queue.size >= config.queue_max_size
    if at_capacity
      # Make room by discarding the oldest entry, and count the loss.
      @queue.shift
      @dropped_since_last_flush += 1
    end

    @queue.push(datum)
  end

  # Started outside the synchronized block; #start! takes the mutex itself.
  start! unless started?
end
#flush_now! ⇒ Object
Force immediate metrics collection and flush (for testing). This bypasses the normal interval-based flushing.
92 93 94 95 96 97 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 92

# Collects and flushes metrics immediately instead of waiting for the normal
# interval-based flush (intended for testing). Does nothing unless the
# reporter is running.
#
# @return [Object, nil] nil when not running, otherwise the result of
#   flush_metrics
def flush_now!
  if @running
    collect_metrics
    flush_metrics
  end
end
#start! ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 22

# Starts the background reporter thread unless the environment is disabled or
# the reporter is already started.
#
# @return [Thread, nil] the newly created thread, or nil when nothing was
#   started
def start!
  return unless config.environment_enabled?
  return if started?

  @mutex.synchronize do
    # Checked again under the lock: the check above ran without holding it.
    return if started?

    # NOTE(review): forked? / reset_after_fork! are defined outside this
    # listing; presumably they detect and recover from a process fork.
    reset_after_fork! if forked?
    initialize_collectors

    collector_names = @collectors.map(&:class).join(", ")
    Speedshop::Cloudwatch.log_info("Starting metric reporter (collectors: #{collector_names})")

    @running = true
    @thread = Thread.new do
      worker = Thread.current
      # NOTE(review): the consumer of the :fork_safe thread variable is not
      # visible here — confirm what reads it.
      worker.thread_variable_set(:fork_safe, true)
      worker.name = "scw_reporter"
      run_loop
    end
  end
end
#started? ⇒ Boolean
41 42 43 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 41

# Reports whether the reporter is marked as running and its worker thread is
# alive.
#
# Always returns a strict Boolean: when the running flag is set but no thread
# is present the answer is +false+ rather than +nil+.
#
# @return [Boolean]
def started?
  return false unless @running

  # Read the ivar once so a concurrent #stop! cannot nil it between the
  # presence check and the liveness check.
  worker = @thread
  return false if worker.nil?

  worker.alive?
end
#stop! ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/speedshop/cloudwatch/reporter.rb', line 45

# Stops the reporter: clears the running flag, signals the condition
# variable, drops the collectors, then waits up to two seconds for the worker
# thread to finish and logs the outcome. Does nothing when not running.
#
# @return [Object, nil] nil when the reporter was not running or had no
#   thread, otherwise the result of the final log call
def stop!
  worker = @mutex.synchronize do
    return unless @running

    Speedshop::Cloudwatch.log_info("Stopping metric reporter")
    @running = false
    # NOTE(review): presumably wakes the reporter loop waiting on this
    # condition variable; the loop itself is not part of this listing.
    @condition_variable.signal

    detached = @thread
    @thread = @pid = nil
    @collectors.clear
    detached
  end
  return unless worker

  # The join happens after the mutex is released. Thread#join returns nil
  # when the timeout elapses first.
  message =
    if worker.join(2)
      "Reporter thread stopped gracefully"
    else
      "Reporter thread did not finish within 2s timeout"
    end
  Speedshop::Cloudwatch.log_info(message)
end