Class: Apidepth::Collector

Inherits:
Object
  • Object
show all
Defined in:
lib/apidepth/collector.rb

Constant Summary collapse

# NOTE(review): the consumers of MAX_BATCH_SIZE, FAILURE_THRESHOLD,
# WATCHDOG_INTERVAL and DEFAULT_URL are not visible in this excerpt; the
# descriptions below are inferred from the names only - confirm against the
# private helpers before relying on them.

# Presumably the upper bound on events sent per batch.
MAX_BATCH_SIZE = 100

# Queue depth at which #record stops enqueueing and counts the event as dropped.
MAX_QUEUE_SIZE = 5_000

# Presumably the number of consecutive failures that triggers extra handling.
FAILURE_THRESHOLD = 3

# Presumably the watchdog period (unit not shown here - likely seconds).
WATCHDOG_INTERVAL = 60

# Presumably the endpoint used when no collector URL is configured.
DEFAULT_URL = "https://collector.apidepth.io/v1/events".freeze
# Matches host strings that name a loopback, unspecified, link-local or
# private-range address: "localhost", 127.*, 0.0.0.0, 169.254.*, 10.*,
# 172.16-31.*, 192.168.*, ::1, IPv6 unique-local (fc00::/7) and fe80:.
# IPv6 forms may be wrapped in the square brackets used inside URLs.
# Matching is case-insensitive and anchored at the start of the string.
#
# The unique-local alternative requires a full first hextet ("fcXX:" or
# "fdXX:"). This covers the fd00::/8 half of fc00::/7, and no longer matches
# ordinary hostnames that merely begin with "fc" (e.g. "fcm.googleapis.com").
#
# NOTE(review): the caller is not visible here; presumably this guards the
# configured collector URL against pointing at internal hosts - confirm.
PRIVATE_HOST_PATTERN =
/
  \Alocalhost\z          |
  \A127\.                |
  \A0\.0\.0\.0\z         |
  \A169\.254\.           |
  \A10\.                 |
  \A172\.(1[6-9]|2\d|3[01])\. |
  \A192\.168\.           |
  \A\[?::1\]?\z          |
  \A\[?f[cd][0-9a-f]{2}: |
  \A\[?fe80:
/xi.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Collector

Returns a new instance of Collector.



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/apidepth/collector.rb', line 34

# Builds an empty collector and immediately starts its background threads.
#
# All state is initialised before start_flush_thread / start_watchdog_thread
# are called, so those helpers never observe unset instance variables.
def initialize
  # Synchronisation primitives and the event buffer.
  @stats_mutex = Mutex.new
  @send_mutex  = Mutex.new
  @queue       = Queue.new

  # Delivery statistics exposed through the readers and #stats.
  @total_dropped        = 0
  @consecutive_failures = 0
  @last_flush_at        = nil

  # Left unset here; presumably filled in lazily by the sending code, which
  # is not visible in this excerpt.
  @cached_url = nil
  @http       = nil

  start_flush_thread
  start_watchdog_thread
end

Instance Attribute Details

#consecutive_failures ⇒ Object (readonly)

Returns the value of attribute consecutive_failures.



32
33
34
# File 'lib/apidepth/collector.rb', line 32

# Current run of failed flushes. #flush! adds one each time it rescues an
# error and resets the value to 0 after a successful send; it starts at 0.
# Other writers may exist outside this excerpt.
#
# NOTE(review): this reads the variable without taking @stats_mutex; use
# #stats when a snapshot taken under the mutex is needed.
#
# @return [Integer]
def consecutive_failures
  @consecutive_failures
end

#last_flush_at ⇒ Object (readonly)

Returns the value of attribute last_flush_at.



32
33
34
# File 'lib/apidepth/collector.rb', line 32

# Time of the most recent successful flush. It is nil after construction and
# is set to Time.now by #flush! once send_batch returns without raising.
# Other writers may exist outside this excerpt.
#
# NOTE(review): this reads the variable without taking @stats_mutex; use
# #stats when a snapshot taken under the mutex is needed.
#
# @return [Time, nil]
def last_flush_at
  @last_flush_at
end

#total_dropped ⇒ Object (readonly)

Returns the value of attribute total_dropped.



32
33
34
# File 'lib/apidepth/collector.rb', line 32

# Running count of events discarded by #record because the queue had already
# reached MAX_QUEUE_SIZE. Starts at 0. Other writers may exist outside this
# excerpt.
#
# NOTE(review): this reads the variable without taking @stats_mutex; use
# #stats when a snapshot taken under the mutex is needed.
#
# @return [Integer]
def total_dropped
  @total_dropped
end

Class Method Details

.instance ⇒ Object



18
19
20
# File 'lib/apidepth/collector.rb', line 18

# Returns the shared Collector, constructing it on the first call.
#
# The check and the construction both run inside @instance_mutex, so callers
# racing on the first access all receive the same object.
#
# @return [Collector]
def self.instance
  @instance_mutex.synchronize do
    @instance = new unless @instance
    @instance
  end
end

.reset! ⇒ Object

Tear down the existing Collector cleanly before clearing the singleton. Without teardown, every reset! leaks a flush thread and a watchdog thread. This matters in Puma cluster mode — on_worker_boot calls reset! per worker.



25
26
27
28
29
30
# File 'lib/apidepth/collector.rb', line 25

# Tears down the current singleton (if any) and forgets it, so the next call
# to .instance builds a fresh Collector.
#
# teardown is invoked through send because it is not part of the public
# interface. Both steps run under @instance_mutex. If teardown raises, the
# error propagates and the existing instance is kept.
#
# @return [nil]
def self.reset!
  @instance_mutex.synchronize do
    current = @instance
    current.send(:teardown) unless current.nil?
    @instance = nil
  end
end

Instance Method Details

#flush! ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/apidepth/collector.rb', line 56

# Drains every queued event and hands them to send_batch on the calling
# thread. Returns early, touching no stats, when the queue is empty.
#
# On success the failure streak is reset and last_flush_at is stamped.
#
# Any StandardError is rescued rather than raised: the failure streak is
# incremented, the configured on_flush_error callback (if any) is called with
# the error and a stats hash, and a warning is logged. Errors raised by the
# callback itself are swallowed so that the warning is still logged.
def flush!
  events = drain_queue
  return if events.empty?

  send_batch(events)

  # Mirror safe_flush's stats update so last_flush_at reflects at_exit
  # delivery, not just background flushes.
  @stats_mutex.synchronize do
    @consecutive_failures = 0
    @last_flush_at        = Time.now
  end
rescue StandardError => e
  # Read both counters in one critical section. #record bumps @total_dropped
  # under the same mutex, so this gives the callback a consistent snapshot
  # instead of an unsynchronised read.
  failures, dropped_total = @stats_mutex.synchronize do
    [@consecutive_failures += 1, @total_dropped]
  end

  begin
    # events is nil when drain_queue itself raised, hence the safe navigation.
    Apidepth.configuration.on_flush_error&.call(e, {
                                                  dropped_events: events&.size || 0,
                                                  consecutive_failures: failures,
                                                  total_dropped: dropped_total
                                                })
  rescue StandardError
    nil
  end

  Apidepth.logger&.warn("[Apidepth] Final flush failed: #{e.class}: #{e.message}")
end

#record(event) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/apidepth/collector.rb', line 48

# Enqueues an event for later delivery, unless the queue is already full.
#
# When the queue holds MAX_QUEUE_SIZE or more events, the event is discarded
# and @total_dropped is incremented under @stats_mutex instead.
#
# @param event [Object] the event to buffer
# @return [Queue, nil] the queue when the event was accepted, nil when dropped
def record(event)
  return @queue.push(event) if @queue.size < MAX_QUEUE_SIZE

  @stats_mutex.synchronize { @total_dropped += 1 }
  nil
end

#stats ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/apidepth/collector.rb', line 84

# Takes a snapshot of the collector's counters.
#
# All four values are read inside one @stats_mutex section, so the three
# counters guarded by that mutex are mutually consistent.
#
# @return [Hash] with keys :queue_size, :consecutive_failures,
#   :total_dropped and :last_flush_at
def stats
  depth, failures, dropped, flushed_at = @stats_mutex.synchronize do
    [@queue.size, @consecutive_failures, @total_dropped, @last_flush_at]
  end

  {
    queue_size: depth,
    consecutive_failures: failures,
    total_dropped: dropped,
    last_flush_at: flushed_at
  }
end