Class: Async::Background::Metrics

Inherits:
Object
  • Object
show all
Defined in:
lib/async/background/metrics.rb

Constant Summary collapse

# Per-worker metric slots: field name => IO::Buffer type symbol.
# NOTE(review): key order presumably determines field offsets inside
# Async::Utilization::Schema.build — confirm before reordering.
SCHEMA_FIELDS = {
  # Incremented by #job_started.
  total_runs: :u64,
  # Incremented by #job_finished.
  total_successes: :u64,
  # Incremented by #job_failed.
  total_failures: :u64,
  # Incremented by #job_timed_out.
  total_timeouts: :u64,
  # Incremented by #job_skipped.
  total_skips: :u64,
  # Incremented by #job_started; decremented on finish, failure and timeout.
  active_jobs: :u32,
  # CLOCK_REALTIME truncated to whole seconds, written by #job_started.
  last_run_at: :u64,
  # Duration passed to #job_finished, multiplied by 1000 and truncated.
  last_duration_ms: :u32
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_index:, total_workers:, shm_path: self.class.default_shm_path) ⇒ Metrics

Returns a new instance of Metrics.



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/async/background/metrics.rb', line 19

# Builds the metrics collector for one worker.
#
# Loads async/utilization on demand, creates a registry, then hands off to
# ensure_shm! and attach_observer!. If loading the library fails (LoadError)
# or any step raises ArgumentError, the instance is left disabled with no
# registry; other exceptions propagate to the caller.
#
# @param worker_index [Integer] forwarded to attach_observer!
# @param total_workers [Integer] forwarded to ensure_shm! and attach_observer!
# @param shm_path [String] shm file location; defaults to the class-level default path
def initialize(worker_index:, total_workers:, shm_path: self.class.default_shm_path)
  require 'async/utilization'

  # Begin in the disabled state; it is only upgraded once a registry exists.
  @registry, @enabled = nil, false

  @registry = ::Async::Utilization::Registry.new
  @enabled = true

  ensure_shm!(total_workers, shm_path)
  attach_observer!(worker_index, total_workers, shm_path)
rescue LoadError, ArgumentError
  # Fall back to a disabled collector rather than failing construction.
  @registry, @enabled = nil, false
end

Instance Attribute Details

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



17
18
19
# File 'lib/async/background/metrics.rb', line 17

# The registry created in #initialize, or nil when metrics are disabled.
#
# @return [Object, nil]
def registry = @registry

Class Method Details

.default_shm_path ⇒ Object



114
115
116
# File 'lib/async/background/metrics.rb', line 114

# Default shm file location: "async-background.shm" inside the directory
# reported by Dir.tmpdir.
#
# @return [String]
def self.default_shm_path
  tmp_root = Dir.tmpdir
  File.join(tmp_root, "async-background.shm")
end

.read_all(total_workers:, path: default_shm_path) ⇒ Object

Read metrics for all workers from the shm file. No server needed — just reads the mmap’d file.

Async::Background::Metrics.read_all(total_workers: 2)
# => [
#   { worker: 1, total_runs: 142, active_jobs: 1, ... },
#   { worker: 2, total_runs: 98,  active_jobs: 0, ... }
# ]


93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/async/background/metrics.rb', line 93

# Reads the metrics of every worker straight out of the shm file.
#
# The file is treated as +total_workers+ consecutive segments of
# +segment_size+ bytes; each schema field is read at its offset within the
# worker's segment.
#
# Errors raised by File.open or IO::Buffer.map (missing file, file shorter
# than the requested size, ...) propagate to the caller.
#
# @param total_workers [Integer] number of per-worker segments to read
# @param path [String] shm file location
# @return [Array<Hash>] one hash per worker: +:worker+ (1-based index) plus
#   one entry per schema field
def self.read_all(total_workers:, path: default_shm_path)
  require 'async/utilization'

  layout = schema
  segment = segment_size

  # The file is opened read-only, so the mapping has to be requested as
  # READONLY too: without the flag IO::Buffer.map asks for a writable shared
  # mapping, which a read-only descriptor cannot back.
  buffer = File.open(path, "rb") do |file|
    IO::Buffer.map(file, segment * total_workers, 0, IO::Buffer::READONLY)
  end

  (1..total_workers).map do |worker|
    base = (worker - 1) * segment
    layout.fields.each_with_object({ worker: worker }) do |field, row|
      row[field.name] = buffer.get_value(field.type, base + field.offset)
    end
  end
ensure
  # Every value has been copied out as a plain Integer by now, so release the
  # mapping immediately instead of waiting for garbage collection.
  buffer&.free
end

.schema ⇒ Object



79
80
81
82
# File 'lib/async/background/metrics.rb', line 79

# Builds the Async::Utilization schema describing SCHEMA_FIELDS.
# async/utilization is required on demand; a LoadError propagates to the
# caller if the library is unavailable.
#
# @return [Object] whatever ::Async::Utilization::Schema.build returns
def self.schema
  require 'async/utilization'

  builder = ::Async::Utilization::Schema
  builder.build(SCHEMA_FIELDS)
end

.segment_size ⇒ Object



118
119
120
# File 'lib/async/background/metrics.rb', line 118

# Sum of IO::Buffer.size_of over every type listed in SCHEMA_FIELDS.
#
# @return [Integer]
def self.segment_size
  SCHEMA_FIELDS.each_value.sum { |type| IO::Buffer.size_of(type) }
end

Instance Method Details

#enabled? ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/async/background/metrics.rb', line 33

# Whether metrics collection is active for this instance.
#
# @return [Boolean] the @enabled flag set in #initialize
def enabled? = @enabled

#job_failed(entry, error) ⇒ Object



53
54
55
56
57
58
# File 'lib/async/background/metrics.rb', line 53

# Records a failed job: one fewer active job, one more failure.
# Does nothing when metrics are disabled.
#
# @param entry [Object] not used by this method
# @param error [Object] not used by this method
def job_failed(entry, error)
  if @enabled
    @registry.decrement(:active_jobs)
    @registry.increment(:total_failures)
  end
end

#job_finished(entry, duration) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/async/background/metrics.rb', line 45

# Records a successful job: one fewer active job, one more success, and the
# job's duration stored as whole milliseconds.
# Does nothing when metrics are disabled.
#
# @param entry [Object] not used by this method
# @param duration [Numeric] multiplied by 1000 and truncated before storing
def job_finished(entry, duration)
  if @enabled
    @registry.decrement(:active_jobs)
    @registry.increment(:total_successes)

    elapsed_ms = (duration * 1000).to_i
    @registry.set(:last_duration_ms, elapsed_ms)
  end
end

#job_skipped(entry) ⇒ Object



67
68
69
70
71
# File 'lib/async/background/metrics.rb', line 67

# Records a skipped job by bumping the skip counter.
# Does nothing when metrics are disabled.
#
# @param entry [Object] not used by this method
def job_skipped(entry)
  @registry.increment(:total_skips) if @enabled
end

#job_started(entry) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/async/background/metrics.rb', line 37

# Records a job start: bumps the run and active-job counters and stamps the
# start time as whole seconds of CLOCK_REALTIME.
# Does nothing when metrics are disabled.
#
# @param entry [Object] not used by this method
def job_started(entry)
  if @enabled
    @registry.increment(:total_runs)
    @registry.increment(:active_jobs)

    started_at = Process.clock_gettime(Process::CLOCK_REALTIME).to_i
    @registry.set(:last_run_at, started_at)
  end
end

#job_timed_out(entry) ⇒ Object



60
61
62
63
64
65
# File 'lib/async/background/metrics.rb', line 60

# Records a timed-out job: one fewer active job, one more timeout.
# Does nothing when metrics are disabled.
#
# @param entry [Object] not used by this method
def job_timed_out(entry)
  if @enabled
    @registry.decrement(:active_jobs)
    @registry.increment(:total_timeouts)
  end
end

#values ⇒ Object



73
74
75
76
77
# File 'lib/async/background/metrics.rb', line 73

# Current metric values as reported by the registry, or an empty hash when
# metrics are disabled.
#
# @return [Object] the registry's #values result, or {} when disabled
def values
  @enabled ? @registry.values : {}
end