Class: Wurk::ApiController
- Inherits:
- ApplicationController
- Object
- ActionController::Base
- ApplicationController
- Wurk::ApiController
- Includes:
- ActionController::Live
- Defined in:
- app/controllers/wurk/api_controller.rb
Overview
JSON APIs consumed by the React SPA. Action methods stay thin; mapping to the wire shape lives in `Wurk::Api::Serializers`, and pagination lives in `Wurk::Api::Pagination`. SSE lives in #stream.
Wire-compat: every payload field reads from the canonical Wurk inspector objects (Stats, Queue, RetrySet, ScheduledSet, DeadSet, ProcessSet, BatchSet, Cron::LoopSet) so dashboards stay aligned with the Redis schema in `docs/target/sidekiq-free,pro,ent.md`.
Constant Summary
- STREAM_TICK_SECONDS =
2.0
- STREAM_MAX_DURATION =
600.0
- HISTORY_WINDOW_UNITS =
{ 's' => 1, 'm' => 60, 'h' => 3600, 'd' => 86_400 }.freeze
- DEFAULT_HISTORY_WINDOW =
24 * 3600
- RETRY_ACTIONS =
Per-set action whitelists. Each maps the SPA's action name to the SortedEntry/JobSet method. Any action not listed returns a 400, which keeps the bulk/single dispatchers from reaching arbitrary methods off a request param.
{ 'retry' => :retry, 'delete' => :delete, 'kill' => :kill }.freeze
- SCHEDULED_ACTIONS =
{ 'delete' => :delete, 'add_to_queue' => :add_to_queue }.freeze
- DEAD_ACTIONS =
{ 'retry' => :retry, 'delete' => :delete }.freeze
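The whitelist idea described for the action constants can be sketched as follows. This is only an illustration: `FakeEntry`, `RETRY_ACTIONS_SKETCH`, and `dispatch` are hypothetical names, and the controller's real dispatchers (`single_entry_action`, `bulk_entry_action`) are private and not shown on this page.

```ruby
# Hypothetical stand-in for a sorted-set entry; not part of Wurk.
class FakeEntry
  def retry
    :retried
  end

  def delete
    :deleted
  end

  def kill
    :killed
  end

  # A method that must never be reachable from a request param.
  def secret
    :should_never_run
  end
end

# Same shape as RETRY_ACTIONS above, renamed to mark it as a sketch.
RETRY_ACTIONS_SKETCH = { 'retry' => :retry, 'delete' => :delete, 'kill' => :kill }.freeze

# Looks the action name up in the whitelist and only then calls the mapped
# method. Unknown names get a 400-style result instead of being passed to
# public_send.
def dispatch(entry, whitelist, action_name)
  method_name = whitelist[action_name.to_s]
  return { status: 400, error: 'unknown action' } if method_name.nil?

  { status: 200, result: entry.public_send(method_name) }
end
```

Because the lookup goes through a frozen hash, a request for `secret` is rejected even though `FakeEntry` responds to it.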
Instance Method Summary
- #batch ⇒ Object
- #batches ⇒ Object
- #clear_queue ⇒ Object
Empties one queue (UNLINK list + drop from the `queues` set).
- #cron ⇒ Object
- #cron_history ⇒ Object
- #dead ⇒ Object
- #dead_all ⇒ Object
- #dead_bulk ⇒ Object
- #dead_job ⇒ Object
Dead set mutations.
- #delete_queue_job ⇒ Object
Removes a single job from a queue by jid.
- #enqueue_cron ⇒ Object
- #history ⇒ Object
Cluster-total throughput/failures time-series for the dashboard charts.
- #history_snapshots ⇒ Object
Ent §5.3 Historical snapshots from the capped `history:metrics` stream.
- #limiters ⇒ Object
- #meta ⇒ Object
Boot-time flags the SPA reads once to shape the UI (e.g. hide destructive actions and show the read-only banner).
- #metrics ⇒ Object
- #metrics_for_job ⇒ Object
- #pause_cron ⇒ Object
- #pause_queue ⇒ Object
Pause/unpause a queue (Pro §6, §10.1).
- #processes ⇒ Object
- #profiles ⇒ Object
Profiles list (v8.0+).
- #queue ⇒ Object
- #queue_history ⇒ Object
Per-queue size/latency gauge time-series for the Metrics/Historical tab.
- #queues ⇒ Object
- #quiet_process ⇒ Object
Busy-page controls: SIGTSTP (quiet: drop fetch, drain in-flight) and SIGTERM (stop: graceful shutdown).
- #reset_limiter ⇒ Object
- #retries ⇒ Object
- #retries_all ⇒ Object
- #retries_bulk ⇒ Object
- #retry_job ⇒ Object
Retry set mutations.
- #scheduled ⇒ Object
- #scheduled_all ⇒ Object
- #scheduled_bulk ⇒ Object
- #scheduled_job ⇒ Object
Scheduled set mutations.
- #search ⇒ Object
- #stats ⇒ Object
- #stop_process ⇒ Object
- #stream ⇒ Object
SSE: one `event: stats` per tick with a fresh Stats snapshot.
- #unpause_cron ⇒ Object
- #unpause_queue ⇒ Object
- #workers ⇒ Object
Currently-executing jobs across the cluster (WorkSet), oldest first.
Instance Method Details
#batch ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 169

def batch
  status = ::Wurk::Batch::Status.new(params[:bid].to_s)
  return render(json: { error: 'unknown batch' }, status: :not_found) unless status.exists?

  render json: status.data.transform_keys(&:to_sym)
rescue ::ArgumentError
  render json: { error: 'unknown batch' }, status: :not_found
end
#batches ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 162

def batches
  set = ::Wurk::BatchSet.new
  page = ::Wurk::Api::Pagination.window(params)
  rows = ::Wurk::Api::Pagination.slice(set, page) { |status| status.data.transform_keys(&:to_sym) }
  render json: { total: set.size, page: page[:page], count: page[:count], batches: rows }
end
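The `window`/`slice` pairing used by #batches (and by #queue and #limiters) could look roughly like the sketch below. `Wurk::Api::Pagination` itself is not documented on this page, so everything here is assumed: the module name `PaginationSketch`, the `page` and `count` param names, and the default page size of 25.

```ruby
# Hypothetical sketch; not the real Wurk::Api::Pagination.
module PaginationSketch
  DEFAULT_COUNT = 25 # assumed default page size

  # Parses a positive base-10 integer, falling back to default when the
  # param is missing, non-numeric, or not positive.
  def self.positive_int(raw, default)
    value = Integer(raw.to_s, 10, exception: false)
    value && value.positive? ? value : default
  end

  # Builds the { page:, count: } hash that the actions read back out.
  def self.window(params)
    { page: positive_int(params[:page], 1),
      count: positive_int(params[:count], DEFAULT_COUNT) }
  end

  # Skips to the requested page lazily, takes one page of rows, and maps
  # each row through the serializer block.
  def self.slice(enumerable, page, &block)
    offset = (page[:page] - 1) * page[:count]
    enumerable.lazy.drop(offset).first(page[:count]).map(&block)
  end
end
```

The lazy enumerator means only the rows up to the end of the requested page are pulled from the underlying set.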
#clear_queue ⇒ Object
Empties one queue (UNLINK list + drop from the `queues` set).
# File 'app/controllers/wurk/api_controller.rb', line 72

def clear_queue
  ::Wurk::Queue.new(params[:name].to_s).clear
  render json: { ok: true }
end
#cron ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 189

def cron
  now = ::Time.now.to_i
  render json: ::Wurk::Cron::LoopSet.new.map { |lp| ::Wurk::Api::Serializers.cron_row(lp, now) }
end
#cron_history ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 204

def cron_history
  render json: { lid: params[:lid].to_s, history: ::Wurk::Web::Enterprise::Periodic.history(params[:lid].to_s) }
end
#dead ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 100

def dead = render_sorted_set(::Wurk::DeadSet.new)
#dead_all ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 135

def dead_all
  set = ::Wurk::DeadSet.new
  count = case params[:cmd].to_s
          when 'retry' then set.retry_all
          when 'delete' then clear_set(set)
          else return render(json: { error: 'unknown action' }, status: :bad_request)
          end
  render json: { ok: true, count: count }
end
#dead_bulk ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 133

def dead_bulk = bulk_entry_action(::Wurk::DeadSet.new, DEAD_ACTIONS)
#dead_job ⇒ Object
Dead set mutations.
# File 'app/controllers/wurk/api_controller.rb', line 132

def dead_job = single_entry_action(::Wurk::DeadSet.new, DEAD_ACTIONS, params[:cmd])
#delete_queue_job ⇒ Object
Removes a single job from a queue by jid. LREM matches exact bytes, so we locate the record (Queue#find_job) and let it delete its own value.
# File 'app/controllers/wurk/api_controller.rb', line 79

def delete_queue_job
  record = ::Wurk::Queue.new(params[:name].to_s).find_job(params[:jid].to_s)
  return render(json: { error: 'unknown job' }, status: :not_found) unless record

  render json: { ok: true, deleted: record.delete }
end
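The #delete_queue_job description says that LREM matches exact bytes, so the stored value has to be located before it can be removed. A minimal sketch of that idea, with an in-memory array standing in for the Redis list: `find_payload` and `delete_by_jid` are hypothetical helpers, not Wurk's `Queue#find_job` or record `#delete`.

```ruby
require 'json'

# Scans the raw payloads and returns the exact stored string whose jid
# matches, or nil when no job has that jid.
def find_payload(list, jid)
  list.find { |raw| JSON.parse(raw)['jid'] == jid }
end

# Removes one occurrence of the exact stored string, the way LREM with a
# count of 1 would. A jid alone cannot be used as the LREM value, because
# the list holds whole serialized jobs.
def delete_by_jid(list, jid)
  raw = find_payload(list, jid)
  return false if raw.nil?

  list.delete_at(list.index(raw))
  true
end
```

Note that the two payloads in a queue may serialize their keys in different orders, which is why the lookup parses each entry instead of comparing against a rebuilt string.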
#enqueue_cron ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 197

def enqueue_cron
  jid = ::Wurk::Web::Enterprise::Periodic.enqueue_now(params[:lid].to_s)
  return render(json: { error: 'unknown loop' }, status: :not_found) if jid.nil?

  render json: { ok: true, jid: jid }
end
#history ⇒ Object
Cluster-total throughput/failures time-series for the dashboard charts. `:bucket` is 1m/5m/1h; `?window=24h` (s/m/h/d suffix) is clamped to the bucket's retention. Recharts-ready array under `series`.
# File 'app/controllers/wurk/api_controller.rb', line 229

def history
  window = parse_window(params[:window])
  series = ::Wurk::Web::Enterprise::Historical.history(params[:bucket].to_s, window: window)
  render json: { bucket: params[:bucket].to_s, window: window, series: series.map { |row| ::Wurk::Api::Serializers.history_point(row) } }
rescue ::ArgumentError => e
  render json: { error: e.message }, status: :bad_request
end
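The `?window=24h` format can be illustrated with the two constants from the Constant Summary. The private `parse_window` helper is not shown on this page, so the sketch below is an assumption about its behavior: a blank value falls back to the default, and anything that is not digits plus one s/m/h/d suffix raises `ArgumentError` (which the action would turn into a 400). Clamping to the bucket's retention is left out.

```ruby
# Hypothetical sketch; module and method names are not from Wurk.
module WindowSketch
  # Values copied from HISTORY_WINDOW_UNITS and DEFAULT_HISTORY_WINDOW.
  UNITS = { 's' => 1, 'm' => 60, 'h' => 3600, 'd' => 86_400 }.freeze
  DEFAULT = 24 * 3600

  # Converts strings such as "90s" or "24h" into a number of seconds.
  def self.parse(raw)
    text = raw.to_s.strip
    return DEFAULT if text.empty?

    match = /\A(\d+)([smhd])\z/.match(text)
    raise ArgumentError, "invalid window: #{text.inspect}" unless match

    Integer(match[1], 10) * UNITS.fetch(match[2])
  end
end
```

For example, `WindowSketch.parse('5m')` multiplies 5 by the `'m'` unit of 60 seconds.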
#history_snapshots ⇒ Object
Ent §5.3 Historical snapshots from the capped `history:metrics` stream. `?limit=N` (default 1000) most-recent points, oldest→newest, each `processed:, failures:, …`. Fields are read generically so a migrated Sidekiq Ent stream renders as-is.
# File 'app/controllers/wurk/api_controller.rb', line 241

def history_snapshots
  limit = ::Wurk::Api::Pagination.clamp_int(
    params[:limit], 1, ::Wurk::History::STREAM_CAP, ::Wurk::History::STREAM_DEFAULT_LIMIT
  )
  render json: { snapshots: ::Wurk::Web::Enterprise::Historical.snapshots(limit: limit) }
end
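The call above passes `clamp_int` a raw param, a minimum, a maximum, and a default. A plausible reading of that signature is sketched here; `clamp_int_sketch` is a hypothetical reimplementation, the default of 1000 comes from the description above, and the cap of 10,000 is an assumed placeholder for `STREAM_CAP`.

```ruby
STREAM_DEFAULT_LIMIT_SKETCH = 1000 # stated default for ?limit=
STREAM_CAP_SKETCH = 10_000         # assumed value, for illustration only

# Parses raw as a base-10 integer; a missing or non-numeric value becomes
# the default, and the result is then held inside [min, max].
def clamp_int_sketch(raw, min, max, default)
  value = Integer(raw.to_s, 10, exception: false)
  (value || default).clamp(min, max)
end
```

Under these assumptions, an oversized `?limit=` is reduced to the cap instead of being rejected.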
#limiters ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 178

def limiters
  names = ::Wurk::Web::Enterprise::Limits.list(filter: params[:substr])
  page = ::Wurk::Api::Pagination.window(params)
  render json: { total: names.size, page: page[:page], count: page[:count], limiters: limiter_rows(names, page) }
end
#meta ⇒ Object
Boot-time flags the SPA reads once to shape the UI (e.g. hide destructive actions and show the read-only banner). Always a GET, so it stays reachable while read-only mode blocks mutations.
# File 'app/controllers/wurk/api_controller.rb', line 44

def meta
  config = ::Wurk::Web.config
  render json: {
    read_only: config.read_only?,
    read_only_message: config.read_only_message,
    custom_tabs: config.custom_tabs
  }
end
#metrics ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 208

def metrics
  minutes = ::Wurk::Api::Pagination.clamp_int(params[:minutes], 1, ::Wurk::Metrics::Query::MAX_MINUTES, 60)
  rows = ::Wurk::Web::Enterprise::Historical.top(minutes: minutes, class_filter: params[:substr])
  render json: { minutes: minutes, top_jobs: rows.map { |(klass, totals)| ::Wurk::Api::Serializers.metric_row(klass, totals) } }
rescue ::Wurk::Metrics::Query::WindowTooWide => e
  render json: { error: e.message }, status: :bad_request
end
#metrics_for_job ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 216

def metrics_for_job
  klass = params[:klass].to_s
  minutes, hours = metrics_window(params)
  rows = ::Wurk::Web::Enterprise::Historical.for_job(klass, minutes: minutes, hours: hours)
  series = rows.map { |row| row.merge(at: row[:at].to_f) }
  render json: { klass: klass, minutes: minutes, hours: hours, series: series }
rescue ::ArgumentError, ::Wurk::Metrics::Query::WindowTooWide => e
  render json: { error: e.message }, status: :bad_request
end
#pause_cron ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 194

def pause_cron = render_cron_action(::Wurk::Web::Enterprise::Periodic.pause(params[:lid].to_s))
#pause_queue ⇒ Object
Pause/unpause a queue (Pro §6, §10.1). Idempotent; returns the resulting state so the SPA can update its toggle without a refetch round-trip.
# File 'app/controllers/wurk/api_controller.rb', line 88

def pause_queue
  ::Wurk::Queue.new(params[:name].to_s).pause!
  render json: { ok: true, paused: true }
end
#processes ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 145

def processes
  render json: ::Wurk::ProcessSet.new.map { |p| ::Wurk::Api::Serializers.process_row(p) }
end
#profiles ⇒ Object
Profiles list (v8.0+). The SPA links each row to /profiles/:key (view) and /profiles/:key/data (raw blob). Newest first.
# File 'app/controllers/wurk/api_controller.rb', line 275

def profiles
  records = ::Wurk::ProfileSet.new.map { |rec| ::Wurk::Api::Serializers.profile_record(rec) }
  render json: records.sort_by { |r| -(r[:started_at] || 0) }
end
#queue ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 61

def queue
  q = ::Wurk::Queue.new(params[:name].to_s)
  page = ::Wurk::Api::Pagination.window(params)
  jobs = ::Wurk::Api::Pagination.slice(q, page) { |rec| ::Wurk::Api::Serializers.job_record(rec) }
  render json: {
    name: q.name, size: q.size, latency: q.latency, paused: q.paused?,
    page: page[:page], count: page[:count], jobs: jobs
  }
end
#queue_history ⇒ Object
Per-queue size/latency gauge time-series for the Metrics/Historical tab. `:bucket` is 1m/5m/1h; `?window=24h` (s/m/h/d) is clamped to the bucket's retention; optional `?queue=<name>` narrows to one queue. Each queue's `points` are Recharts-ready.
# File 'app/controllers/wurk/api_controller.rb', line 252

def queue_history
  window = parse_window(params[:window])
  queues = params[:queue].present? ? [params[:queue].to_s] : nil
  series = ::Wurk::Web::Enterprise::Historical.queue_history(params[:bucket].to_s, window: window, queues: queues)
  render json: {
    bucket: params[:bucket].to_s,
    window: window,
    queues: series.map { |row| ::Wurk::Api::Serializers.queue_history_series(row) }
  }
rescue ::ArgumentError => e
  render json: { error: e.message }, status: :bad_request
end
#queues ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 57

def queues
  render json: ::Wurk::Stats.new.queue_summaries.map { |q| ::Wurk::Api::Serializers.queue_summary(q) }
end
#quiet_process ⇒ Object
Busy-page controls: SIGTSTP (quiet: drop fetch, drain in-flight) and SIGTERM (stop: graceful shutdown). Both are async; the target notices on its next heartbeat (≤10s). `identity` absent or "all" signals every live process.
# File 'app/controllers/wurk/api_controller.rb', line 159

def quiet_process = signal_processes(:quiet!)
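The targeting rule for the Busy-page controls (`identity` absent or "all" means every live process) might be implemented along these lines. The private `signal_processes` helper is not shown on this page, so `FakeProcess` and `signal_processes_sketch` are hypothetical, and the real method presumably writes a signal to Redis instead of mutating an in-memory object.

```ruby
# Hypothetical stand-in for a ProcessSet member; records received signals.
class FakeProcess
  attr_reader :identity, :signals

  def initialize(identity)
    @identity = identity
    @signals = []
  end

  def quiet!
    @signals << :quiet
  end

  def stop!
    @signals << :stop
  end
end

# Picks every process when identity is nil, empty, or "all"; otherwise only
# the process with that identity. Returns how many processes were signalled.
def signal_processes_sketch(processes, identity, signal)
  wanted = identity.to_s
  targets = if wanted.empty? || wanted == 'all'
              processes
            else
              processes.select { |process| process.identity == wanted }
            end
  targets.each { |process| process.public_send(signal) }
  targets.size
end
```

Only the fixed symbols `:quiet!` and `:stop!` are ever passed as `signal`, so nothing from the request reaches `public_send`.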
#reset_limiter ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 184

def reset_limiter
  ::Wurk::Web::Enterprise::Limits.reset(params[:name].to_s)
  render json: { ok: true }
end
#retries ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 98

def retries = render_sorted_set(::Wurk::RetrySet.new)
#retries_all ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 106

def retries_all
  set = ::Wurk::RetrySet.new
  count = case params[:cmd].to_s
          when 'retry' then set.retry_all
          when 'kill' then set.kill_all
          when 'delete' then clear_set(set)
          else return render(json: { error: 'unknown action' }, status: :bad_request)
          end
  render json: { ok: true, count: count }
end
#retries_bulk ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 104

def retries_bulk = bulk_entry_action(::Wurk::RetrySet.new, RETRY_ACTIONS)
#retry_job ⇒ Object
Retry set mutations.
# File 'app/controllers/wurk/api_controller.rb', line 103

def retry_job = single_entry_action(::Wurk::RetrySet.new, RETRY_ACTIONS, params[:cmd])
#scheduled ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 99

def scheduled = render_sorted_set(::Wurk::ScheduledSet.new)
#scheduled_all ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 121

def scheduled_all
  set = ::Wurk::ScheduledSet.new
  count = case params[:cmd].to_s
          when 'delete' then clear_set(set)
          when 'add_to_queue' then drain_set(set, :add_to_queue)
          else return render(json: { error: 'unknown action' }, status: :bad_request)
          end
  render json: { ok: true, count: count }
end
#scheduled_bulk ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 119

def scheduled_bulk = bulk_entry_action(::Wurk::ScheduledSet.new, SCHEDULED_ACTIONS)
#scheduled_job ⇒ Object
Scheduled set mutations.
# File 'app/controllers/wurk/api_controller.rb', line 118

def scheduled_job = single_entry_action(::Wurk::ScheduledSet.new, SCHEDULED_ACTIONS, params[:cmd])
#search ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 265

def search
  substr = params[:substr].to_s
  return render(json: { substr: substr, total: 0, hits: [] }) if substr.empty?

  hits = ::Wurk::Web::Search.new(substr, kinds: parse_search_kinds(params), limit: parse_search_limit(params)).to_a
  render json: { substr: substr, total: hits.size, hits: hits }
end
#stats ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 53

def stats
  render json: ::Wurk::Api::Serializers.stats_payload(::Wurk::Stats.new)
end
#stop_process ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 160

def stop_process = signal_processes(:stop!)
#stream ⇒ Object
SSE: one `event: stats` per tick with a fresh Stats snapshot. Caps at `STREAM_MAX_DURATION` so a stale browser tab can't tie up a Rails worker forever; the client reconnects automatically when the stream closes.
`?max_duration=` and `?tick=` are test/debug knobs; the SPA never sets them. `?max_duration=0` emits one tick and closes.
# File 'app/controllers/wurk/api_controller.rb', line 286

def stream
  stream_headers!
  clamp = ::Wurk::Api::Pagination.method(:clamp_float)
  tick = clamp.call(params[:tick], 0.0, STREAM_TICK_SECONDS, STREAM_TICK_SECONDS)
  max_dur = clamp.call(params[:max_duration], 0.0, STREAM_MAX_DURATION, STREAM_MAX_DURATION)
  sse = ::ActionController::Live::SSE.new(response.stream, retry: (STREAM_TICK_SECONDS * 1000).to_i)
  drive_stream(sse, tick, max_dur)
end
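The tick/max-duration behavior described for #stream can be sketched without Rails. The private `drive_stream` helper is not shown on this page, so `write_event` and `drive_stream_sketch` below are hypothetical; the clock and sleeper are injectable only so the loop can be exercised without waiting, and the frame layout follows the standard `event:`/`data:` server-sent events format.

```ruby
require 'json'
require 'stringio'

# Writes one server-sent event frame: an event name, a JSON data line, and
# the blank line that terminates the frame.
def write_event(io, name, payload)
  io.write("event: #{name}\ndata: #{JSON.generate(payload)}\n\n")
end

# Emits a snapshot (the block's return value) on every tick until
# max_duration seconds have elapsed. The first event is always written, so
# a max_duration of 0 emits exactly one tick and then closes.
def drive_stream_sketch(io, tick:, max_duration:,
                        clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                        sleeper: ->(seconds) { sleep(seconds) })
  started = clock.call
  ticks = 0
  loop do
    write_event(io, 'stats', yield)
    ticks += 1
    break if clock.call - started >= max_duration

    sleeper.call(tick)
  end
  ticks
end
```

A caller would pass a block that builds a fresh snapshot, for example `drive_stream_sketch(io, tick: 2.0, max_duration: 600.0) { { processed: 1 } }`.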
#unpause_cron ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 195

def unpause_cron = render_cron_action(::Wurk::Web::Enterprise::Periodic.unpause(params[:lid].to_s))
#unpause_queue ⇒ Object
# File 'app/controllers/wurk/api_controller.rb', line 93

def unpause_queue
  ::Wurk::Queue.new(params[:name].to_s).unpause!
  render json: { ok: true, paused: false }
end
#workers ⇒ Object
Currently-executing jobs across the cluster (WorkSet), oldest first. The Busy page’s process-detail modal filters client-side by process_id.
# File 'app/controllers/wurk/api_controller.rb', line 151

def workers
  render json: ::Wurk::WorkSet.new.map { |pid, tid, work| ::Wurk::Api::Serializers.work_row(pid, tid, work) }
end