Class: Sidekiq::Fiber::Stats
- Inherits:
-
Object
- Object
- Sidekiq::Fiber::Stats
- Defined in:
- lib/sidekiq/fiber/stats.rb
Overview
Writes fiber execution stats to Redis. Called from the processor at key lifecycle points:
- fiber starts → record in-flight fiber
- fiber completes → remove in-flight, increment completed counter
- semaphore changes → update utilization
All keys are namespaced under “sidekiq-fiber:” and expire automatically so stale data doesn’t accumulate after a worker restarts.
Constant Summary collapse
- NAMESPACE =
"sidekiq-fiber"- FIBER_TTL =
10 minutes — long-running fiber alert threshold
600- THREAD_TTL =
2 minutes — thread stats expire if worker dies
120- GLOBAL_TTL =
120
Instance Method Summary collapse
- #all_in_flight_fibers ⇒ Object
-
#all_thread_stats ⇒ Object
── Readers (used by Web UI) ─────────────────────────────────────────────.
- #deregister_thread(thread_id:) ⇒ Object
- #fiber_completed(jid:, thread_id:) ⇒ Object
-
#fiber_started(jid:, job_class:, thread_id:) ⇒ Object
Called when a fiber starts executing a job.
- #global_summary ⇒ Object
-
#initialize(redis_pool) ⇒ Stats
constructor
A new instance of Stats.
- #register_thread(thread_id:, fiber_concurrency:) ⇒ Object
- #update_thread_stats(thread_id:, semaphore_size:, semaphore_acquired:) ⇒ Object
Constructor Details
#initialize(redis_pool) ⇒ Stats
Returns a new instance of Stats.
17 18 19 |
# File 'lib/sidekiq/fiber/stats.rb', line 17 def initialize(redis_pool) @redis = redis_pool end |
Instance Method Details
#all_in_flight_fibers ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/sidekiq/fiber/stats.rb', line 87 def all_in_flight_fibers @redis.with do |conn| keys = conn.keys("#{NAMESPACE}:fiber:*") keys.filter_map do |key| data = conn.hgetall(key) next if data.empty? jid = key.split(":").last data.merge( "jid" => jid, "running_for" => (Time.now.to_f - data["started_at"].to_f).round(1) ) end end end |
#all_thread_stats ⇒ Object
── Readers (used by Web UI) ─────────────────────────────────────────────
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/sidekiq/fiber/stats.rb', line 76 def all_thread_stats @redis.with do |conn| thread_ids = conn.smembers(threads_index_key) thread_ids.filter_map do |tid| stats = conn.hgetall(thread_key(tid)) next if stats.empty? stats.merge("thread_id" => tid) end end end |
#deregister_thread(thread_id:) ⇒ Object
67 68 69 70 71 72 |
# File 'lib/sidekiq/fiber/stats.rb', line 67 def deregister_thread(thread_id:) safe_redis do |conn| conn.srem(threads_index_key, thread_id) conn.del(thread_key(thread_id)) end end |
#fiber_completed(jid:, thread_id:) ⇒ Object
34 35 36 37 38 39 40 41 |
# File 'lib/sidekiq/fiber/stats.rb', line 34 def fiber_completed(jid:, thread_id:) safe_redis do |conn| conn.del(fiber_key(jid)) conn.hincrby(thread_key(thread_id), "completed_total", 1) conn.hset(thread_key(thread_id), "last_completed_at", Time.now.to_f) conn.expire(thread_key(thread_id), THREAD_TTL) end end |
#fiber_started(jid:, job_class:, thread_id:) ⇒ Object
Called when a fiber starts executing a job.
22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/sidekiq/fiber/stats.rb', line 22 def fiber_started(jid:, job_class:, thread_id:) safe_redis do |conn| key = fiber_key(jid) conn.hset(key, "job_class", job_class, "thread_id", thread_id, "started_at", Time.now.to_f ) conn.expire(key, FIBER_TTL) end end |
#global_summary ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/sidekiq/fiber/stats.rb', line 102 def global_summary @redis.with do |conn| thread_ids = conn.smembers(threads_index_key) total_active = 0 total_max = 0 thread_ids.each do |tid| stats = conn.hgetall(thread_key(tid)) total_active += stats["semaphore_acquired"].to_i total_max += stats["semaphore_size"].to_i end { thread_count: thread_ids.size, total_active: total_active, total_max: total_max } end end |
#register_thread(thread_id:, fiber_concurrency:) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/sidekiq/fiber/stats.rb', line 53 def register_thread(thread_id:, fiber_concurrency:) safe_redis do |conn| conn.sadd(threads_index_key, thread_id) conn.expire(threads_index_key, GLOBAL_TTL) conn.hset(thread_key(thread_id), "semaphore_size", fiber_concurrency, "semaphore_acquired", 0, "completed_total", 0, "last_completed_at", "" ) conn.expire(thread_key(thread_id), THREAD_TTL) end end |
#update_thread_stats(thread_id:, semaphore_size:, semaphore_acquired:) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/sidekiq/fiber/stats.rb', line 43 def update_thread_stats(thread_id:, semaphore_size:, semaphore_acquired:) safe_redis do |conn| conn.hset(thread_key(thread_id), "semaphore_size", semaphore_size, "semaphore_acquired", semaphore_acquired ) conn.expire(thread_key(thread_id), THREAD_TTL) end end |