Class: DispatchPolicy::PartitionsController
- Inherits:
-
ApplicationController
- Object
- ActionController::Base
- ApplicationController
- DispatchPolicy::PartitionsController
- Defined in:
- app/controllers/dispatch_policy/partitions_controller.rb
Constant Summary collapse
- DRAIN_MAX_PER_REQUEST =
10_000- DRAIN_BATCH_SIZE =
200- PAGE_SIZE =
100
Class Method Summary collapse
-
.drain_partition!(partition, cap: DRAIN_MAX_PER_REQUEST) ⇒ Object
Force-admits up to DRAIN_MAX_PER_REQUEST due jobs in DRAIN_BATCH_SIZE batches.
Instance Method Summary collapse
- #admit ⇒ Object
-
#drain ⇒ Object
Empties the partition by force-admitting every staged job through the forwarder, bypassing all gates.
- #index ⇒ Object
-
#pagination_params(overrides = {}) ⇒ Object
Build URL params preserving filters, replacing the cursor.
- #show ⇒ Object
Class Method Details
.drain_partition!(partition, cap: DRAIN_MAX_PER_REQUEST) ⇒ Object
Force-admits up to DRAIN_MAX_PER_REQUEST due jobs in DRAIN_BATCH_SIZE batches. Optional ‘cap` lets the policy-wide drain bound the TOTAL across partitions. Returns [drained, due_remaining, scheduled_remaining] — due_remaining is claimable-now work the cap left behind; scheduled_remaining is future-scheduled rows the claim can’t touch yet.
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'app/controllers/dispatch_policy/partitions_controller.rb', line 125 def self.drain_partition!(partition, cap: DRAIN_MAX_PER_REQUEST) cap = [cap, DRAIN_MAX_PER_REQUEST].min drained = 0 while drained < cap batch_limit = [DRAIN_BATCH_SIZE, cap - drained].min forwarded = ManualAdmission.force!( policy_name: partition.policy_name, partition_key: partition.partition_key, limit: batch_limit ) break if forwarded.zero? drained += forwarded end scope = StagedJob.for_partition(partition.policy_name, partition.partition_key) due_remaining = scope.due.count scheduled_remaining = scope.count - due_remaining [drained, due_remaining, scheduled_remaining] end |
Instance Method Details
#admit ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'app/controllers/dispatch_policy/partitions_controller.rb', line 83 def admit # Bound the count: an unbounded value would force a single # DELETE…RETURNING + dispatch of the whole backlog in one transaction, # bypassing the batching/cap that #drain uses precisely to avoid # request timeouts and giant transactions. A non-numeric value falls # back to 1 instead of raising (ArgumentError → 500). count = (Integer(params[:count], exception: false) || 1).clamp(1, DRAIN_MAX_PER_REQUEST) forwarded = ManualAdmission.force!( policy_name: @partition.policy_name, partition_key: @partition.partition_key, limit: count ) redirect_to partition_path(@partition), notice: "Forwarded #{forwarded} job(s)." end |
#drain ⇒ Object
Empties the partition by force-admitting every staged job through the forwarder, bypassing all gates. Bounded at DRAIN_MAX_PER_REQUEST so a huge backlog can’t time the controller out — the operator clicks again for the next batch.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'app/controllers/dispatch_policy/partitions_controller.rb', line 102 def drain drained, due_remaining, scheduled_remaining = self.class.drain_partition!(@partition) notice = if due_remaining.positive? "Drained #{drained} job(s); #{due_remaining} still pending — click drain again to continue." elsif scheduled_remaining.positive? # The claim only picks up rows whose scheduled_at has arrived, so # future-scheduled jobs can't be drained now. Saying "click again" # would just loop forwarding zero. "Drained #{drained} job(s); #{scheduled_remaining} scheduled for later remain." else "Drained #{drained} job(s); partition empty." end redirect_to partition_path(@partition), notice: notice end |
#index ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'app/controllers/dispatch_policy/partitions_controller.rb', line 12 def index base = Partition.all base = base.for_policy(params[:policy]) if params[:policy].present? base = base.for_shard(params[:shard]) if params[:shard].present? if params[:q].present? # Escape %/_ so a literal key containing them (e.g. "discount_50%") # matches literally instead of as ILIKE wildcards. base = base.where("partition_key ILIKE ?", "%#{Partition.sanitize_sql_like(params[:q])}%") end base = base.where("pending_count > 0") if params[:only_pending] == "1" @sort = DispatchPolicy::CursorPagination::SORTS.key?(params[:sort]) ? params[:sort] : DispatchPolicy::CursorPagination::DEFAULT_SORT sort_def = DispatchPolicy::CursorPagination.sort_for(@sort) @total = base.count # cheap on indexed columns; nice to display @cursor = DispatchPolicy::CursorPagination.decode(params[:cursor]) paginated = DispatchPolicy::CursorPagination.apply(base, @sort, @cursor) .order(Arel.sql(sort_def[:sql_order])) .limit(PAGE_SIZE + 1) .to_a @has_more = paginated.size > PAGE_SIZE @partitions = paginated.first(PAGE_SIZE) @next_cursor = if @has_more && @partitions.any? v, id = DispatchPolicy::CursorPagination.extract(@partitions.last, @sort) DispatchPolicy::CursorPagination.encode(v, id) end @policy = params[:policy] @shard = params[:shard] @query = params[:q] @only_pending = params[:only_pending] == "1" # Policy-level pause flags so rows show their EFFECTIVE state: a # partition created after a pause has status 'active' but is not # being admitted (claim_partitions skips the whole policy). @paused_policies = PolicySetting.paused.pluck(:policy_name).to_set shards_scope = Partition.all shards_scope = shards_scope.for_policy(params[:policy]) if params[:policy].present? @shards = shards_scope.distinct.pluck(:shard).sort end |
#pagination_params(overrides = {}) ⇒ Object
Build URL params preserving filters, replacing the cursor.
58 59 60 61 62 63 64 65 66 67 |
# File 'app/controllers/dispatch_policy/partitions_controller.rb', line 58 def pagination_params(overrides = {}) { policy: @policy.presence, shard: @shard.presence, q: @query.presence, sort: (@sort if @sort != DispatchPolicy::CursorPagination::DEFAULT_SORT), only_pending: ("1" if @only_pending), cursor: nil }.compact.merge(overrides) end |
#show ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'app/controllers/dispatch_policy/partitions_controller.rb', line 70 def show # Order matches the tick's claim order (claim_staged_jobs!) so the list # reflects what would actually be admitted first, not the reverse. @recent_jobs = StagedJob .for_partition(@partition.policy_name, @partition.partition_key) .order(Arel.sql("priority DESC, scheduled_at ASC NULLS FIRST, id ASC")) .limit(50) # The whole policy may be paused even if this partition's own status # is 'active' (it was created after the pause). claim_partitions skips # the policy regardless, so surface the effective state. @policy_paused = PolicySetting.for_policy(@partition.policy_name).pick(:paused) || false end |