Class: Apartment::PoolObserver

Inherits:
Object
  • Object
show all
Defined in:
lib/apartment/pool_observer.rb

Overview

Sink-agnostic observer for the v4 pool lifecycle. Subscribes to the gem’s ActiveSupport::Notifications and forwards a normalized Sample to a caller- supplied sink; optionally samples pool gauges on an interval. Ships no transport — the adopter’s sink maps Samples to CloudWatch/StatsD/logs/etc. All sink/sampler calls are error-isolated: telemetry must never raise into the gem’s instrumentation or timer path. See docs/observability.md.

Defined Under Namespace

Classes: Sample

Constant Summary collapse

COUNTER_EVENTS =

Pool-lifecycle events forwarded as counters (value 1 each).

%i[create evict cap_unmet skip_evict reaper_stopped].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sink:, backend_count: nil) ⇒ PoolObserver

Returns a new instance of PoolObserver.

Raises:

  • (ArgumentError)


36
37
38
39
40
41
42
43
# File 'lib/apartment/pool_observer.rb', line 36

def initialize(sink:, backend_count: nil)
  raise(ArgumentError, 'sink must be callable') unless sink.respond_to?(:call)

  @sink = sink
  @backend_count = backend_count
  @subscribers = []
  @sampler = nil
end

Class Method Details

.install!(sink:, sample_interval: nil, backend_count: nil) ⇒ Object

Build, subscribe, and (optionally) start the gauge sampler. Returns the observer; call #stop! to tear it down. Idempotent subscription is NOT guaranteed — install once per process (e.g. an after_initialize hook).



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/apartment/pool_observer.rb', line 24

def self.install!(sink:, sample_interval: nil, backend_count: nil)
  observer = new(sink: sink, backend_count: backend_count)
  observer.subscribe!
  observer.start_sampler!(interval: sample_interval) if sample_interval&.positive?
  observer
rescue StandardError
  # Don't leak subscriptions if a later step (e.g. a bad sample_interval)
  # raises after subscribe! has registered listeners.
  observer&.stop!
  raise
end

Instance Method Details

#sample!Object

One gauge pass: live tenant-pool count, plus the adopter’s backend count when supplied. Safe to call from start_sampler! or an external scheduler.



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/apartment/pool_observer.rb', line 56

def sample!
  total = Apartment.pool_manager&.stats&.fetch(:total_pools, 0) || 0
  emit(Sample.new(name: :tenant_pools_live, kind: :gauge, value: total, dimensions: {}, payload: {}))

  return unless @backend_count

  backends = @backend_count.call
  return if backends.nil?

  emit(Sample.new(name: :backend_connections, kind: :gauge, value: backends, dimensions: {}, payload: {}))
rescue StandardError => e
  warn_failure('sample!', e)
end

#start_sampler!(interval:) ⇒ Object



70
71
72
73
74
75
# File 'lib/apartment/pool_observer.rb', line 70

def start_sampler!(interval:)
  @sampler&.shutdown
  @sampler = Concurrent::TimerTask.new(execution_interval: interval) { sample! }
  @sampler.execute
  @sampler
end

#stop!Object

Unsubscribe from all events and shut down the sampler. Safe to call twice.



78
79
80
81
82
# File 'lib/apartment/pool_observer.rb', line 78

def stop!
  @subscribers.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) }
  @subscribers.clear
  shutdown_sampler!
end

#subscribe!Object



45
46
47
48
49
50
51
52
# File 'lib/apartment/pool_observer.rb', line 45

def subscribe!
  COUNTER_EVENTS.each do |event|
    @subscribers << ActiveSupport::Notifications.subscribe("#{event}.apartment") do |*, payload|
      record_event(event, payload || {})
    end
  end
  self
end