Module: SourceMonitor::Jobs::Visibility
- Defined in:
- lib/source_monitor/jobs/visibility.rb
Class Method Summary collapse
- .adapter_name ⇒ Object
- .last_enqueued_at(queue_name) ⇒ Object
- .last_finished_at(queue_name) ⇒ Object
- .last_started_at(queue_name) ⇒ Object
- .monitor ⇒ Object
- .queue_depth(queue_name) ⇒ Object
- .queue_identifier(job) ⇒ Object
- .queue_state ⇒ Object
- .reset! ⇒ Object
- .setup! ⇒ Object
- .state_for(name) ⇒ Object
- .state_snapshot ⇒ Object
- .subscribe_enqueue ⇒ Object
- .subscribe_perform ⇒ Object
- .subscribe_perform_start ⇒ Object
- .synchronize(&block) ⇒ Object
- .trackable_job?(job) ⇒ Boolean
Class Method Details
.adapter_name ⇒ Object
21 22 23 |
# File 'lib/source_monitor/jobs/visibility.rb', line 21 def adapter_name ActiveJob::Base.queue_adapter_name end |
.last_enqueued_at(queue_name) ⇒ Object
29 30 31 |
# File 'lib/source_monitor/jobs/visibility.rb', line 29 def last_enqueued_at(queue_name) state_for(queue_name)[:last_enqueued_at] end |
.last_finished_at(queue_name) ⇒ Object
37 38 39 |
# File 'lib/source_monitor/jobs/visibility.rb', line 37 def last_finished_at(queue_name) state_for(queue_name)[:last_finished_at] end |
.last_started_at(queue_name) ⇒ Object
33 34 35 |
# File 'lib/source_monitor/jobs/visibility.rb', line 33 def last_started_at(queue_name) state_for(queue_name)[:last_started_at] end |
.monitor ⇒ Object
79 80 81 |
# File 'lib/source_monitor/jobs/visibility.rb', line 79 def monitor @monitor ||= Monitor.new end |
.queue_depth(queue_name) ⇒ Object
25 26 27 |
# File 'lib/source_monitor/jobs/visibility.rb', line 25 def queue_depth(queue_name) state_for(queue_name)[:depth] end |
.queue_identifier(job) ⇒ Object
59 60 61 |
# File 'lib/source_monitor/jobs/visibility.rb', line 59 def queue_identifier(job) (job.queue_name || SourceMonitor.config.fetch_queue_name).to_s end |
.queue_state ⇒ Object
63 64 65 66 67 |
# File 'lib/source_monitor/jobs/visibility.rb', line 63 def queue_state @queue_state ||= Hash.new do |hash, key| hash[key] = { depth: 0, last_enqueued_at: nil, last_started_at: nil, last_finished_at: nil } end end |
.reset! ⇒ Object
41 42 43 44 45 |
# File 'lib/source_monitor/jobs/visibility.rb', line 41 def reset! synchronize do @queue_state = nil end end |
.setup! ⇒ Object
11 12 13 14 15 16 17 18 19 |
# File 'lib/source_monitor/jobs/visibility.rb', line 11 def setup! return if @subscribed || !SourceMonitor.config.job_metrics_enabled subscribe_enqueue subscribe_perform_start subscribe_perform @subscribed = true end |
.state_for(name) ⇒ Object
69 70 71 72 73 |
# File 'lib/source_monitor/jobs/visibility.rb', line 69 def state_for(name) synchronize do queue_state[name.to_s] end end |
.state_snapshot ⇒ Object
47 48 49 50 51 |
# File 'lib/source_monitor/jobs/visibility.rb', line 47 def state_snapshot synchronize do queue_state.transform_values(&:dup) end end |
.subscribe_enqueue ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/source_monitor/jobs/visibility.rb', line 83 def subscribe_enqueue ActiveSupport::Notifications.subscribe("enqueue.active_job") do |_event, _start, _finish, _id, payload| job = payload[:job] next unless trackable_job?(job) queue_name = queue_identifier(job) synchronize do state = queue_state[queue_name] state[:depth] += 1 state[:last_enqueued_at] = Time.current SourceMonitor::Metrics.gauge("jobs_queue_depth_#{queue_name}", state[:depth]) SourceMonitor::Metrics.gauge("jobs_last_enqueued_at_#{queue_name}", state[:last_enqueued_at]) end end end |
.subscribe_perform ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/source_monitor/jobs/visibility.rb', line 117 def subscribe_perform ActiveSupport::Notifications.subscribe("perform.active_job") do |_event, _start, _finish, _id, payload| job = payload[:job] next unless trackable_job?(job) queue_name = queue_identifier(job) synchronize do state = queue_state[queue_name] state[:last_finished_at] = Time.current SourceMonitor::Metrics.gauge("jobs_last_finished_at_#{queue_name}", state[:last_finished_at]) end end end |
.subscribe_perform_start ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/source_monitor/jobs/visibility.rb', line 100 def subscribe_perform_start ActiveSupport::Notifications.subscribe("perform_start.active_job") do |_event, _start, _finish, _id, payload| job = payload[:job] next unless trackable_job?(job) queue_name = queue_identifier(job) synchronize do state = queue_state[queue_name] state[:depth] = [ state[:depth] - 1, 0 ].max state[:last_started_at] = Time.current SourceMonitor::Metrics.gauge("jobs_queue_depth_#{queue_name}", state[:depth]) SourceMonitor::Metrics.gauge("jobs_last_started_at_#{queue_name}", state[:last_started_at]) end end end |
.synchronize(&block) ⇒ Object
75 76 77 |
# File 'lib/source_monitor/jobs/visibility.rb', line 75 def synchronize(&block) monitor.synchronize(&block) end |
.trackable_job?(job) ⇒ Boolean
53 54 55 56 57 |
# File 'lib/source_monitor/jobs/visibility.rb', line 53 def trackable_job?(job) job.class.name.start_with?("SourceMonitor::") rescue StandardError false end |