Class: Async::Background::Metrics
- Inherits:
-
Object
- Object
- Async::Background::Metrics
- Defined in:
- lib/async/background/metrics.rb
Constant Summary collapse
# Per-worker metric fields, in declaration order: field name => value type
# symbol (the types are what `segment_size` hands to IO::Buffer.size_of).
SCHEMA_FIELDS = {
  total_runs:       :u64,
  total_successes:  :u64,
  total_failures:   :u64,
  total_timeouts:   :u64,
  total_skips:      :u64,
  active_jobs:      :u32,
  last_run_at:      :u64,
  last_duration_ms: :u32
}.freeze
Instance Attribute Summary collapse
-
#registry ⇒ Object
readonly
Returns the value of attribute registry.
Class Method Summary collapse
- .default_shm_path ⇒ Object
-
.read_all(total_workers:, path: default_shm_path) ⇒ Object
Read metrics for all workers from the shm file.
- .schema ⇒ Object
- .segment_size ⇒ Object
Instance Method Summary collapse
- #enabled? ⇒ Boolean
-
#initialize(worker_index:, total_workers:, shm_path: self.class.default_shm_path) ⇒ Metrics
constructor
A new instance of Metrics.
- #job_failed(entry, error) ⇒ Object
- #job_finished(entry, duration) ⇒ Object
- #job_skipped(entry) ⇒ Object
- #job_started(entry) ⇒ Object
- #job_timed_out(entry) ⇒ Object
- #values ⇒ Object
Constructor Details
#initialize(worker_index:, total_workers:, shm_path: self.class.default_shm_path) ⇒ Metrics
Returns a new instance of Metrics.
19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/async/background/metrics.rb', line 19
#
# Sets up metrics collection for one worker.
#
# Loads `async/utilization` on demand, creates a registry, marks the instance
# enabled, then delegates to `ensure_shm!` and `attach_observer!` (defined
# outside this listing). If the library cannot be loaded (LoadError) or an
# ArgumentError is raised along the way, the instance ends up disabled with
# no registry instead of propagating the error.
#
# @param worker_index forwarded to `attach_observer!`
# @param total_workers forwarded to `ensure_shm!` and `attach_observer!`
# @param shm_path forwarded to both helpers; defaults to `default_shm_path`
def initialize(worker_index:, total_workers:, shm_path: self.class.default_shm_path)
  require 'async/utilization'

  # Start from the disabled state, then switch on once the registry exists.
  @registry, @enabled = nil, false
  @registry = ::Async::Utilization::Registry.new
  @enabled = true

  ensure_shm!(total_workers, shm_path)
  attach_observer!(worker_index, total_workers, shm_path)
rescue LoadError, ArgumentError
  # Best-effort: metrics are optional, so fall back to the disabled state.
  @registry, @enabled = nil, false
end
Instance Attribute Details
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
17 18 19 |
# File 'lib/async/background/metrics.rb', line 17
#
# Read-only accessor for the @registry instance variable.
#
# @return [Object, nil] whatever @registry currently holds
def registry = @registry
Class Method Details
.default_shm_path ⇒ Object
114 115 116 |
# File 'lib/async/background/metrics.rb', line 114
#
# Default location of the shm file: "async-background.shm" inside the
# directory reported by Dir.tmpdir.
#
# @return [String] the joined path
def self.default_shm_path
  tmp_root = Dir.tmpdir
  File.join(tmp_root, "async-background.shm")
end
.read_all(total_workers:, path: default_shm_path) ⇒ Object
Read metrics for all workers from the shm file. No server needed — just reads the mmap’d file.
Async::Background::Metrics.read_all(total_workers: 2)
# => [
# { worker: 1, total_runs: 142, active_jobs: 1, ... },
# { worker: 2, total_runs: 98, active_jobs: 0, ... }
# ]
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/async/background/metrics.rb', line 93
#
# Reads the metrics of every worker directly from the shm file.
#
# The file is treated as `total_workers` consecutive segments of
# `segment_size` bytes. For worker `i` (1-based) each schema field is read at
# `(i - 1) * segment_size + field.offset`.
#
# @param total_workers [Integer] number of worker segments to read
# @param path [String] shm file to map; defaults to `default_shm_path`
# @return [Array<Hash>] one hash per worker: `:worker` plus one entry per schema field
# @raise [SystemCallError] if the file cannot be opened or mapped
def self.read_all(total_workers:, path: default_shm_path)
  require 'async/utilization'

  layout = schema
  segment = segment_size
  file_size = segment * total_workers

  # The file is opened read-only ("rb"), so the mapping must be read-only as
  # well. Without IO::Buffer::READONLY the buffer is mapped shared + writable,
  # which POSIX mmap rejects (EACCES) for a descriptor not opened for writing.
  buffer = File.open(path, "rb") do |file|
    IO::Buffer.map(file, file_size, 0, IO::Buffer::READONLY)
  end

  (1..total_workers).map do |worker|
    base = (worker - 1) * segment
    row = { worker: worker }
    layout.fields.each do |field|
      row[field.name] = buffer.get_value(field.type, base + field.offset)
    end
    row
  end
end
.schema ⇒ Object
79 80 81 82 |
# File 'lib/async/background/metrics.rb', line 79
#
# Builds the utilization schema from SCHEMA_FIELDS, loading
# `async/utilization` on demand first.
#
# @return [Object] the result of ::Async::Utilization::Schema.build
def self.schema
  require 'async/utilization'

  schema_class = ::Async::Utilization::Schema
  schema_class.build(SCHEMA_FIELDS)
end
.segment_size ⇒ Object
118 119 120 |
# File 'lib/async/background/metrics.rb', line 118
#
# Byte size of one worker's segment: the sum of IO::Buffer.size_of over every
# field type in SCHEMA_FIELDS.
#
# NOTE(review): this is a plain sum with no padding between fields; read_all
# combines it with the schema's own field offsets — confirm the two agree.
#
# @return [Integer] total size in bytes
def self.segment_size
  SCHEMA_FIELDS.each_value.sum { |type| IO::Buffer.size_of(type) }
end
Instance Method Details
#enabled? ⇒ Boolean
33 34 35 |
# File 'lib/async/background/metrics.rb', line 33
#
# Whether metrics collection is active for this instance.
#
# @return [Boolean] the current value of @enabled
def enabled? = @enabled
#job_failed(entry, error) ⇒ Object
53 54 55 56 57 58 |
# File 'lib/async/background/metrics.rb', line 53
#
# Records a failed job: one fewer active job, one more failure.
# Does nothing when metrics are disabled.
#
# @param entry not used by this method
# @param error not used by this method
def job_failed(entry, error)
  if @enabled
    @registry.decrement(:active_jobs)
    @registry.increment(:total_failures)
  end
end
#job_finished(entry, duration) ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/async/background/metrics.rb', line 45
#
# Records a successful job: one fewer active job, one more success, and the
# job's duration stored as whole milliseconds (duration * 1000, truncated).
# Does nothing when metrics are disabled.
#
# @param entry not used by this method
# @param duration [Numeric] elapsed time; multiplied by 1000 before storing
def job_finished(entry, duration)
  if @enabled
    @registry.decrement(:active_jobs)
    @registry.increment(:total_successes)

    elapsed_ms = (duration * 1000).to_i
    @registry.set(:last_duration_ms, elapsed_ms)
  end
end
#job_skipped(entry) ⇒ Object
67 68 69 70 71 |
# File 'lib/async/background/metrics.rb', line 67
#
# Counts a skipped job. Does nothing when metrics are disabled.
#
# @param entry not used by this method
def job_skipped(entry)
  @registry.increment(:total_skips) if @enabled
end
#job_started(entry) ⇒ Object
37 38 39 40 41 42 43 |
# File 'lib/async/background/metrics.rb', line 37
#
# Records the start of a job: bumps the run counter and the active-job
# gauge, then stamps the start time as whole seconds from CLOCK_REALTIME.
# Does nothing when metrics are disabled.
#
# @param entry not used by this method
def job_started(entry)
  if @enabled
    @registry.increment(:total_runs)
    @registry.increment(:active_jobs)

    started_at = Process.clock_gettime(Process::CLOCK_REALTIME)
    @registry.set(:last_run_at, started_at.to_i)
  end
end
#job_timed_out(entry) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/async/background/metrics.rb', line 60
#
# Records a timed-out job: one fewer active job, one more timeout.
# Does nothing when metrics are disabled.
#
# @param entry not used by this method
def job_timed_out(entry)
  if @enabled
    @registry.decrement(:active_jobs)
    @registry.increment(:total_timeouts)
  end
end
#values ⇒ Object
73 74 75 76 77 |
# File 'lib/async/background/metrics.rb', line 73
#
# Current metric values as reported by the registry.
#
# @return [Object] `@registry.values` when enabled, otherwise an empty Hash
def values
  @enabled ? @registry.values : {}
end