Class: Wurk::Metrics::History

Inherits:
Object
  • Object
show all
Includes:
Wurk::Middleware::ServerMiddleware
Defined in:
lib/wurk/metrics/history.rb

Overview

Ent feature parity (§5): server middleware that records per-job-class execution metrics into Redis time-buckets. The on-the-wire schema is wire-compat with Sidekiq 8.x’s history pane so dashboards built against ‘j|YYMMDD|H:M` HASH keys keep working unchanged.

Bucket layout (spec: docs/target/sidekiq-free.md §1.6):

j|YYMMDD|H:M    HASH   per-minute bucket, TTL = MID_TERM (3 days)
  <klass>|p     INT    processed count
  <klass>|f     INT    failed count
  <klass>|ms    INT    total ms spent

j|YYMMDD|H:m0   HASH   10-minute rollup (last digit zeroed),
                       TTL = SHORT_TERM (8 hours) — short window for
                       quick aggregate queries without scanning 600 minute keys.

<klass>-YYMMDD-H  HASH per-class hourly histogram, TTL = MID_TERM

Every bucket TTL is set on first write (EXPIRE NX-equivalent: only when the HASH was newly created in this call) — re-asserting TTL on every write would keep the bucket alive indefinitely while traffic continues, but that’s the desired behavior here: as long as a class keeps running, we keep the minute bucket around for the retention window measured from *last write*, not from first write. So we EXPIRE unconditionally.

The middleware is hot-path — every successful job pays for it. Writes are pipelined in a single round-trip per job (1 HINCRBY × 3 + 1 EXPIRE per bucket × 3 buckets = 12 commands, batched).

Constant Summary collapse

MID_TERM =

Per spec §1.6 — naming mirrors the upstream constants so anyone grepping the Sidekiq source for ‘MID_TERM` lands here.

3 * 24 * 60 * 60
SHORT_TERM =

3 days, in seconds

8 * 60 * 60
MINUTE_KEY_PREFIX =

8 hours, in seconds

'j|'
DATE_FORMAT =

YYMMDD — two-digit year per spec

'%y%m%d'

Instance Attribute Summary

Attributes included from Wurk::Middleware::ServerMiddleware

#config

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Wurk::Middleware::ServerMiddleware

#logger, #redis, #redis_pool

Class Method Details

.hour_key(klass, time) ⇒ Object



124
125
126
127
# File 'lib/wurk/metrics/history.rb', line 124

def hour_key(klass, time)
  t = time.utc
  "#{klass}-#{t.strftime(DATE_FORMAT)}-#{t.hour}"
end

.incr_bucket(pipe, key, parts) ⇒ Object



103
104
105
106
107
108
# File 'lib/wurk/metrics/history.rb', line 103

def incr_bucket(pipe, key, parts)
  outcome_field, ms_field, ms, ttl = parts
  pipe.call('HINCRBY', key, outcome_field, 1)
  pipe.call('HINCRBY', key, ms_field, ms)
  pipe.call('EXPIRE', key, ttl)
end

.minute_key(time) ⇒ Object

Public formatters — Wurk::Metrics::Query reuses these so the two cannot drift on bucket-naming convention.



112
113
114
115
116
# File 'lib/wurk/metrics/history.rb', line 112

def minute_key(time)
  t = time.utc
  format("#{MINUTE_KEY_PREFIX}%<date>s|%<hr>d:%<min>d",
         date: t.strftime(DATE_FORMAT), hr: t.hour, min: t.min)
end

.pipeline_write(conn, klass, ms, success, buckets) ⇒ Object

Minute + 10-min rollup share a per-class ‘|p|f|ms` field layout; the hourly bucket is already class-scoped so its fields are bare `p|f|ms`. Pipeline all 9 commands in one round-trip.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/wurk/metrics/history.rb', line 85

def pipeline_write(conn, klass, ms, success, buckets)
  outcome = success ? 'p' : 'f'
  class_outcome = "#{klass}|#{outcome}"
  class_ms = "#{klass}|ms"
  conn.pipelined do |pipe|
    incr_bucket(pipe, buckets[:minute], [class_outcome, class_ms, ms, MID_TERM])
    # The minute key and the 10-min rollup key coincide whenever the
    # minute ends in 0 (rollup zeroes the last digit). Writing both
    # would double-count the shared field — the minute write above
    # already lands on it — so skip the rollup write then. Minutes
    # x1..x9 still accumulate into the x0 rollup key as normal.
    unless buckets[:rollup] == buckets[:minute]
      incr_bucket(pipe, buckets[:rollup], [class_outcome, class_ms, ms, SHORT_TERM])
    end
    incr_bucket(pipe, buckets[:hour], [outcome, 'ms', ms, MID_TERM])
  end
end

.record(klass, duration_ms, success:, redis_pool: nil, at: ::Time.now) ⇒ Object

Single Redis round-trip per job. ‘success: true` → `<klass>|p`; `success: false` → `<klass>|f`. `<klass>|ms` accumulates total runtime in milliseconds for both outcomes (so an operator can ask “how much wall-clock time has FooJob consumed?” without branching on outcome).



72
73
74
75
76
77
78
79
80
# File 'lib/wurk/metrics/history.rb', line 72

def record(klass, duration_ms, success:, redis_pool: nil, at: ::Time.now)
  return if klass.nil? || klass.empty?

  ms = duration_ms.to_i
  ms = 0 if ms.negative?
  buckets = { minute: minute_key(at), rollup: rollup_key(at), hour: hour_key(klass, at) }
  with_pool(redis_pool) { |conn| pipeline_write(conn, klass, ms, success, buckets) }
  nil
end

.rollup_key(time) ⇒ Object



118
119
120
121
122
# File 'lib/wurk/metrics/history.rb', line 118

def rollup_key(time)
  t = time.utc
  format("#{MINUTE_KEY_PREFIX}%<date>s|%<hr>d:%<min>d",
         date: t.strftime(DATE_FORMAT), hr: t.hour, min: (t.min / 10) * 10)
end

Instance Method Details

#call(_worker, job, _queue) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/wurk/metrics/history.rb', line 46

def call(_worker, job, _queue)
  klass = job['class']
  started = monotonic_ms
  success = false
  begin
    result = yield
    success = true
    result
  ensure
    duration = (monotonic_ms - started).round
    # Best-effort: a metrics write failure must never propagate into
    # the job result. The processor already finalized the ack path.
    begin
      self.class.record(klass, duration, success: success, redis_pool: redis_pool)
    rescue StandardError => e
      handle_error(e)
    end
  end
end