Class: Sidekiq::Routing::Auto::JobDurationTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/routing/auto/job_duration_tracker.rb

Constant Summary collapse

REDIS_KEY_PREFIX =
"sidekiq:auto_reroute:durations"

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.average_duration(job_class, queue) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/sidekiq/routing/auto/job_duration_tracker.rb', line 8

# Average of the durations recorded for +job_class+ on +queue+ whose score
# falls inside the configured tracking window.
#
# Each sorted-set member is parsed by taking the text after its last ":" as a
# number (record_duration writes members as "timestamp:duration_ms").
#
# @param job_class [#to_s] job class name used to build the Redis key
# @param queue [#to_s] queue name used to build the Redis key
# @return [Integer, nil] the rounded mean, or nil when the window holds no entries
def self.average_duration(job_class, queue)
  window_start = Time.now.to_i - Routing::Auto.configuration.duration_tracking_window

  members = Sidekiq.redis do |conn|
    # Only entries scored at or after the window start are read.
    conn.zrangebyscore(redis_key(job_class, queue), window_start, "+inf")
  end
  return nil if members.empty?

  samples = members.map { |member| member.split(":").last.to_f }
  mean = samples.sum / samples.size
  mean.round
end

.job_class_name(worker, job) ⇒ Object

Returns the job class name in the same form as Sidekiq::JobRecord#klass, so that durations are written under the name NoisyNeighborDetector queries by. job.klass already unwraps ActiveJob wrappers; without this, ActiveJob jobs would be recorded under JobWrapper and never be looked up.



42
43
44
# File 'lib/sidekiq/routing/auto/job_duration_tracker.rb', line 42

# Name under which a job's duration is recorded.
#
# Prefers the job payload's "wrapped" entry; when that is absent (nil/false),
# falls back to the class name of the worker instance.
#
# @param worker [Object] the worker instance executing the job
# @param job [Hash] the job payload
# @return [Object] job["wrapped"] when truthy, otherwise worker.class.name
def self.job_class_name(worker, job)
  wrapped = job["wrapped"]
  return wrapped if wrapped

  worker.class.name
end

.redis_key(job_class, queue) ⇒ Object



34
35
36
# File 'lib/sidekiq/routing/auto/job_duration_tracker.rb', line 34

# Builds the Redis key for a job class / queue pair:
# REDIS_KEY_PREFIX, the job class and the queue joined by ":".
#
# @param job_class [#to_s] job class name
# @param queue [#to_s] queue name
# @return [String] the key
def self.redis_key(job_class, queue)
  format("%s:%s:%s", REDIS_KEY_PREFIX, job_class, queue)
end

.tracked_job_classes(queue) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/sidekiq/routing/auto/job_duration_tracker.rb', line 22

# Lists the job class names that have a duration key for +queue+.
#
# Looks up every key of the form "<REDIS_KEY_PREFIX>:*:<queue>" and returns
# the part between the prefix and the queue suffix.
#
# NOTE(review): this uses the Redis KEYS command, which walks the entire
# keyspace; consider whether that is acceptable for the target deployment.
#
# @param queue [#to_s] queue name
# @return [Array<String>] job class names, one per matching key
def self.tracked_job_classes(queue)
  head = "#{REDIS_KEY_PREFIX}:"
  tail = ":#{queue}"

  matching_keys = Sidekiq.redis { |conn| conn.keys("#{head}*#{tail}") }

  # Namespaced class names contain "::", so splitting on ":" would break them
  # apart. Trimming the fixed head and tail leaves the class name intact.
  matching_keys.map do |full_key|
    full_key.delete_prefix(head).delete_suffix(tail)
  end
end

Instance Method Details

#call(worker, job, queue) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/sidekiq/routing/auto/job_duration_tracker.rb', line 46

# Runs the given block and records how long it took, in milliseconds.
#
# Timing uses the monotonic clock. The duration is recorded from an +ensure+
# clause, so it is recorded whether the block returns or raises. A
# StandardError raised while recording is logged as a warning and swallowed,
# so it never replaces the block's own result or exception.
#
# @param worker [Object] the worker instance running the job
# @param job [Hash] the job payload
# @param queue [String] the queue the job is running on
# @yield the job body to time
# @return [Object] whatever the block returns
def call(worker, job, queue)
  started_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  yield
ensure
  finished_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  elapsed_ms = ((finished_at - started_at) * 1000).round
  begin
    tracked_name = self.class.job_class_name(worker, job)
    record_duration(tracked_name, queue, elapsed_ms)
  rescue StandardError => error
    # Best effort only: a tracking failure must not fail the job.
    Routing::Auto.logger.warn("[JobDurationTracker] failed to record duration: #{error.message}")
  end
end

#record_duration(job_class, queue, duration_ms) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/sidekiq/routing/auto/job_duration_tracker.rb', line 58

# Stores one duration sample for +job_class+ on +queue+.
#
# The sample is added to a Redis sorted set as the member
# "<timestamp>:<duration_ms>", scored by the current Unix timestamp. The key's
# expiry is then reset to twice the configured tracking window, and every
# member scored at or before the start of the window is removed.
#
# NOTE(review): sorted-set members are unique, so two samples with the same
# second and the same duration collapse into one entry. Confirm that is
# acceptable for the averages computed from this set.
#
# @param job_class [#to_s] job class name used to build the Redis key
# @param queue [#to_s] queue name used to build the Redis key
# @param duration_ms [Integer] measured duration in milliseconds
# @return [Object] the value of the Sidekiq.redis block
def record_duration(job_class, queue, duration_ms)
  # redis_key is defined on the class, not the instance, so it has to be
  # reached through self.class; a bare call here raises NoMethodError.
  key = self.class.redis_key(job_class, queue)
  timestamp = Time.now.to_i
  window = Routing::Auto.configuration.duration_tracking_window

  Sidekiq.redis do |redis|
    redis.zadd(key, timestamp, "#{timestamp}:#{duration_ms}")
    # Keep the key alive for twice the window so an idle key eventually expires.
    redis.expire(key, window * 2)

    # Drop samples that have fallen out of the tracking window.
    redis.zremrangebyscore(key, "-inf", timestamp - window)
  end
end