Module: TalkToYourApp::Plugins::Jobs::Adapters::Sidekiq

Defined in:
lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb

Overview

Sidekiq adapter. Reads queue, retry, and dead-set data through Sidekiq’s public API. Read-only: it never enqueues, retries, or kills. All four methods return the common shape defined by Jobs::Interface.

Constant Summary collapse

REQUIRED_GEM =
{ const: "Sidekiq", gem_name: "sidekiq" }.freeze

Class Method Summary collapse

Class Method Details

.failed_jobs(limit:) ⇒ Object



42
43
44
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 42

def failed_jobs(limit:)
  ::Sidekiq::DeadSet.new.first(limit).map { |entry| job_hash(entry) }
end

.healthObject

Worker/queue health. Lists the running Sidekiq processes with their concurrency and busy-thread counts, plus set sizes.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 48

def health
  require "sidekiq/api"
  stats = ::Sidekiq::Stats.new
  processes = ::Sidekiq::ProcessSet.new.map do |process|
    {
      hostname: process["hostname"],
      pid: process["pid"],
      concurrency: process["concurrency"],
      busy: process["busy"],
      queues: process["queues"],
      started_at: process["started_at"] ? Time.at(process["started_at"].to_f).utc.iso8601 : nil,
      quiet: process["quiet"] == "true",
    }
  end
  {
    adapter: "sidekiq",
    processes: processes,
    process_count: processes.size,
    total_concurrency: processes.sum { |p| p[:concurrency].to_i },
    busy_threads: stats.workers_size,
    enqueued: stats.enqueued,
    scheduled: stats.scheduled_size,
    retries: stats.retry_size,
    dead: stats.dead_size,
    default_queue_latency: ::Sidekiq::Queue.new.latency,
  }
end

.iso8601(unix_timestamp) ⇒ Object

Sidekiq stores timestamps as Unix floats; surface ISO-8601 so the field type matches the Solid Queue adapter.



106
107
108
109
110
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 106

def iso8601(unix_timestamp)
  return nil unless unix_timestamp

  Time.at(unix_timestamp.to_f).utc.iso8601
end

.job_hash(entry) ⇒ Object

Both Sidekiq::JobRecord and Sidekiq::SortedEntry expose the raw job hash via #item. Keys are always present (nil where absent) so the shape is stable across adapters and across recent/failed.



92
93
94
95
96
97
98
99
100
101
102
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 92

def job_hash(entry)
  item = entry.respond_to?(:item) ? entry.item : entry
  {
    jid: item["jid"],
    class: item["class"] || item["wrapped"],
    queue: item["queue"],
    args: item["args"],
    enqueued_at: iso8601(item["enqueued_at"] || item["created_at"]),
    error_message: item["error_message"],
  }
end

.queue_sizesObject



21
22
23
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 21

def queue_sizes
  ::Sidekiq::Stats.new.queues
end

.rate_metrics(window:) ⇒ Object

Sidekiq exposes cumulative and day-resolution counts, not arbitrary trailing windows; the response says so via ‘note`.



78
79
80
81
82
83
84
85
86
87
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 78

def rate_metrics(window:)
  stats = ::Sidekiq::Stats.new
  {
    window_seconds: window,
    processed: stats.processed,
    failed: stats.failed,
    enqueued: stats.enqueued,
    note: "Sidekiq reports cumulative processed/failed counts; the window is not applied at sub-day resolution.",
  }
end

.raw_enqueued_at(entry) ⇒ Object



112
113
114
115
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 112

def raw_enqueued_at(entry)
  item = entry.respond_to?(:item) ? entry.item : entry
  (item["enqueued_at"] || item["created_at"] || 0).to_f
end

.recent_jobs(limit:) ⇒ Object

Sidekiq has no first-class “recent jobs” API; we scan the queues and the retry set, sort newest-first by enqueued time, and cap at limit. Per-queue fetch is bounded so a many-queue install does not materialize limit*queue_count records to return ‘limit`; this can miss the very newest jobs when they cluster in one queue, which is acceptable for an inspection tool.



31
32
33
34
35
36
37
38
39
40
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 31

def recent_jobs(limit:)
  queues = ::Sidekiq::Queue.all
  per_queue = [(limit / [queues.size, 1].max) + 1, limit].min
  queued = queues.flat_map { |queue| queue.first(per_queue) }
  retries = ::Sidekiq::RetrySet.new.first(limit)
  (queued + retries)
    .sort_by { |entry| -raw_enqueued_at(entry) }
    .first(limit)
    .map { |entry| job_hash(entry) }
end

.required_gemObject



17
18
19
# File 'lib/talk_to_your_app/plugins/jobs/adapters/sidekiq.rb', line 17

def required_gem
  REQUIRED_GEM
end