Class: Apartment::PoolObserver
- Inherits:
-
Object
- Object
- Apartment::PoolObserver
- Defined in:
- lib/apartment/pool_observer.rb
Overview
Sink-agnostic observer for the v4 pool lifecycle. Subscribes to the gem’s ActiveSupport::Notifications and forwards a normalized Sample to a caller- supplied sink; optionally samples pool gauges on an interval. Ships no transport — the adopter’s sink maps Samples to CloudWatch/StatsD/logs/etc. All sink/sampler calls are error-isolated: telemetry must never raise into the gem’s instrumentation or timer path. See docs/observability.md.
Defined Under Namespace
Classes: Sample
Constant Summary collapse
- COUNTER_EVENTS =
Pool-lifecycle events forwarded as counters (value 1 each).
%i[create evict cap_unmet skip_evict reaper_stopped].freeze
Class Method Summary collapse
-
.install!(sink:, sample_interval: nil, backend_count: nil) ⇒ Object
Build, subscribe, and (optionally) start the gauge sampler.
Instance Method Summary collapse
-
#initialize(sink:, backend_count: nil) ⇒ PoolObserver
constructor
A new instance of PoolObserver.
-
#sample! ⇒ Object
One gauge pass: live tenant-pool count, plus the adopter’s backend count when supplied.
- #start_sampler!(interval:) ⇒ Object
-
#stop! ⇒ Object
Unsubscribe from all events and shut down the sampler.
- #subscribe! ⇒ Object
Constructor Details
#initialize(sink:, backend_count: nil) ⇒ PoolObserver
Returns a new instance of PoolObserver.
36 37 38 39 40 41 42 43 |
# File 'lib/apartment/pool_observer.rb', line 36 def initialize(sink:, backend_count: nil) raise(ArgumentError, 'sink must be callable') unless sink.respond_to?(:call) @sink = sink @backend_count = backend_count @subscribers = [] @sampler = nil end |
Class Method Details
.install!(sink:, sample_interval: nil, backend_count: nil) ⇒ Object
Build, subscribe, and (optionally) start the gauge sampler. Returns the observer; call #stop! to tear it down. Idempotent subscription is NOT guaranteed — install once per process (e.g. an after_initialize hook).
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/apartment/pool_observer.rb', line 24 def self.install!(sink:, sample_interval: nil, backend_count: nil) observer = new(sink: sink, backend_count: backend_count) observer.subscribe! observer.start_sampler!(interval: sample_interval) if sample_interval&.positive? observer rescue StandardError # Don't leak subscriptions if a later step (e.g. a bad sample_interval) # raises after subscribe! has registered listeners. observer&.stop! raise end |
Instance Method Details
#sample! ⇒ Object
One gauge pass: live tenant-pool count, plus the adopter’s backend count when supplied. Safe to call from start_sampler! or an external scheduler.
56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/apartment/pool_observer.rb', line 56 def sample! total = Apartment.pool_manager&.stats&.fetch(:total_pools, 0) || 0 emit(Sample.new(name: :tenant_pools_live, kind: :gauge, value: total, dimensions: {}, payload: {})) return unless @backend_count backends = @backend_count.call return if backends.nil? emit(Sample.new(name: :backend_connections, kind: :gauge, value: backends, dimensions: {}, payload: {})) rescue StandardError => e warn_failure('sample!', e) end |
#start_sampler!(interval:) ⇒ Object
70 71 72 73 74 75 |
# File 'lib/apartment/pool_observer.rb', line 70 def start_sampler!(interval:) @sampler&.shutdown @sampler = Concurrent::TimerTask.new(execution_interval: interval) { sample! } @sampler.execute @sampler end |
#stop! ⇒ Object
Unsubscribe from all events and shut down the sampler. Safe to call twice.
78 79 80 81 82 |
# File 'lib/apartment/pool_observer.rb', line 78 def stop! @subscribers.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) } @subscribers.clear shutdown_sampler! end |
#subscribe! ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/apartment/pool_observer.rb', line 45 def subscribe! COUNTER_EVENTS.each do |event| @subscribers << ActiveSupport::Notifications.subscribe("#{event}.apartment") do |*, payload| record_event(event, payload || {}) end end self end |