Class: Wurk::History
Overview
Sidekiq Enterprise §5 Historical Metrics snapshotter. A leader-gated background thread that, at the interval (in seconds) passed to `config.retain_history`, emits a statsd-shaped snapshot to the configured dogstatsd client: either the default §5.2 gauge set or whatever a user-supplied collector block writes.
Configured in a server block:
    Sidekiq.configure_server do |config|
      config.dogstatsd = -> { Datadog::Statsd.new('localhost', 8125) }
      config.retain_history(30) # default §5.2 gauges
      # …or a custom collector:
      config.retain_history(30) do |s|
        Sidekiq::Queue.all.each do |q|
          s.gauge("sidekiq.queue.size", q.size, tags: ["queue:#{q.name}"])
        end
      end
    end
The block receives the raw dogstatsd client `s` (quacks like `Datadog::Statsd`: gauge/count/histogram/batch) and writes fully-qualified `sidekiq.*` metric names itself, matching Sidekiq Ent. The snapshotter is leader-gated via the cluster `dear-leader` lock, so exactly one process emits per cluster.
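Because the collector only needs an object that responds to the statsd methods, a block can be exercised without a running agent. The following is a minimal sketch, not part of Wurk: `RecordingStatsd`, `FakeQueue`, `QUEUES`, and `COLLECTOR` are hypothetical names introduced here, and the fake queues stand in for `Sidekiq::Queue.all`.

```ruby
# Hypothetical stand-in for the dogstatsd client: it records gauge calls
# instead of sending UDP packets. Not part of Wurk or dogstatsd-ruby.
class RecordingStatsd
  attr_reader :calls

  def initialize
    @calls = []
  end

  def gauge(name, value, tags: [])
    @calls << [:gauge, name, value, tags]
  end
end

# Hypothetical queue data standing in for Sidekiq::Queue.all.
FakeQueue = Struct.new(:name, :size)
QUEUES = [FakeQueue.new('default', 12), FakeQueue.new('mailers', 3)].freeze

# Same shape as the collector block shown in the server configuration.
COLLECTOR = lambda do |s|
  QUEUES.each do |q|
    s.gauge('sidekiq.queue.size', q.size, tags: ["queue:#{q.name}"])
  end
end

CLIENT = RecordingStatsd.new
COLLECTOR.call(CLIENT)
CLIENT.calls.each { |call| p call }
```

A recording double like this keeps collector specs deterministic, since nothing depends on a statsd socket being available.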
Every snapshot is also appended to the capped Redis stream `history:metrics` (§5.3), the same key a migrated Sidekiq Ent install uses. The dashboard’s Historical view therefore has a data source independent of any external statsd, and pre-existing Ent stream data renders without rewrite. The stream write happens whenever the snapshotter runs; the dogstatsd emit is skipped only when no client is configured.
Aliased as `Sidekiq::History` (drop-in contract). Spec: docs/target/sidekiq-ent.md §5.1–§5.3.
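The private method that writes the stream is not shown on this page, so the following is only a sketch of what a capped append could look like at the Redis command level. It builds the argument list for `XADD` with `MAXLEN ~` (standard Redis syntax) without opening a connection. `HISTORY_KEY` and `xadd_args` are hypothetical names, and the literal key assumes no namespace prefix is applied.

```ruby
# Assumed literal key; the library refers to it as Keys::HISTORY_METRICS.
HISTORY_KEY = 'history:metrics'

# Build the arguments for a capped append:
#   XADD <key> MAXLEN ~ <cap> * <field> <value> ...
# '~' requests approximate trimming; '*' lets Redis assign the entry ID.
def xadd_args(values, cap: 10_000)
  ['XADD', HISTORY_KEY, 'MAXLEN', '~', cap.to_s, '*'] +
    values.flat_map { |field, value| [field.to_s, value.to_s] }
end

XADD_EXAMPLE = xadd_args({ 'processed' => 120, 'failures' => 3 })
puts XADD_EXAMPLE.join(' ')
```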
Constant Summary
- SNAPSHOT_FIELDS =
  Stream field → Stats reader. Single source for both the `history:metrics` stream entry and the default §5.2 statsd gauge set (which prefixes `sidekiq.`). Order is the display order.
    {
      'processed' => :processed,
      'failures' => :failed,
      'enqueued' => :enqueued,
      'retries' => :retry_size,
      'dead' => :dead_size,
      'scheduled' => :scheduled_size,
      'busy' => :workers_size
    }.freeze
- STREAM_CAP =
  Approximate cap on retained snapshots (XADD MAXLEN ~). At the default 30s interval this is ~3.5 days of history; older points age out. `~` lets Redis trim in whole macro-nodes, so the actual length can briefly exceed the cap, matching Ent’s best-effort retention.
    10_000
- STREAM_DEFAULT_LIMIT =
    1000
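The retention figure follows directly from the cap and the snapshot interval. As a small worked example (the helper name `retention_days` is hypothetical and exists only for this illustration):

```ruby
STREAM_CAP = 10_000
SECONDS_PER_DAY = 86_400.0

# Approximate history window, in days, for a given snapshot interval.
# Approximate because MAXLEN ~ trims lazily, so the stream can briefly
# hold more than STREAM_CAP entries.
def retention_days(interval_seconds, cap: STREAM_CAP)
  (cap * interval_seconds) / SECONDS_PER_DAY
end

puts retention_days(30).round(2) # 10_000 snapshots * 30 s = 300_000 s
puts retention_days(60).round(2)
```

At 30 seconds this works out to roughly 3.47 days, which the constant's description rounds to ~3.5. Doubling the interval doubles the window.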
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
Attributes included from Component
Class Method Summary
- .numeric(value) ⇒ Object
  Coerce a stream field to Int/Float for charting; leave non-numeric Ent fields (e.g. a label) untouched so nothing is silently dropped.
- .parse_entry(entry_id, fields) ⇒ Object
- .recent(limit: STREAM_DEFAULT_LIMIT) ⇒ Object
  Most-recent snapshots from the `history:metrics` stream, oldest→newest.
- .stream_epoch(entry_id) ⇒ Object
  Redis stream IDs are “<ms>-<seq>”; the ms half is the snapshot time.
Instance Method Summary
- #initialize(config) ⇒ History (constructor)
  A new instance of History.
- #snapshot ⇒ Object
  One snapshot, bypassing the leader gate and the sleep loop.
- #start ⇒ Object
- #terminate ⇒ Object
- #tick ⇒ Object
  Leader-gated: only the elected leader emits, so N workers don’t each publish the same cluster-wide gauges every interval.
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config) ⇒ History
Returns a new instance of History.
    # File 'lib/wurk/history.rb', line 64

    def initialize(config)
      @config = config
      @interval = config.history_interval
      @collector = config.history_collector
      @stream_cap = config[:history_stream_cap] || STREAM_CAP
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
      @thread = nil
    end
Class Method Details
.numeric(value) ⇒ Object
Coerce a stream field to Int/Float for charting; leave non-numeric Ent fields (e.g. a label) untouched so nothing is silently dropped.
    # File 'lib/wurk/history.rb', line 137

    def self.numeric(value)
      float = Float(value)
      (float % 1).zero? ? float.to_i : float
    rescue ::ArgumentError, ::TypeError
      value
    end
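To illustrate the coercion rules, here is the same logic copied into a standalone top-level method so it runs without Wurk loaded. The sample inputs are made up for this example.

```ruby
# Standalone copy of the coercion logic shown above, for illustration only.
def numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ArgumentError, TypeError
  value
end

p numeric('42')      # whole number: collapses to an Integer
p numeric('3.5')     # fractional: stays a Float
p numeric('1e3')     # scientific notation is accepted by Float()
p numeric('default') # non-numeric label: returned unchanged
p numeric(nil)       # Float(nil) raises TypeError: returned unchanged
```

Using `Float()` rather than `String#to_f` is what makes the pass-through possible: `to_f` would silently turn a label into `0.0`, while `Float()` raises and lets the original value survive.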
.parse_entry(entry_id, fields) ⇒ Object
    # File 'lib/wurk/history.rb', line 123

    def self.parse_entry(entry_id, fields)
      pairs = fields.is_a?(::Array) ? fields.each_slice(2).to_h : fields
      point = { at: stream_epoch(entry_id) }
      pairs.each { |field, value| point[field.to_sym] = numeric(value) }
      point
    end
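The `is_a?(::Array)` branch exists because a Redis client may return stream fields either as a flat `[field, value, field, value]` array or as a hash. The sketch below copies the three class methods into a hypothetical `HistoryParsing` module so both shapes can be tried without Redis; the entry ID and field values are invented sample data.

```ruby
# Hypothetical container module; the method bodies mirror the listings on
# this page so the example runs standalone.
module HistoryParsing
  module_function

  def stream_epoch(entry_id)
    entry_id.to_s.split('-', 2).first.to_i / 1000.0
  end

  def numeric(value)
    float = Float(value)
    (float % 1).zero? ? float.to_i : float
  rescue ArgumentError, TypeError
    value
  end

  def parse_entry(entry_id, fields)
    pairs = fields.is_a?(Array) ? fields.each_slice(2).to_h : fields
    point = { at: stream_epoch(entry_id) }
    pairs.each { |field, value| point[field.to_sym] = numeric(value) }
    point
  end
end

# Flat-array shape, including a non-numeric field that must survive as-is.
flat = ['processed', '120', 'failures', '3', 'host', 'web-1']
p HistoryParsing.parse_entry('1700000000000-0', flat)

# Hash shape yields the same kind of point.
p HistoryParsing.parse_entry('1700000030000-0', { 'busy' => '7' })
```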
.recent(limit: STREAM_DEFAULT_LIMIT) ⇒ Object
Most-recent snapshots from the `history:metrics` stream, oldest→newest. Each point is `{ at: <epoch seconds>, <field>: <numeric>, … }`. Fields are read generically, so a migrated Sidekiq Ent install’s entries render without rewrite regardless of which fields they carry.
    # File 'lib/wurk/history.rb', line 117

    def self.recent(limit: STREAM_DEFAULT_LIMIT)
      count = limit.to_i.clamp(1, STREAM_CAP)
      entries = Wurk.redis { |c| c.call('XREVRANGE', Keys::HISTORY_METRICS, '+', '-', 'COUNT', count) }
      entries.reverse.map { |entry_id, fields| parse_entry(entry_id, fields) }
    end
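Two details of `.recent` are easy to miss: the limit is clamped into `1..STREAM_CAP` after `to_i`, and `XREVRANGE` returns newest-first, so the reply is reversed to get chronological order. A small sketch without Redis follows; `clamp_limit` and the `REPLY` data are hypothetical, and the reply shape assumes `[id, fields]` pairs.

```ruby
STREAM_CAP = 10_000

# Same clamping expression as in .recent, extracted for illustration.
def clamp_limit(limit)
  limit.to_i.clamp(1, STREAM_CAP)
end

p clamp_limit(250)    # in range: unchanged
p clamp_limit('250')  # strings are coerced via to_i
p clamp_limit(0)      # raised to the minimum of 1
p clamp_limit(nil)    # nil.to_i is 0, so also 1
p clamp_limit(50_000) # capped at STREAM_CAP

# Simulated XREVRANGE reply: newest entry first (invented sample data).
REPLY = [
  ['1700000060000-0', ['processed', '130']],
  ['1700000030000-0', ['processed', '125']],
  ['1700000000000-0', ['processed', '120']]
].freeze

# Reversing restores oldest-to-newest order for charting.
ORDERED_IDS = REPLY.reverse.map { |entry_id, _fields| entry_id }
p ORDERED_IDS
```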
.stream_epoch(entry_id) ⇒ Object
Redis stream IDs are “<ms>-<seq>”; the ms half is the snapshot time.
    # File 'lib/wurk/history.rb', line 131

    def self.stream_epoch(entry_id)
      entry_id.to_s.split('-', 2).first.to_i / 1000.0
    end
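As a quick illustration, the returned value is a plain epoch-seconds Float, so it can be handed straight to `Time.at`. The method is copied to the top level here so the snippet runs standalone; the entry IDs are invented.

```ruby
# Standalone copy of the method shown above, for illustration only.
def stream_epoch(entry_id)
  entry_id.to_s.split('-', 2).first.to_i / 1000.0
end

epoch = stream_epoch('1700000000000-0')
p epoch
puts Time.at(epoch).utc

# The sequence half after the dash is ignored; sub-second precision
# from the millisecond half is preserved.
p stream_epoch('1700000000123-5')
```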
Instance Method Details
#snapshot ⇒ Object
One snapshot, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual “snapshot now” can drive it directly. Always appends to the `history:metrics` stream (the dashboard’s source); additionally emits to dogstatsd when a client is configured.
    # File 'lib/wurk/history.rb', line 106

    def snapshot
      values = collect_values
      record_stream(values)
      emit_statsd(values)
      nil
    end
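The helpers `collect_values`, `record_stream`, and `emit_statsd` are private and not listed on this page. Based only on the description of `SNAPSHOT_FIELDS` (stream field mapped to a stats reader, with the default gauges prefixed by `sidekiq.`), one plausible shape for the collection step is sketched below. `FakeStats`, `collect_values(stats)`, and `default_gauges` are assumptions for illustration, not the library's actual implementation.

```ruby
SNAPSHOT_FIELDS = {
  'processed' => :processed,
  'failures' => :failed,
  'enqueued' => :enqueued,
  'retries' => :retry_size,
  'dead' => :dead_size,
  'scheduled' => :scheduled_size,
  'busy' => :workers_size
}.freeze

# Hypothetical stats object exposing the reader methods named above.
FakeStats = Struct.new(:processed, :failed, :enqueued, :retry_size,
                       :dead_size, :scheduled_size, :workers_size)

# Assumed shape: call each reader and key the result by stream field.
def collect_values(stats)
  SNAPSHOT_FIELDS.transform_values { |reader| stats.public_send(reader) }
end

# Assumed shape: default gauge names are the stream fields with a prefix.
def default_gauges(values)
  values.transform_keys { |field| "sidekiq.#{field}" }
end

STATS = FakeStats.new(120, 2, 15, 4, 1, 9, 3)
VALUES = collect_values(STATS)
p VALUES
p default_gauges(VALUES)
```

Driving both outputs from one hash keeps the stream entry and the gauge set from drifting apart, which is the stated purpose of the constant.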
#start ⇒ Object
    # File 'lib/wurk/history.rb', line 75

    def start
      @thread ||= safe_thread('history-snapshot') do # rubocop:disable Naming/MemoizedInstanceVariableName
        wait
        until @done
          tick
          wait
        end
      end
    end
#terminate ⇒ Object
    # File 'lib/wurk/history.rb', line 85

    def terminate
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
    end
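The private `wait` helper used by `#start` is not listed here. Given the mutex and condition variable set up in the constructor, it presumably sleeps on the condition variable with the interval as a timeout, so that `#terminate` can wake the thread immediately instead of waiting out the interval. The class below is a self-contained sketch of that pattern under this assumption; `IntervalLoop`, its `ticks` counter, and `join` are hypothetical, and a plain `Thread.new` replaces `safe_thread`.

```ruby
# Hypothetical illustration of an interruptible interval loop.
class IntervalLoop
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @ticks = 0
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        @ticks += 1 # stands in for the real tick
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def join(timeout)
    @thread&.join(timeout)
  end

  private

  # Assumed implementation: sleep up to @interval seconds, but return early
  # when signalled. Checking @done under the mutex avoids a missed wakeup
  # if terminate ran before the thread started waiting.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

runner = IntervalLoop.new(60)
runner.start
runner.terminate
puts(runner.join(2) ? 'stopped promptly' : 'still sleeping')
```

Compared with a bare `sleep(@interval)`, the condition variable keeps shutdown latency independent of the snapshot interval.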
#tick ⇒ Object
Leader-gated: only the elected leader emits, so N workers don’t each publish the same cluster-wide gauges every interval.
    # File 'lib/wurk/history.rb', line 94

    def tick
      return unless leader?

      snapshot
    rescue StandardError => e
      handle_exception(e, { context: 'history-snapshot' })
    end
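The method-level `rescue` matters as much as the leader check: a failing snapshot is reported and swallowed, so the loop in `#start` keeps running and tries again at the next interval. A minimal sketch of both behaviours follows, with a hypothetical `GatedTicker` class in which a boolean replaces `leader?` and an array replaces `handle_exception`.

```ruby
# Hypothetical illustration of a leader-gated, exception-safe tick.
class GatedTicker
  attr_reader :emitted, :errors

  def initialize(leader:, &work)
    @leader = leader
    @work = work
    @emitted = 0
    @errors = []
  end

  def tick
    return unless @leader # followers do nothing

    @work.call
    @emitted += 1
  rescue StandardError => e
    @errors << e.message # stands in for handle_exception
  end
end

follower = GatedTicker.new(leader: false) { raise 'never reached' }
follower.tick
p [follower.emitted, follower.errors]

leader = GatedTicker.new(leader: true) { :ok }
leader.tick
p [leader.emitted, leader.errors]

flaky = GatedTicker.new(leader: true) { raise 'redis down' }
flaky.tick
p [flaky.emitted, flaky.errors]
```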