Module: SourceMonitor::Jobs::Visibility

Defined in:
lib/source_monitor/jobs/visibility.rb

Class Method Summary

Class Method Details

.adapter_name ⇒ Object



21
22
23
# File 'lib/source_monitor/jobs/visibility.rb', line 21

# Reports the queue adapter name exposed by ActiveJob::Base.
#
# @return [Object] the value of +ActiveJob::Base.queue_adapter_name+
def adapter_name
  job_base = ActiveJob::Base
  job_base.queue_adapter_name
end

.last_enqueued_at(queue_name) ⇒ Object



29
30
31
# File 'lib/source_monitor/jobs/visibility.rb', line 29

# Looks up the +:last_enqueued_at+ value recorded for a queue.
#
# @param queue_name [#to_s] queue to look up (passed straight to +state_for+)
# @return [Object, nil] the stored +:last_enqueued_at+ entry
def last_enqueued_at(queue_name)
  entry = state_for(queue_name)
  entry[:last_enqueued_at]
end

.last_finished_at(queue_name) ⇒ Object



37
38
39
# File 'lib/source_monitor/jobs/visibility.rb', line 37

# Looks up the +:last_finished_at+ value recorded for a queue.
#
# @param queue_name [#to_s] queue to look up (passed straight to +state_for+)
# @return [Object, nil] the stored +:last_finished_at+ entry
def last_finished_at(queue_name)
  entry = state_for(queue_name)
  entry[:last_finished_at]
end

.last_started_at(queue_name) ⇒ Object



33
34
35
# File 'lib/source_monitor/jobs/visibility.rb', line 33

# Looks up the +:last_started_at+ value recorded for a queue.
#
# @param queue_name [#to_s] queue to look up (passed straight to +state_for+)
# @return [Object, nil] the stored +:last_started_at+ entry
def last_started_at(queue_name)
  entry = state_for(queue_name)
  entry[:last_started_at]
end

.monitor ⇒ Object



79
80
81
# File 'lib/source_monitor/jobs/visibility.rb', line 79

# Returns the memoized Monitor instance, creating it on first use.
#
# @return [Monitor] the same instance on every call once created
def monitor
  return @monitor if @monitor

  @monitor = Monitor.new
end

.queue_depth(queue_name) ⇒ Object



25
26
27
# File 'lib/source_monitor/jobs/visibility.rb', line 25

# Looks up the +:depth+ counter recorded for a queue.
#
# @param queue_name [#to_s] queue to look up (passed straight to +state_for+)
# @return [Object] the stored +:depth+ entry
def queue_depth(queue_name)
  entry = state_for(queue_name)
  entry[:depth]
end

.queue_identifier(job) ⇒ Object



59
60
61
# File 'lib/source_monitor/jobs/visibility.rb', line 59

# Resolves the queue name used as the tracking key for a job.
#
# Uses the job's own +queue_name+ when it is truthy; otherwise falls back to
# +SourceMonitor.config.fetch_queue_name+. The result is always stringified.
#
# @param job [#queue_name] the job being inspected
# @return [String] the queue name as a String
def queue_identifier(job)
  name = job.queue_name
  # Only consult the configuration when the job carries no queue name.
  name ||= SourceMonitor.config.fetch_queue_name
  name.to_s
end

.queue_state ⇒ Object



63
64
65
66
67
# File 'lib/source_monitor/jobs/visibility.rb', line 63

# Returns the memoized per-queue state table.
#
# The table is a Hash whose default block stores a fresh state Hash
# (+:depth+ 0 and nil timestamps) under any key on first access.
#
# @return [Hash] queue key => state Hash
def queue_state
  return @queue_state if @queue_state

  @queue_state = Hash.new do |table, queue|
    table[queue] = { depth: 0, last_enqueued_at: nil, last_started_at: nil, last_finished_at: nil }
  end
end

.reset! ⇒ Object



41
42
43
44
45
# File 'lib/source_monitor/jobs/visibility.rb', line 41

# Discards the memoized queue state table by clearing +@queue_state+
# inside a +synchronize+ block.
#
# @return [nil]
def reset!
  synchronize { @queue_state = nil }
end

.setup! ⇒ Object



11
12
13
14
15
16
17
18
19
# File 'lib/source_monitor/jobs/visibility.rb', line 11

# Installs the three Active Job notification subscribers once.
#
# Does nothing when subscriptions were already installed, or when
# +SourceMonitor.config.job_metrics_enabled+ is falsy.
#
# @return [true, nil] true after subscribing, nil when skipped
def setup!
  # Checked first so the configuration is not consulted once subscribed.
  return if @subscribed
  return unless SourceMonitor.config.job_metrics_enabled

  subscribe_enqueue
  subscribe_perform_start
  subscribe_perform
  @subscribed = true
end

.state_for(name) ⇒ Object



69
70
71
72
73
# File 'lib/source_monitor/jobs/visibility.rb', line 69

# Fetches the state Hash stored for a queue, keyed by the name's String form.
# The lookup runs inside +synchronize+.
#
# @param name [#to_s] queue name
# @return [Object] whatever +queue_state+ holds (or defaults) for that key
def state_for(name)
  key = name.to_s
  synchronize { queue_state[key] }
end

.state_snapshot ⇒ Object



47
48
49
50
51
# File 'lib/source_monitor/jobs/visibility.rb', line 47

# Builds a copy of the queue state table in which every per-queue entry is
# duplicated with +dup+, so mutating the result leaves the tracked entries
# untouched. The copy is made inside +synchronize+.
#
# @return [Hash] queue key => duplicated state entry
def state_snapshot
  synchronize do
    queue_state.transform_values { |entry| entry.dup }
  end
end

.subscribe_enqueue ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/source_monitor/jobs/visibility.rb', line 83

# Subscribes to the "enqueue.active_job" notification.
#
# For every job accepted by +trackable_job?+, the subscriber (inside
# +synchronize+) increments the queue's +:depth+, stamps +:last_enqueued_at+
# with +Time.current+, and reports both through +SourceMonitor::Metrics.gauge+.
#
# @return [Object] whatever +ActiveSupport::Notifications.subscribe+ returns
def subscribe_enqueue
  ActiveSupport::Notifications.subscribe("enqueue.active_job") do |_name, _started, _finished, _unique_id, payload|
    enqueued_job = payload[:job]
    next unless trackable_job?(enqueued_job)

    queue_name = queue_identifier(enqueued_job)

    synchronize do
      entry = queue_state[queue_name]

      depth = entry[:depth] + 1
      entry[:depth] = depth

      enqueued_at = Time.current
      entry[:last_enqueued_at] = enqueued_at

      SourceMonitor::Metrics.gauge("jobs_queue_depth_#{queue_name}", depth)
      SourceMonitor::Metrics.gauge("jobs_last_enqueued_at_#{queue_name}", enqueued_at)
    end
  end
end

.subscribe_perform ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/source_monitor/jobs/visibility.rb', line 117

# Subscribes to the "perform.active_job" notification.
#
# For every job accepted by +trackable_job?+, the subscriber (inside
# +synchronize+) stamps the queue's +:last_finished_at+ with +Time.current+
# and reports it through +SourceMonitor::Metrics.gauge+.
#
# @return [Object] whatever +ActiveSupport::Notifications.subscribe+ returns
def subscribe_perform
  ActiveSupport::Notifications.subscribe("perform.active_job") do |_name, _started, _finished, _unique_id, payload|
    performed_job = payload[:job]
    next unless trackable_job?(performed_job)

    queue_name = queue_identifier(performed_job)

    synchronize do
      entry = queue_state[queue_name]

      finished_at = Time.current
      entry[:last_finished_at] = finished_at

      SourceMonitor::Metrics.gauge("jobs_last_finished_at_#{queue_name}", finished_at)
    end
  end
end

.subscribe_perform_start ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/source_monitor/jobs/visibility.rb', line 100

# Subscribes to the "perform_start.active_job" notification.
#
# For every job accepted by +trackable_job?+, the subscriber (inside
# +synchronize+) decrements the queue's +:depth+ without letting it drop
# below zero, stamps +:last_started_at+ with +Time.current+, and reports both
# through +SourceMonitor::Metrics.gauge+.
#
# @return [Object] whatever +ActiveSupport::Notifications.subscribe+ returns
def subscribe_perform_start
  ActiveSupport::Notifications.subscribe("perform_start.active_job") do |_name, _started, _finished, _unique_id, payload|
    starting_job = payload[:job]
    next unless trackable_job?(starting_job)

    queue_name = queue_identifier(starting_job)

    synchronize do
      entry = queue_state[queue_name]

      # Floor at zero: a start can be seen for a job whose enqueue was not counted.
      remaining = entry[:depth] - 1
      remaining = 0 if remaining.negative?
      entry[:depth] = remaining

      started_at = Time.current
      entry[:last_started_at] = started_at

      SourceMonitor::Metrics.gauge("jobs_queue_depth_#{queue_name}", remaining)
      SourceMonitor::Metrics.gauge("jobs_last_started_at_#{queue_name}", started_at)
    end
  end
end

.synchronize(&block) ⇒ Object



75
76
77
# File 'lib/source_monitor/jobs/visibility.rb', line 75

# Runs the given block while holding the object returned by +monitor+.
#
# @yield the work to run under +monitor.synchronize+
# @return [Object] whatever +monitor.synchronize+ returns for the block
def synchronize(&block)
  lock = monitor
  lock.synchronize(&block)
end

.trackable_job?(job) ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
56
57
# File 'lib/source_monitor/jobs/visibility.rb', line 53

# Tells whether a job's class name starts with "SourceMonitor::".
#
# Any StandardError raised while inspecting the job (for example a class
# whose +name+ is nil) is treated as "not trackable".
#
# @param job [Object] the job taken from a notification payload
# @return [Boolean]
def trackable_job?(job)
  begin
    class_name = job.class.name
    class_name.start_with?("SourceMonitor::")
  rescue StandardError
    false
  end
end