Class: Wurk::Limiter::Concurrent
- Defined in:
- lib/wurk/limiter/concurrent.rb
Overview
Atomic slot acquisition in a ZSET. Score = expiry epoch; the acquire script first evicts expired slots (bumping the `reclaimed` metric), then ZADDs the new slot if there’s headroom.
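The evict-then-add order described above can be modelled in plain Ruby. This is only a sketch under stated assumptions: `SlotTable`, its `ttl:` option, and the `expiry <= now` comparison are hypothetical stand-ins, and a Hash replaces the ZSET, so it has none of the cross-process atomicity that the real Redis script provides.

```ruby
# In-memory stand-in for the ZSET: slot id => expiry epoch (the score).
# SlotTable and its method names are hypothetical, for illustration only.
class SlotTable
  attr_reader :reclaimed

  def initialize(limit:, ttl:)
    @limit = limit
    @ttl = ttl
    @slots = {}
    @reclaimed = 0
  end

  # Evict expired slots first, then add the new slot only if there is headroom.
  # Returns true when the slot was acquired, false otherwise.
  def acquire(slot, now: Time.now.to_f)
    expired = @slots.select { |_id, expiry| expiry <= now }.keys
    expired.each { |id| @slots.delete(id) }
    @reclaimed += expired.size

    return false if @slots.size >= @limit

    @slots[slot] = now + @ttl
    true
  end

  def release(slot)
    @slots.delete(slot)
  end

  def size
    @slots.size
  end
end

table = SlotTable.new(limit: 2, ttl: 10)
table.acquire('a', now: 100.0) # acquired
table.acquire('b', now: 101.0) # acquired
table.acquire('c', now: 102.0) # refused: both slots are still held
table.acquire('c', now: 111.0) # 'a' and 'b' have expired and are evicted, so 'c' is acquired
```

Evicting before the headroom check is what lets a crashed holder's slot be reused once its expiry passes, which is why the eviction is counted separately as `reclaimed`.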
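On exhaustion: a polling loop that sleeps up to `WAIT_SLEEP` (0.05 s) between attempts. The spec says “blocks via Redis stream XREAD”; that is a performance optimization, and the visible behavior is identical: the call blocks up to `wait_timeout`, then raises OverLimit. With `policy: :ignore` it instead returns silently after the first failed acquire, without waiting and without running the block.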
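The waiting behavior can be sketched in isolation from Redis. Everything here is a hypothetical stand-in: `wait_for_slot`, the local `OverLimit` class, the `:raise` policy name, and the `monotime` helper (assumed to read the monotonic clock in seconds) are illustrative, and the block passed in plays the role of one acquire attempt.

```ruby
# Stand-in error class; the real OverLimit lives in the library.
class OverLimit < StandardError; end

# Assumed helper: monotonic clock in seconds.
def monotime
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Retries the given block until it returns true or the deadline passes.
# Sleeps at most `interval` seconds per round and never past the deadline.
def wait_for_slot(wait_timeout:, policy: :raise, interval: 0.05)
  deadline = monotime + wait_timeout
  loop do
    return :acquired if yield
    return :ignored if policy == :ignore

    remaining = deadline - monotime
    raise OverLimit, 'no slot within wait_timeout' if remaining <= 0

    sleep [remaining, interval].min
  end
end

attempts = 0
wait_for_slot(wait_timeout: 1.0) { (attempts += 1) >= 3 } # succeeds on the third attempt
```

Capping the sleep at the remaining time keeps the total wait close to `wait_timeout` rather than overshooting by up to one full interval.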
Constant Summary
- WAIT_SLEEP = 0.05
- METRIC_FIELDS = %w[held held_time immediate waited wait_time overages reclaimed].freeze
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #size ⇒ Object
- #status ⇒ Object
Uniform `{ used:, limit:, reset_at:, available? }` (#16) merged with the concurrent-only metric counters (§1.5) the dashboard already renders.
- #type ⇒ Object
- #within_limit(&block) ⇒ Object
Methods inherited from Base
#delete, #fingerprint, #initialize, #reset
Constructor Details
This class inherits a constructor from Wurk::Limiter::Base
Instance Method Details
#size ⇒ Object
# File 'lib/wurk/limiter/concurrent.rb', line 22
def size
  Wurk::Limiter.redis { |c| c.call('ZCARD', state_key).to_i }
end
#status ⇒ Object
Uniform `{ used:, limit:, reset_at:, available? }` (#16) merged with the concurrent-only metric counters (§1.5) the dashboard already renders. Slots free on release rather than on a clock, so `reset_at` is the soonest in-flight slot expiry (a worst-case “available by”), or nil when idle.
# File 'lib/wurk/limiter/concurrent.rb', line 31
def status
  used = size
  build_status(used: used, limit: @options[:limit], reset_at: soonest_expiry)
    .merge(metrics)
end
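To make the `reset_at` rule concrete, here is a small sketch of how such a status hash might be assembled. The real `build_status`, `soonest_expiry`, and `metrics` helpers are not shown on this page, so `status_for`, the `used < limit` definition of `available?`, and the string metric keys are all assumptions made for illustration.

```ruby
# Hypothetical stand-in: the soonest expiry among held slots, nil when idle.
# `slots` maps slot id => expiry epoch, mirroring the ZSET scores.
def soonest_expiry(slots)
  slots.values.min
end

# Hypothetical stand-in for build_status(...).merge(metrics).
def status_for(slots, limit:, metrics: {})
  used = slots.size
  {
    :used => used,
    :limit => limit,
    :reset_at => soonest_expiry(slots),
    :available? => used < limit # assumed meaning of available?
  }.merge(metrics)
end

busy = { 'a' => 1_700_000_030.0, 'b' => 1_700_000_012.0 }
busy_status = status_for(busy, limit: 2, metrics: { 'held' => 2 })
# reset_at is the earlier expiry (slot 'b'); available? is false because both slots are held

idle_status = status_for({}, limit: 2)
# reset_at is nil and available? is true
```

The earliest expiry is only an upper bound on when capacity returns: a holder that releases normally frees its slot sooner.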
#type ⇒ Object
# File 'lib/wurk/limiter/concurrent.rb', line 20
def type = :concurrent
#within_limit(&block) ⇒ Object
# File 'lib/wurk/limiter/concurrent.rb', line 37
def within_limit(&block)
  raise ArgumentError, 'block required' unless block

  started = monotime
  deadline = started + @options[:wait_timeout]
  slot = random_id
  acquired_at = nil

  loop do
    result = acquire(slot)
    if result[0].to_i == 1
      acquired_at = monotime
      break
    end

    return if @options[:policy] == :ignore

    remaining = deadline - monotime
    if remaining <= 0
      bump_counter('overages')
      raise OverLimit, self
    end
    sleep [remaining, WAIT_SLEEP].min
  end

  begin
    incr_immediate_or_waited(acquired_at - started)
    block.call
  ensure
    release(slot)
    bump_counter('held_time', (monotime - acquired_at).to_i) if acquired_at
  end
end