Class: SidekiqUniqueJobs::LockMetrics

Inherits: Object

Defined in: lib/sidekiq_unique_jobs/lock_metrics.rb

Overview

Thread-safe lock metrics tracker. Accumulates counters in memory and flushes to Redis periodically. Modeled after Sidekiq::Metrics::ExecutionTracker.

Author:

  • Mikael Henriksson <mikael@mhenrixon.com>

Constant Summary

METRICS_PREFIX =
  "uniquejobs:metrics"
METRICS_TTL =
  8 * 60 * 60 # 8 hours
EVENTS =
  [:locked, :lock_failed, :unlocked, :unlock_failed, :execution_failed, :reaped].freeze

Class Method Summary

Instance Method Summary

Constructor Details

#initialize ⇒ LockMetrics

Returns a new instance of LockMetrics.



# File 'lib/sidekiq_unique_jobs/lock_metrics.rb', line 15

def initialize
  @counters = Hash.new(0)
  @mutex = Mutex.new
end

Class Method Details

.by_type(minutes: 60) ⇒ Array<Array(String, Hash)>

Query and group by lock type for web UI display

Parameters:

  • minutes (Integer) (defaults to: 60)

    how many minutes to look back

Returns:

  • (Array<Array(String, Hash)>)

    lock types paired with their event counts, with "total" sorted last

# File 'lib/sidekiq_unique_jobs/lock_metrics.rb', line 86

def self.by_type(minutes: 60)
  raw = query(minutes: minutes)
  grouped = Hash.new { |h, k| h[k] = Hash.new(0) }

  raw.each do |key, count|
    type, event = key.split("|", 2)
    grouped[type][event.to_sym] = count
  end

  grouped.sort_by { |type, _| (type == "total") ? "zzz" : type }
end
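The grouping above can be sketched standalone: flat counters keyed "type|event" are nested per lock type, and the synthetic "total" row is pushed to the end of the sorted result. A minimal sketch using made-up sample data:

```ruby
# Flat counters as query() would return them, keyed "type|event"
raw = {
  "until_executed|locked"   => 5,
  "until_executed|unlocked" => 4,
  "total|locked"            => 7,
  "while_executing|locked"  => 2,
}

# Nest per lock type, converting event names to symbols
grouped = Hash.new { |h, k| h[k] = Hash.new(0) }
raw.each do |key, count|
  type, event = key.split("|", 2)
  grouped[type][event.to_sym] = count
end

# "zzz" is a sort sentinel that pushes "total" past any real lock name
sorted = grouped.sort_by { |type, _| (type == "total") ? "zzz" : type }
sorted.each { |type, events| puts "#{type}: #{events.inspect}" }
```

Note the sentinel only works because no lock type sorts after "zzz"; lock names in the gem are lowercase identifiers like `until_executed`, so this holds in practice.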

.query(minutes: 60) ⇒ Hash<String, Integer>

Query metrics for the last N minutes

Parameters:

  • minutes (Integer) (defaults to: 60)

    how many minutes to look back

Returns:

  • (Hash<String, Integer>)

    aggregated counters



# File 'lib/sidekiq_unique_jobs/lock_metrics.rb', line 57

def self.query(minutes: 60)
  now = Time.now.utc
  results = Hash.new(0)

  keys = Array.new(minutes) do |i|
    t = now - (i * 60)
    "#{METRICS_PREFIX}|#{t.strftime('%y%m%d|%-H:%M')}"
  end

  Sidekiq.redis do |conn|
    responses = conn.pipelined do |pipe|
      keys.each { |key| pipe.call("HGETALL", key) }
    end

    responses.each do |data|
      next unless data.is_a?(Array) || data.is_a?(Hash)

      pairs = data.is_a?(Hash) ? data : data.each_slice(2)
      pairs.each { |field, count| results[field] += count.to_i }
    end
  end

  results
end
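The key layout that `query` reads can be seen without Redis: metrics live in one hash per minute, named `uniquejobs:metrics|<yymmdd>|<H:MM>` in UTC, and the method walks backwards one minute at a time. A sketch of just the key generation, pinned to a fixed timestamp so the output is predictable:

```ruby
METRICS_PREFIX = "uniquejobs:metrics"

# Fixed UTC time instead of Time.now.utc, for a deterministic example
now = Time.utc(2024, 6, 1, 9, 5)

# One key per minute, newest first, matching query(minutes: 3)
keys = Array.new(3) do |i|
  t = now - (i * 60)
  "#{METRICS_PREFIX}|#{t.strftime('%y%m%d|%-H:%M')}"
end

keys.each { |k| puts k }
```

The `%-H` flag drops the hour's leading zero, so a 09:05 bucket is named `…|9:05`, not `…|09:05`; the writer (`#flush`) must use the identical format string or reads and writes will miss each other.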

Instance Method Details

#flush(time = Time.now) ⇒ Object

Flush in-memory counters to Redis

Parameters:

  • time (Time) (defaults to: Time.now)

    the current time (for bucketing)



# File 'lib/sidekiq_unique_jobs/lock_metrics.rb', line 35

def flush(time = Time.now)
  data = reset
  return if data.empty?

  bucket = time.utc.strftime("%y%m%d|%-H:%M")
  key = "#{METRICS_PREFIX}|#{bucket}"

  Sidekiq.redis do |conn|
    conn.pipelined do |pipe|
      data.each { |field, count| pipe.call("HINCRBY", key, field, count) }
      pipe.call("EXPIRE", key, METRICS_TTL.to_s)
    end
  end
rescue StandardError
  # Re-merge unflushed data back into counters so it's not lost
  @mutex.synchronize { data.each { |k, v| @counters[k] += v } }
end
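The interesting part of `#flush` is the recovery path: counters are drained first, and if the Redis write then raises, the drained data is merged back so no events are lost between flush attempts. A minimal sketch of that pattern with a hypothetical `SketchTracker` class (not the gem's API), where a block stands in for the pipelined `HINCRBY` calls:

```ruby
class SketchTracker
  def initialize
    @counters = Hash.new(0)
    @mutex = Mutex.new
  end

  def track(key)
    @mutex.synchronize { @counters[key] += 1 }
  end

  def counters
    @mutex.synchronize { @counters.dup }
  end

  def flush
    # Drain: swap in a fresh hash so tracking can continue concurrently
    data = @mutex.synchronize do
      drained = @counters
      @counters = Hash.new(0)
      drained
    end
    return if data.empty?

    yield data # stand-in for the pipelined HINCRBY calls to Redis
  rescue StandardError
    # Re-merge unflushed data so it is retried on the next flush
    @mutex.synchronize { data.each { |k, v| @counters[k] += v } }
  end
end

tracker = SketchTracker.new
2.times { tracker.track("until_executed|locked") }
tracker.flush { raise "redis down" } # simulated write failure
puts tracker.counters                # the counts survived the failure
```

Merging back with `+=` rather than assignment matters: events tracked between the drain and the rescue would otherwise be overwritten.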

#track(event, item) ⇒ Object

Record a lock event

Parameters:

  • event (Symbol)

    one of EVENTS

  • item (Hash)

    the Sidekiq job hash (needs "lock" key)



# File 'lib/sidekiq_unique_jobs/lock_metrics.rb', line 24

def track(event, item)
  lock_type = item.is_a?(Hash) ? (item["lock"] || "unknown") : "unknown"
  @mutex.synchronize do
    @counters["#{lock_type}|#{event}"] += 1
    @counters["total|#{event}"] += 1
  end
end
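Each tracked event writes two counters: one keyed by the job's lock type and one under the aggregate "total" type, both in the `"type|event"` shape that `query` and `by_type` later parse. A standalone sketch of that double increment, using a lambda in place of the instance method:

```ruby
counters = Hash.new(0)
mutex = Mutex.new

record = lambda do |event, item|
  # Non-hash items (or hashes without a "lock" key) fall back to "unknown"
  lock_type = item.is_a?(Hash) ? (item["lock"] || "unknown") : "unknown"
  mutex.synchronize do
    counters["#{lock_type}|#{event}"] += 1 # per-type counter
    counters["total|#{event}"] += 1        # aggregate counter
  end
end

record.call(:locked, { "lock" => "until_executed" })
record.call(:locked, "not a hash") # exercises the "unknown" fallback
puts counters
```

Because "total" is just another row in the same hash, the flush and query paths need no special casing for aggregates; only `by_type`'s sort treats it specially.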