Class: Wurk::ApiController

Inherits:
ApplicationController
Includes:
ActionController::Live
Defined in:
app/controllers/wurk/api_controller.rb

Overview

JSON APIs consumed by the React SPA. Action methods stay thin; mapping to the wire shape lives in `Wurk::Api::Serializers`, and pagination lives in `Wurk::Api::Pagination`. SSE lives in #stream.

Wire-compat: every payload field reads from the canonical Wurk inspector objects (Stats, Queue, RetrySet, ScheduledSet, DeadSet, ProcessSet, BatchSet, Cron::LoopSet) so dashboards stay aligned with the Redis schema in `docs/target/sidekiq-free,pro,ent.md`.

Constant Summary

STREAM_TICK_SECONDS =
2.0
STREAM_MAX_DURATION =
600.0
HISTORY_WINDOW_UNITS =
{ 's' => 1, 'm' => 60, 'h' => 3600, 'd' => 86_400 }.freeze
DEFAULT_HISTORY_WINDOW =
24 * 3600
RETRY_ACTIONS =

Per-set action whitelists. Each maps the SPA’s action name to the SortedEntry/JobSet method. Anything not listed returns a 400, which keeps the bulk/single dispatchers from reaching arbitrary methods off a request param.

{ 'retry' => :retry, 'delete' => :delete, 'kill' => :kill }.freeze
SCHEDULED_ACTIONS =
{ 'delete' => :delete, 'add_to_queue' => :add_to_queue }.freeze
DEAD_ACTIONS =
{ 'retry' => :retry, 'delete' => :delete }.freeze
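
The whitelist pattern described for the `*_ACTIONS` constants can be sketched roughly as follows. `FakeEntry` and `dispatch_action` are hypothetical names used only for illustration; the controller's real `single_entry_action` and `bulk_entry_action` helpers are not shown on this page and may differ.

```ruby
# Hypothetical stand-in for a sorted-set entry; not the real Wurk class.
class FakeEntry
  attr_reader :calls

  def initialize
    @calls = []
  end

  def retry
    @calls << :retry
  end

  def delete
    @calls << :delete
  end

  def kill
    @calls << :kill
  end
end

RETRY_ACTIONS = { 'retry' => :retry, 'delete' => :delete, 'kill' => :kill }.freeze

# Returns [http_status, body]. Only methods named in the whitelist are
# reachable, so a request param such as "instance_eval" is never dispatched.
def dispatch_action(entry, actions, cmd)
  method_name = actions[cmd.to_s]
  return [400, { error: 'unknown action' }] if method_name.nil?

  entry.public_send(method_name)
  [200, { ok: true }]
end
```

Looking the method up in a frozen hash, rather than calling `public_send` on the raw param, is what keeps the set of reachable methods fixed at load time.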

Instance Method Summary

Instance Method Details

#batch ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 169

def batch
  status = ::Wurk::Batch::Status.new(params[:bid].to_s)
  return render(json: { error: 'unknown batch' }, status: :not_found) unless status.exists?

  render json: status.data.transform_keys(&:to_sym)
rescue ::ArgumentError
  render json: { error: 'unknown batch' }, status: :not_found
end

#batches ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 162

def batches
  set = ::Wurk::BatchSet.new
  page = ::Wurk::Api::Pagination.window(params)
  rows = ::Wurk::Api::Pagination.slice(set, page) { |status| status.data.transform_keys(&:to_sym) }
  render json: { total: set.size, page: page[:page], count: page[:count], batches: rows }
end

#clear_queue ⇒ Object

Empties one queue (UNLINK list + drop from the `queues` set).

# File 'app/controllers/wurk/api_controller.rb', line 72

def clear_queue
  ::Wurk::Queue.new(params[:name].to_s).clear
  render json: { ok: true }
end

#cron ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 189

def cron
  now = ::Time.now.to_i
  render json: ::Wurk::Cron::LoopSet.new.map { |lp| ::Wurk::Api::Serializers.cron_row(lp, now) }
end

#cron_history ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 204

def cron_history
  render json: { lid: params[:lid].to_s, history: ::Wurk::Web::Enterprise::Periodic.history(params[:lid].to_s) }
end

#dead ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 100

def dead = render_sorted_set(::Wurk::DeadSet.new)

#dead_all ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 135

def dead_all
  set = ::Wurk::DeadSet.new
  count = case params[:cmd].to_s
          when 'retry'  then set.retry_all
          when 'delete' then clear_set(set)
          else return render(json: { error: 'unknown action' }, status: :bad_request)
          end
  render json: { ok: true, count: count }
end

#dead_bulk ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 133

def dead_bulk = bulk_entry_action(::Wurk::DeadSet.new, DEAD_ACTIONS)

#dead_job ⇒ Object

Dead set mutations.

# File 'app/controllers/wurk/api_controller.rb', line 132

def dead_job = single_entry_action(::Wurk::DeadSet.new, DEAD_ACTIONS, params[:cmd])

#delete_queue_job ⇒ Object

Removes a single job from a queue by jid. LREM matches exact bytes, so we locate the record (Queue#find_job) and let it delete its own value.

# File 'app/controllers/wurk/api_controller.rb', line 79

def delete_queue_job
  record = ::Wurk::Queue.new(params[:name].to_s).find_job(params[:jid].to_s)
  return render(json: { error: 'unknown job' }, status: :not_found) unless record

  render json: { ok: true, deleted: record.delete }
end
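
To illustrate why the record has to delete its own stored value, here is a minimal sketch that models the Redis list as a Ruby array of raw JSON strings. `find_payload` and `lrem_sketch` are hypothetical helpers, not part of Wurk; the point is only that a re-serialised payload with a different key order no longer matches byte for byte.

```ruby
require 'json'

# Hypothetical lookup: parse each raw payload and compare jids, but return
# the original string untouched so its bytes can be matched later.
def find_payload(list, jid)
  list.find { |raw| JSON.parse(raw)['jid'] == jid }
end

# Mimics LREM with count 1: removes the first element equal to `value`
# and returns how many elements were removed (0 or 1).
def lrem_sketch(list, value)
  index = list.index(value)
  return 0 if index.nil?

  list.delete_at(index)
  1
end

queue = [
  '{"jid":"a1","class":"MailJob","args":[1]}',
  '{"class":"MailJob","jid":"b2","args":[2]}'
]

# Rebuilding the payload puts "jid" first, so the bytes differ from what is stored.
rebuilt = JSON.generate({ 'jid' => 'b2', 'class' => 'MailJob', 'args' => [2] })
lrem_sketch(queue, rebuilt)                    # removes nothing
lrem_sketch(queue, find_payload(queue, 'b2'))  # removes the stored entry
```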

#enqueue_cron ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 197

def enqueue_cron
  jid = ::Wurk::Web::Enterprise::Periodic.enqueue_now(params[:lid].to_s)
  return render(json: { error: 'unknown loop' }, status: :not_found) if jid.nil?

  render json: { ok: true, jid: jid }
end

#history ⇒ Object

Cluster-total throughput/failures time-series for the dashboard charts. `:bucket` is 1m/5m/1h; `?window=24h` (s/m/h/d suffix) is clamped to the bucket’s retention. Recharts-ready array under `series`.

# File 'app/controllers/wurk/api_controller.rb', line 229

def history
  window = parse_window(params[:window])
  series = ::Wurk::Web::Enterprise::Historical.history(params[:bucket].to_s, window: window)
  render json: { bucket: params[:bucket].to_s, window: window, series: series.map { |row| ::Wurk::Api::Serializers.history_point(row) } }
rescue ::ArgumentError => e
  render json: { error: e.message }, status: :bad_request
end
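
The `?window=` suffix format can be sketched with the two constants from the summary above. `parse_window_sketch` is a hypothetical name; the controller's private `parse_window` is not shown on this page, so the regex, the error messages, and the fallback for a blank param are assumptions. Clamping to the bucket's retention is left out.

```ruby
HISTORY_WINDOW_UNITS = { 's' => 1, 'm' => 60, 'h' => 3600, 'd' => 86_400 }.freeze
DEFAULT_HISTORY_WINDOW = 24 * 3600

# Converts "90m" style input to seconds. Blank input falls back to the
# default (assumed); malformed input raises ArgumentError, which the
# action above would turn into a 400.
def parse_window_sketch(raw)
  text = raw.to_s.strip
  return DEFAULT_HISTORY_WINDOW if text.empty?

  match = /\A(\d+)([smhd])\z/.match(text)
  raise ArgumentError, "invalid window: #{text.inspect}" if match.nil?

  seconds = Integer(match[1], 10) * HISTORY_WINDOW_UNITS.fetch(match[2])
  raise ArgumentError, 'window must be positive' unless seconds.positive?

  seconds
end
```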

#history_snapshots ⇒ Object

Ent §5.3: historical snapshots from the capped `history:metrics` stream. `?limit=N` (default 1000) returns the most recent points, oldest→newest, each `processed:, failures:, …`. Fields are read generically so a migrated Sidekiq Ent stream renders as-is.

# File 'app/controllers/wurk/api_controller.rb', line 241

def history_snapshots
  limit = ::Wurk::Api::Pagination.clamp_int(
    params[:limit], 1, ::Wurk::History::STREAM_CAP, ::Wurk::History::STREAM_DEFAULT_LIMIT
  )
  render json: { snapshots: ::Wurk::Web::Enterprise::Historical.snapshots(limit: limit) }
end
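
The call above passes `(raw, min, max, default)` to `clamp_int`. A minimal sketch of a helper with that argument order is shown below; `clamp_int_sketch` is a hypothetical name, and falling back to the default on unparseable input is an assumption, since the real `Wurk::Api::Pagination.clamp_int` is not documented here.

```ruby
# Parses `raw` as a base-10 integer and clamps it into [min, max].
# Missing or non-numeric input yields `default` (assumed behaviour).
def clamp_int_sketch(raw, min, max, default)
  Integer(raw.to_s, 10).clamp(min, max)
rescue ArgumentError
  default
end

# The cap of 5000 is an illustrative value, not the real STREAM_CAP.
clamp_int_sketch('250', 1, 5000, 1000)
clamp_int_sketch(nil, 1, 5000, 1000)
```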

#limiters ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 178

def limiters
  names = ::Wurk::Web::Enterprise::Limits.list(filter: params[:substr])
  page = ::Wurk::Api::Pagination.window(params)
  render json: { total: names.size, page: page[:page], count: page[:count], limiters: limiter_rows(names, page) }
end

#meta ⇒ Object

Boot-time flags the SPA reads once to shape the UI (e.g. hide destructive actions and show the read-only banner). Always a GET, so it stays reachable while read-only mode blocks mutations.

# File 'app/controllers/wurk/api_controller.rb', line 44

def meta
  config = ::Wurk::Web.config
  render json: {
    read_only: config.read_only?,
    read_only_message: config.read_only_message,
    custom_tabs: config.custom_tabs
  }
end

#metrics ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 208

def metrics
  minutes = ::Wurk::Api::Pagination.clamp_int(params[:minutes], 1, ::Wurk::Metrics::Query::MAX_MINUTES, 60)
  rows = ::Wurk::Web::Enterprise::Historical.top(minutes: minutes, class_filter: params[:substr])
  render json: { minutes: minutes, top_jobs: rows.map { |(klass, totals)| ::Wurk::Api::Serializers.metric_row(klass, totals) } }
rescue ::Wurk::Metrics::Query::WindowTooWide => e
  render json: { error: e.message }, status: :bad_request
end

#metrics_for_job ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 216

def metrics_for_job
  klass = params[:klass].to_s
  minutes, hours = metrics_window(params)
  rows = ::Wurk::Web::Enterprise::Historical.for_job(klass, minutes: minutes, hours: hours)
  series = rows.map { |row| row.merge(at: row[:at].to_f) }
  render json: { klass: klass, minutes: minutes, hours: hours, series: series }
rescue ::ArgumentError, ::Wurk::Metrics::Query::WindowTooWide => e
  render json: { error: e.message }, status: :bad_request
end

#pause_cron ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 194

def pause_cron = render_cron_action(::Wurk::Web::Enterprise::Periodic.pause(params[:lid].to_s))

#pause_queue ⇒ Object

Pause/unpause a queue (Pro §6, §10.1). Idempotent; returns the resulting state so the SPA can update its toggle without a refetch round-trip.

# File 'app/controllers/wurk/api_controller.rb', line 88

def pause_queue
  ::Wurk::Queue.new(params[:name].to_s).pause!
  render json: { ok: true, paused: true }
end

#processes ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 145

def processes
  render json: ::Wurk::ProcessSet.new.map { |p| ::Wurk::Api::Serializers.process_row(p) }
end

#profiles ⇒ Object

Profiles list (v8.0+). The SPA links each row to /profiles/:key (view) and /profiles/:key/data (raw blob). Newest first.

# File 'app/controllers/wurk/api_controller.rb', line 275

def profiles
  records = ::Wurk::ProfileSet.new.map { |rec| ::Wurk::Api::Serializers.profile_record(rec) }
  render json: records.sort_by { |r| -(r[:started_at] || 0) }
end

#queue ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 61

def queue
  q = ::Wurk::Queue.new(params[:name].to_s)
  page = ::Wurk::Api::Pagination.window(params)
  jobs = ::Wurk::Api::Pagination.slice(q, page) { |rec| ::Wurk::Api::Serializers.job_record(rec) }
  render json: {
    name: q.name, size: q.size, latency: q.latency, paused: q.paused?,
    page: page[:page], count: page[:count], jobs: jobs
  }
end

#queue_history ⇒ Object

Per-queue size/latency gauge time-series for the Metrics/Historical tab. `:bucket` is 1m/5m/1h; `?window=24h` (s/m/h/d) is clamped to the bucket’s retention; optional `?queue=<name>` narrows to one queue. Each queue’s `points` are Recharts-ready.

# File 'app/controllers/wurk/api_controller.rb', line 252

def queue_history
  window = parse_window(params[:window])
  queues = params[:queue].present? ? [params[:queue].to_s] : nil
  series = ::Wurk::Web::Enterprise::Historical.queue_history(params[:bucket].to_s, window: window, queues: queues)
  render json: {
    bucket: params[:bucket].to_s,
    window: window,
    queues: series.map { |row| ::Wurk::Api::Serializers.queue_history_series(row) }
  }
rescue ::ArgumentError => e
  render json: { error: e.message }, status: :bad_request
end

#queues ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 57

def queues
  render json: ::Wurk::Stats.new.queue_summaries.map { |q| ::Wurk::Api::Serializers.queue_summary(q) }
end

#quiet_process ⇒ Object

Busy-page controls: SIGTSTP (quiet: drop fetch, drain in-flight) and SIGTERM (stop: graceful shutdown). Both are async; the target notices on its next heartbeat (≤10s). `identity` absent or “all” signals every live process.

# File 'app/controllers/wurk/api_controller.rb', line 159

def quiet_process = signal_processes(:quiet!)
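
The target-selection rule for `identity` can be sketched as below. `select_targets` is a hypothetical helper, and processes are modelled as plain hashes with an `:identity` key; the real private `signal_processes` helper and the ProcessSet entries are not shown on this page and may look different.

```ruby
# Returns the processes a quiet/stop request should signal: every live
# process when `identity` is absent or "all", otherwise only exact matches.
def select_targets(processes, identity)
  wanted = identity.to_s
  return processes if wanted.empty? || wanted == 'all'

  processes.select { |process| process[:identity] == wanted }
end

live = [{ identity: 'host-a:101' }, { identity: 'host-b:202' }]
select_targets(live, nil)           # both processes
select_targets(live, 'host-b:202')  # only the second
```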

#reset_limiter ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 184

def reset_limiter
  ::Wurk::Web::Enterprise::Limits.reset(params[:name].to_s)
  render json: { ok: true }
end

#retries ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 98

def retries = render_sorted_set(::Wurk::RetrySet.new)

#retries_all ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 106

def retries_all
  set = ::Wurk::RetrySet.new
  count = case params[:cmd].to_s
          when 'retry'  then set.retry_all
          when 'kill'   then set.kill_all
          when 'delete' then clear_set(set)
          else return render(json: { error: 'unknown action' }, status: :bad_request)
          end
  render json: { ok: true, count: count }
end

#retries_bulk ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 104

def retries_bulk = bulk_entry_action(::Wurk::RetrySet.new, RETRY_ACTIONS)

#retry_job ⇒ Object

Retry set mutations.

# File 'app/controllers/wurk/api_controller.rb', line 103

def retry_job = single_entry_action(::Wurk::RetrySet.new, RETRY_ACTIONS, params[:cmd])

#scheduled ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 99

def scheduled = render_sorted_set(::Wurk::ScheduledSet.new)

#scheduled_all ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 121

def scheduled_all
  set = ::Wurk::ScheduledSet.new
  count = case params[:cmd].to_s
          when 'delete'       then clear_set(set)
          when 'add_to_queue' then drain_set(set, :add_to_queue)
          else return render(json: { error: 'unknown action' }, status: :bad_request)
          end
  render json: { ok: true, count: count }
end

#scheduled_bulk ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 119

def scheduled_bulk = bulk_entry_action(::Wurk::ScheduledSet.new, SCHEDULED_ACTIONS)

#scheduled_job ⇒ Object

Scheduled set mutations.

# File 'app/controllers/wurk/api_controller.rb', line 118

def scheduled_job = single_entry_action(::Wurk::ScheduledSet.new, SCHEDULED_ACTIONS, params[:cmd])

#search ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 265

def search
  substr = params[:substr].to_s
  return render(json: { substr: substr, total: 0, hits: [] }) if substr.empty?

  hits = ::Wurk::Web::Search.new(substr, kinds: parse_search_kinds(params), limit: parse_search_limit(params)).to_a
  render json: { substr: substr, total: hits.size, hits: hits }
end

#stats ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 53

def stats
  render json: ::Wurk::Api::Serializers.stats_payload(::Wurk::Stats.new)
end

#stop_process ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 160

def stop_process = signal_processes(:stop!)

#stream ⇒ Object

SSE: one `event: stats` per tick with a fresh Stats snapshot. Caps at `STREAM_MAX_DURATION` so a stale browser tab can’t tie up a Rails worker forever; the client reconnects automatically when the stream closes.

`?max_duration=` and `?tick=` are test/debug knobs; the SPA never sets them. `?max_duration=0` emits one tick and closes.

# File 'app/controllers/wurk/api_controller.rb', line 286

def stream
  stream_headers!
  clamp = ::Wurk::Api::Pagination.method(:clamp_float)
  tick = clamp.call(params[:tick], 0.0, STREAM_TICK_SECONDS, STREAM_TICK_SECONDS)
  max_dur = clamp.call(params[:max_duration], 0.0, STREAM_MAX_DURATION, STREAM_MAX_DURATION)
  sse = ::ActionController::Live::SSE.new(response.stream, retry: (STREAM_TICK_SECONDS * 1000).to_i)
  drive_stream(sse, tick, max_dur)
end
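
The tick/deadline behaviour described for this action can be sketched as a plain loop. `drive_stream_sketch` is a hypothetical stand-in for the private `drive_stream` helper, which is not shown on this page; it writes raw SSE frames to any IO-like object instead of using `ActionController::Live::SSE`, and takes an injectable clock and sleeper so it can run without waiting. In a real Live action the response stream would also need to be closed when the loop ends.

```ruby
require 'json'
require 'stringio'

MONOTONIC_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
REAL_SLEEPER = ->(seconds) { sleep(seconds) }

# Emits one "stats" frame, then keeps emitting every `tick` seconds until
# `max_duration` has elapsed. The first frame is always written, so a
# max_duration of 0 yields exactly one frame.
def drive_stream_sketch(io, tick:, max_duration:, snapshot:,
                        clock: MONOTONIC_CLOCK, sleeper: REAL_SLEEPER)
  deadline = clock.call + max_duration
  loop do
    io.write("event: stats\ndata: #{JSON.generate(snapshot.call)}\n\n")
    break if clock.call >= deadline

    sleeper.call(tick)
  end
end

out = StringIO.new
drive_stream_sketch(out, tick: 2.0, max_duration: 0.0, snapshot: -> { { processed: 1 } })
```

Checking the deadline after the write, rather than before, is what gives the "one tick, then close" behaviour for `?max_duration=0`.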

#unpause_cron ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 195

def unpause_cron = render_cron_action(::Wurk::Web::Enterprise::Periodic.unpause(params[:lid].to_s))

#unpause_queue ⇒ Object

# File 'app/controllers/wurk/api_controller.rb', line 93

def unpause_queue
  ::Wurk::Queue.new(params[:name].to_s).unpause!
  render json: { ok: true, paused: false }
end

#workers ⇒ Object

Currently-executing jobs across the cluster (WorkSet), oldest first. The Busy page’s process-detail modal filters client-side by process_id.

# File 'app/controllers/wurk/api_controller.rb', line 151

def workers
  render json: ::Wurk::WorkSet.new.map { |pid, tid, work| ::Wurk::Api::Serializers.work_row(pid, tid, work) }
end