Class: Wurk::History

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/history.rb

Overview

Sidekiq Enterprise §5 Historical Metrics snapshotter. A leader-gated background thread that, every `config.retain_history` seconds, emits a statsd-shaped snapshot to the configured dogstatsd client: either the default §5.2 gauge set or a user-supplied collector block.

Configured in a server block:

Sidekiq.configure_server do |config|
  config.dogstatsd = -> { Datadog::Statsd.new('localhost', 8125) }
  config.retain_history(30)                      # default §5.2 gauges
  # …or a custom collector:
  config.retain_history(30) do |s|
    Sidekiq::Queue.all.each do |q|
      s.gauge("sidekiq.queue.size", q.size, tags: ["queue:#{q.name}"])
    end
  end
end

The block receives the raw dogstatsd client `s` (quacks like `Datadog::Statsd`: gauge/count/histogram/batch) and writes fully-qualified `sidekiq.*` metric names itself, matching Sidekiq Ent. Leader-gated via the cluster `dear-leader` lock so exactly one process emits per cluster.
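To try a collector block without a running statsd agent, one option is to hand it a recording stand-in. The sketch below is illustrative only: `RecordingStatsd`, `QueueInfo` and the queue data are hypothetical names invented for this example, not part of Wurk or `Datadog::Statsd`.

```ruby
# Hypothetical stand-in for a dogstatsd client: records calls instead of
# sending UDP packets. Only the methods the collector uses are defined.
class RecordingStatsd
  attr_reader :calls

  def initialize
    @calls = []
  end

  def gauge(name, value, tags: [])
    @calls << [:gauge, name, value, tags]
  end
end

# Hypothetical queue data; a real collector would iterate Sidekiq::Queue.all.
QueueInfo = Struct.new(:name, :size)
queues = [QueueInfo.new("default", 12), QueueInfo.new("mailers", 3)]

# Same shape as the block passed to config.retain_history: it receives the
# client and writes fully-qualified sidekiq.* names itself.
collector = lambda do |s|
  queues.each do |q|
    s.gauge("sidekiq.queue.size", q.size, tags: ["queue:#{q.name}"])
  end
end

statsd = RecordingStatsd.new
collector.call(statsd)
statsd.calls.each { |call| p call }
```

Because the block only relies on duck typing, any object that responds to the methods it calls can stand in for the client in a spec.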

Every snapshot is also appended to the capped Redis stream `history:metrics` (§5.3), the same key a migrated Sidekiq Ent install uses, so the dashboard’s Historical view has a data source independent of any external statsd, and pre-existing Ent stream data renders without rewrite. The stream write happens whenever the snapshotter runs; the dogstatsd emit is skipped only when no client is configured.

Aliased as `Sidekiq::History` (drop-in contract). Spec: docs/target/sidekiq-ent.md §5.1–§5.3.

Constant Summary

SNAPSHOT_FIELDS =

Stream field → Stats reader. Single source for both the `history:metrics` stream entry and the default §5.2 statsd gauge set (which prefixes `sidekiq.`). Order is the display order.

{
  'processed' => :processed,
  'failures' => :failed,
  'enqueued' => :enqueued,
  'retries' => :retry_size,
  'dead' => :dead_size,
  'scheduled' => :scheduled_size,
  'busy' => :workers_size
}.freeze
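
A minimal sketch of how one map can feed both outputs. `FakeStats` is a hypothetical stand-in for the Stats reader, and the two derivations are an assumption about how the private helpers use the constant, not a copy of them.

```ruby
SNAPSHOT_FIELDS = {
  'processed' => :processed,
  'failures' => :failed,
  'enqueued' => :enqueued,
  'retries' => :retry_size,
  'dead' => :dead_size,
  'scheduled' => :scheduled_size,
  'busy' => :workers_size
}.freeze

# Hypothetical stand-in exposing the reader methods named in the map.
FakeStats = Struct.new(:processed, :failed, :enqueued, :retry_size,
                       :dead_size, :scheduled_size, :workers_size)
stats = FakeStats.new(1200, 7, 42, 3, 1, 9, 5)

# Stream entry: field name => value read through the mapped reader.
stream_fields = SNAPSHOT_FIELDS.to_h do |field, reader|
  [field, stats.public_send(reader)]
end

# Default gauge set: same values, names prefixed with "sidekiq.".
gauges = stream_fields.transform_keys { |field| "sidekiq.#{field}" }

p stream_fields
p gauges
```

Ruby hashes preserve insertion order, so both derived hashes keep the display order of the constant.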
STREAM_CAP =

Approximate cap on retained snapshots (XADD MAXLEN ~). At the default 30s interval this is ~3.5 days of history; older points age out. `~` lets Redis trim in whole macro-nodes, so the actual length can briefly exceed the cap, matching Ent’s best-effort retention.

10_000
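
The retention figure follows from cap × interval. A small worked calculation, with `retention_days` as a hypothetical helper introduced only for this example:

```ruby
STREAM_CAP = 10_000
SECONDS_PER_DAY = 86_400

# Approximate days of history kept for a given snapshot interval.
def retention_days(interval_seconds, cap = STREAM_CAP)
  (cap * interval_seconds) / SECONDS_PER_DAY.to_f
end

puts retention_days(30).round(2) # 3.47 (the "~3.5 days" quoted above)
puts retention_days(10).round(2) # 1.16
puts retention_days(60).round(2) # 6.94
```

Shorter intervals shrink the window proportionally, which is worth keeping in mind before lowering the interval or the `history_stream_cap` override.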
STREAM_DEFAULT_LIMIT =
1000

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Class Method Summary

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config) ⇒ History

Returns a new instance of History.



# File 'lib/wurk/history.rb', line 64

def initialize(config)
  @config = config
  @interval = config.history_interval
  @collector = config.history_collector
  @stream_cap = config[:history_stream_cap] || STREAM_CAP
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @thread = nil
end

Class Method Details

.numeric(value) ⇒ Object

Coerce a stream field to Int/Float for charting; leave non-numeric Ent fields (e.g. a label) untouched so nothing is silently dropped.



# File 'lib/wurk/history.rb', line 137

def self.numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ::ArgumentError, ::TypeError
  value
end
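
A few sample inputs make the coercion rules concrete. The function body is copied from the listing above into a standalone method so the example runs without Wurk loaded; the input values are made up.

```ruby
def numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ArgumentError, TypeError
  value
end

p numeric("42")       # => 42       (whole number becomes Integer)
p numeric("3.5")      # => 3.5      (fractional value stays Float)
p numeric("1e3")      # => 1000     (Float() accepts exponent notation)
p numeric("leader-a") # => "leader-a" (non-numeric passes through)
p numeric(nil)        # => nil      (TypeError is rescued)
```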

.parse_entry(entry_id, fields) ⇒ Object



# File 'lib/wurk/history.rb', line 123

def self.parse_entry(entry_id, fields)
  pairs = fields.is_a?(::Array) ? fields.each_slice(2).to_h : fields
  point = { at: stream_epoch(entry_id) }
  pairs.each { |field, value| point[field.to_sym] = numeric(value) }
  point
end
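
As the code shows, `fields` may arrive either as a flat `[field, value, …]` array or as a hash. The standalone sketch below copies the three class methods into plain functions to show both shapes producing the same kind of point; the entry IDs and field values are invented.

```ruby
def numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ArgumentError, TypeError
  value
end

def stream_epoch(entry_id)
  entry_id.to_s.split("-", 2).first.to_i / 1000.0
end

def parse_entry(entry_id, fields)
  # A flat array is paired up two at a time; a hash is used as-is.
  pairs = fields.is_a?(Array) ? fields.each_slice(2).to_h : fields
  point = { at: stream_epoch(entry_id) }
  pairs.each { |field, value| point[field.to_sym] = numeric(value) }
  point
end

flat = parse_entry("1700000000500-0", %w[processed 1200 failures 7])
mapped = parse_entry("1700000030500-0", { "busy" => "2.5", "host" => "web-1" })

p flat   # => {:at=>1700000000.5, :processed=>1200, :failures=>7} (Ruby < 3.4 inspect format)
p mapped
```

Note that the non-numeric `host` field survives as a string rather than being dropped.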

.recent(limit: STREAM_DEFAULT_LIMIT) ⇒ Object

Most-recent snapshots from the `history:metrics` stream, oldest→newest. Each point is `{ at: <epoch seconds>, <field>: <numeric>, … }`. Fields are read generically, so a migrated Sidekiq Ent install’s entries render without rewrite regardless of which fields they carry.



# File 'lib/wurk/history.rb', line 117

def self.recent(limit: STREAM_DEFAULT_LIMIT)
  count = limit.to_i.clamp(1, STREAM_CAP)
  entries = Wurk.redis { |c| c.call('XREVRANGE', Keys::HISTORY_METRICS, '+', '-', 'COUNT', count) }
  entries.reverse.map { |entry_id, fields| parse_entry(entry_id, fields) }
end
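
The limit clamping and the newest-first to oldest-first reversal can be exercised without Redis. In this sketch `STREAM` and `xrevrange` are hypothetical in-memory stand-ins for the stream and the `XREVRANGE … COUNT` call, and `recent_ids` returns only entry IDs to keep the example short.

```ruby
STREAM_CAP = 10_000
STREAM_DEFAULT_LIMIT = 1000

# Hypothetical in-memory stream, stored oldest first like a Redis stream.
STREAM = [
  ["1700000000000-0", { "processed" => "10" }],
  ["1700000030000-0", { "processed" => "25" }],
  ["1700000060000-0", { "processed" => "40" }]
].freeze

# Stand-in for XREVRANGE key + - COUNT n: newest entries first.
def xrevrange(count)
  STREAM.reverse.first(count)
end

def recent_ids(limit: STREAM_DEFAULT_LIMIT)
  count = limit.to_i.clamp(1, STREAM_CAP) # nil, 0 and negatives become 1
  xrevrange(count).reverse.map(&:first)   # flip back to oldest -> newest
end

p recent_ids(limit: 2)   # the two newest entries, older one first
p recent_ids(limit: 0)   # clamped up to a single entry
p recent_ids(limit: "2") # string limits are coerced with to_i
```

Reading from the tail and then reversing means a small `limit` always returns the latest points, which is what a chart of recent history needs.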

.stream_epoch(entry_id) ⇒ Object

Redis stream IDs are “<ms>-<seq>”; the ms half is the snapshot time.



# File 'lib/wurk/history.rb', line 131

def self.stream_epoch(entry_id)
  entry_id.to_s.split('-', 2).first.to_i / 1000.0
end
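
For illustration, the conversion applied to a made-up entry ID, with the result fed to `Time.at` to get a wall-clock time. The method body is copied from the listing above.

```ruby
def stream_epoch(entry_id)
  entry_id.to_s.split("-", 2).first.to_i / 1000.0
end

epoch = stream_epoch("1700000000500-3") # the "-3" sequence part is ignored
puts epoch                              # 1700000000.5
puts Time.at(epoch).utc
```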

Instance Method Details

#snapshot ⇒ Object

One snapshot, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual “snapshot now” can drive it directly. Always appends to the `history:metrics` stream (the dashboard’s source); additionally emits to dogstatsd when a client is configured.



# File 'lib/wurk/history.rb', line 106

def snapshot
  values = collect_values
  record_stream(values)
  emit_statsd(values)
  nil
end
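
The "always stream, optionally statsd" behaviour can be sketched in isolation. Everything here is an assumption for illustration: `snapshot_once`, `GaugeRecorder` and the array used as a stream are hypothetical, and the real `record_stream` / `emit_statsd` helpers are private and not shown on this page.

```ruby
# Hypothetical statsd stand-in that remembers the last value per gauge.
class GaugeRecorder
  attr_reader :gauges

  def initialize
    @gauges = {}
  end

  def gauge(name, value)
    @gauges[name] = value
  end
end

# Sketch of one snapshot using the default gauge set.
def snapshot_once(values, stream:, statsd: nil)
  stream << values.dup # the stream write happens unconditionally
  values.each { |field, value| statsd.gauge("sidekiq.#{field}", value) } if statsd
  nil
end

values = { "processed" => 1200, "failures" => 7 }

stream_only = []
snapshot_once(values, stream: stream_only) # no client configured

stream_both = []
recorder = GaugeRecorder.new
snapshot_once(values, stream: stream_both, statsd: recorder)

p stream_only.length # 1
p recorder.gauges
```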

#start ⇒ Object



# File 'lib/wurk/history.rb', line 75

def start
  @thread ||= safe_thread('history-snapshot') do # rubocop:disable Naming/MemoizedInstanceVariableName
    wait
    until @done
      tick
      wait
    end
  end
end

#terminate ⇒ Object



# File 'lib/wurk/history.rb', line 85

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
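
`#start` and `#terminate` together form an interruptible sleep loop built on a `Mutex` and a `ConditionVariable`. The private `wait` method is not shown on this page, so the version below is an assumption about its shape; `IntervalLoop` is a hypothetical class written only to demonstrate the pattern.

```ruby
class IntervalLoop
  def initialize(interval, &work)
    @interval = interval
    @work = work
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        @work.call
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal # wake the sleeper instead of waiting out the interval
    end
  end

  def join(timeout)
    @thread&.join(timeout)
  end

  private

  # Assumed shape of the wait: a timed condition wait that is skipped if
  # terminate already ran, so a signal sent before the wait is not lost.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

worker = IntervalLoop.new(60) { puts "tick" }
worker.start
worker.terminate
puts worker.join(5) ? "stopped promptly" : "still sleeping"
```

Compared with a plain `sleep(interval)`, the condition variable lets shutdown return immediately rather than after up to one full interval.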

#tick ⇒ Object

Leader-gated: only the elected leader emits, so N workers don’t each publish the same cluster-wide gauges every interval.



# File 'lib/wurk/history.rb', line 94

def tick
  return unless leader?

  snapshot
rescue StandardError => e
  handle_exception(e, { context: 'history-snapshot' })
end
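
The gate-then-rescue shape of `#tick` can be shown with a small stand-in. `GatedTicker` is hypothetical: it takes leadership as a boolean and collects error messages in an array, whereas the real method asks `leader?` and reports through `handle_exception`.

```ruby
class GatedTicker
  attr_reader :emitted, :errors

  def initialize(leader:, &snapshot)
    @leader = leader
    @snapshot = snapshot
    @emitted = 0
    @errors = []
  end

  def tick
    return unless @leader # followers do nothing

    @snapshot.call
    @emitted += 1
  rescue StandardError => e
    @errors << e.message # a failed snapshot must not kill the loop
  end
end

follower = GatedTicker.new(leader: false) { raise "never reached" }
follower.tick

leader = GatedTicker.new(leader: true) { :ok }
leader.tick

flaky = GatedTicker.new(leader: true) { raise "redis down" }
flaky.tick

p [follower.emitted, leader.emitted, flaky.errors]
```

Rescuing inside `tick` keeps one bad interval from ending the background thread; the next interval simply tries again.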