Class: Apidepth::Collector
- Inherits: Object
  - Object
  - Apidepth::Collector
- Defined in: lib/apidepth/collector.rb
Constant Summary
- MAX_BATCH_SIZE = 100
- MAX_QUEUE_SIZE = 5_000
- FAILURE_THRESHOLD = 3
- WATCHDOG_INTERVAL = 60
- DEFAULT_URL = "https://collector.apidepth.io/v1/events".freeze
- PRIVATE_HOST_PATTERN = / \Alocalhost\z | \A127\. | \A0\.0\.0\.0\z | \A169\.254\. | \A10\. | \A172\.(1[6-9]|2\d|3[01])\. | \A192\.168\. | \A\[?::1\]?\z | \A\[?fc | \A\[?fe80: /xi.freeze
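To make the pattern easier to read, the sketch below copies it into a multi-line literal and probes it with a few hosts. The helper `private_host?` is hypothetical and only for illustration; how the collector itself applies the pattern is not shown on this page.

```ruby
# Copy of PRIVATE_HOST_PATTERN from the constant summary, one branch per line.
PRIVATE_HOST_PATTERN = /
  \Alocalhost\z |
  \A127\. |
  \A0\.0\.0\.0\z |
  \A169\.254\. |
  \A10\. |
  \A172\.(1[6-9]|2\d|3[01])\. |
  \A192\.168\. |
  \A\[?::1\]?\z |
  \A\[?fc |
  \A\[?fe80:
/xi.freeze

# Hypothetical helper, not part of the documented API.
def private_host?(host)
  PRIVATE_HOST_PATTERN.match?(host)
end

hosts = [
  "localhost", "127.0.0.1", "10.1.2.3", "172.16.0.1",
  "172.32.0.1", "192.168.1.5", "[::1]", "collector.apidepth.io"
]

# Print each host with the result of the pattern check.
hosts.each do |host|
  puts format("%-24s private=%s", host, private_host?(host))
end
```

Note that the `\A\[?fc` branch is a plain prefix test, so a public hostname that happens to start with `fc` would match as well.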
Instance Attribute Summary
- #consecutive_failures ⇒ Object (readonly)
  Returns the value of attribute consecutive_failures.
- #last_flush_at ⇒ Object (readonly)
  Returns the value of attribute last_flush_at.
- #total_dropped ⇒ Object (readonly)
  Returns the value of attribute total_dropped.
Class Method Summary
- .instance ⇒ Object
- .reset! ⇒ Object
  Tear down the existing Collector cleanly before clearing the singleton.
Instance Method Summary
- #flush! ⇒ Object
- #initialize ⇒ Collector (constructor)
  A new instance of Collector.
- #record(event) ⇒ Object
- #stats ⇒ Object
Constructor Details
#initialize ⇒ Collector
Returns a new instance of Collector.
    # File 'lib/apidepth/collector.rb', line 34
    def initialize
      @queue = Queue.new
      @stats_mutex = Mutex.new
      @send_mutex = Mutex.new
      @consecutive_failures = 0
      @total_dropped = 0
      @last_flush_at = nil
      @http = nil
      @cached_url = nil
      start_flush_thread
      start_watchdog_thread
    end
Instance Attribute Details
#consecutive_failures ⇒ Object (readonly)
Returns the value of attribute consecutive_failures.
    # File 'lib/apidepth/collector.rb', line 32
    def consecutive_failures
      @consecutive_failures
    end
#last_flush_at ⇒ Object (readonly)
Returns the value of attribute last_flush_at.
    # File 'lib/apidepth/collector.rb', line 32
    def last_flush_at
      @last_flush_at
    end
#total_dropped ⇒ Object (readonly)
Returns the value of attribute total_dropped.
    # File 'lib/apidepth/collector.rb', line 32
    def total_dropped
      @total_dropped
    end
Class Method Details
.instance ⇒ Object
    # File 'lib/apidepth/collector.rb', line 18
    def self.instance
      @instance_mutex.synchronize { @instance ||= new }
    end
.reset! ⇒ Object
Tear down the existing Collector cleanly before clearing the singleton. Without teardown, every reset! leaks a flush thread and a watchdog thread. This matters in Puma cluster mode — on_worker_boot calls reset! per worker.
    # File 'lib/apidepth/collector.rb', line 25
    def self.reset!
      @instance_mutex.synchronize do
        @instance&.send(:teardown)
        @instance = nil
      end
    end
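The teardown-before-clear pattern can be illustrated with a toy class. This is a minimal sketch, not the Collector's implementation: `TinyWorker` and the body of its `teardown` are hypothetical, since the real `Collector#teardown` is private and not shown on this page.

```ruby
# Toy stand-in for a collector that owns one background thread.
# TinyWorker and its teardown body are hypothetical.
class TinyWorker
  @instance_mutex = Mutex.new

  class << self
    # Lazily build the singleton under a mutex, as .instance does.
    def instance
      @instance_mutex.synchronize { @instance ||= new }
    end

    # Stop the old instance's thread before dropping the reference.
    def reset!
      @instance_mutex.synchronize do
        @instance&.send(:teardown)
        @instance = nil
      end
    end
  end

  attr_reader :thread

  def initialize
    @running = true
    @thread = Thread.new { sleep 0.01 while @running }
  end

  private

  # Ask the loop to exit, wait up to one second, then kill as a fallback.
  def teardown
    @running = false
    @thread.join(1) || @thread.kill
  end
end

first = TinyWorker.instance
TinyWorker.reset!
second = TinyWorker.instance

puts first.thread.alive?   # whether the old instance's thread survived reset!
puts second.equal?(first)  # whether .instance returned the same object again
TinyWorker.reset!          # clean up the second instance's thread
```

If `reset!` only set `@instance = nil`, the first thread would keep running with nothing left to stop it, which is the leak the description warns about.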
Instance Method Details
#flush! ⇒ Object
    # File 'lib/apidepth/collector.rb', line 56
    def flush!
      events = drain_queue
      return if events.empty?

      send_batch(events)
      # Mirror safe_flush's stats update so last_flush_at reflects at_exit
      # delivery, not just background flushes.
      @stats_mutex.synchronize do
        @consecutive_failures = 0
        @last_flush_at = Time.now
      end
    rescue StandardError => e
      failures = @stats_mutex.synchronize { @consecutive_failures += 1 }
      begin
        # events is nil here if drain_queue itself raised, hence the &. guard.
        Apidepth.configuration.on_flush_error&.call(e, {
          dropped_events: events&.size || 0,
          consecutive_failures: failures,
          total_dropped: @total_dropped
        })
      rescue StandardError
        nil
      end
      Apidepth.logger&.warn("[Apidepth] Final flush failed: #{e.class}: #{e.message}")
    end
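The rescue branch above calls `on_flush_error` with the exception and a context hash. Below is a sketch of a handler with a matching shape. The lambda, its name, and the message format are hypothetical, and how the handler is assigned to `Apidepth.configuration` is not shown on this page, so that step is left out.

```ruby
# Hypothetical handler matching the call made in flush!:
#   on_flush_error.call(error, { dropped_events:, consecutive_failures:, total_dropped: })
FLUSH_ERROR_HANDLER = lambda do |error, context|
  format(
    "flush failed (%s): dropped=%d consecutive=%d total_dropped=%d",
    error.class,
    context[:dropped_events],
    context[:consecutive_failures],
    context[:total_dropped]
  )
end

# Simulate the call flush! makes from its rescue branch.
message = FLUSH_ERROR_HANDLER.call(
  IOError.new("connection reset"),
  { dropped_events: 12, consecutive_failures: 2, total_dropped: 0 }
)
puts message
```

Because `flush!` wraps the callback in its own `rescue StandardError`, an exception raised inside the handler is discarded rather than propagated.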
#record(event) ⇒ Object
    # File 'lib/apidepth/collector.rb', line 48
    def record(event)
      if @queue.size >= MAX_QUEUE_SIZE
        @stats_mutex.synchronize { @total_dropped += 1 }
        return
      end
      @queue.push(event)
    end
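The drop-when-full behaviour of `record` can be seen in miniature. `BoundedRecorder` below is a hypothetical stand-in that copies the same check with a limit of 3 instead of 5_000. It is a sketch for illustration, not the Collector itself.

```ruby
# BoundedRecorder is a hypothetical miniature of the record path,
# with a small limit so the drop behaviour is easy to see.
class BoundedRecorder
  MAX_QUEUE_SIZE = 3

  attr_reader :total_dropped

  def initialize
    @queue = Queue.new
    @stats_mutex = Mutex.new
    @total_dropped = 0
  end

  # Same shape as Collector#record: count and discard once the queue is full.
  def record(event)
    if @queue.size >= MAX_QUEUE_SIZE
      @stats_mutex.synchronize { @total_dropped += 1 }
      return
    end
    @queue.push(event)
  end

  def queue_size
    @queue.size
  end
end

recorder = BoundedRecorder.new
5.times { |i| recorder.record({ id: i }) }

puts recorder.queue_size     # events that were accepted
puts recorder.total_dropped  # events discarded because the queue was full
```

The size check and the push are separate steps, so with several producer threads the queue may briefly exceed the limit by a few events. The limit acts as a soft cap rather than a strict bound.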
#stats ⇒ Object
    # File 'lib/apidepth/collector.rb', line 84
    def stats
      @stats_mutex.synchronize do
        {
          queue_size: @queue.size,
          consecutive_failures: @consecutive_failures,
          total_dropped: @total_dropped,
          last_flush_at: @last_flush_at
        }
      end
    end
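One possible use of the hash returned by `#stats` is a health probe. The helper below is a hypothetical sketch: `collector_status`, its status symbols, and the `stale_after` window are assumptions, and the default `failure_threshold` of 3 simply mirrors `FAILURE_THRESHOLD` without implying the collector uses it the same way.

```ruby
# Hypothetical helper: classifies a hash of the shape returned by #stats.
# failure_threshold defaults to 3 to mirror FAILURE_THRESHOLD (an assumption).
def collector_status(stats, now: Time.now, stale_after: 300, failure_threshold: 3)
  return :failing if stats[:consecutive_failures] >= failure_threshold
  return :dropping if stats[:total_dropped].positive?

  last = stats[:last_flush_at]
  return :idle if last.nil? # nothing has been flushed yet

  (now - last) > stale_after ? :stale : :ok
end

# A hand-built sample; in an application this would come from
# Apidepth::Collector.instance.stats.
sample = {
  queue_size: 0,
  consecutive_failures: 0,
  total_dropped: 0,
  last_flush_at: Time.now
}
puts collector_status(sample)
```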