Class: Wurk::Limiter::Concurrent

Inherits:
Base
  • Object
show all
Defined in:
lib/wurk/limiter/concurrent.rb

Overview

Atomic slot acquisition in a ZSET. Score = expiry epoch; the acquire script first evicts expired slots (bumping the ‘reclaimed` metric) then ZADDs if there’s headroom.

On exhaustion: spin loop with backoff. The spec says “blocks via Redis stream XREAD” — that’s a perf optimization; the visible behavior is identical: blocks up to ‘wait_timeout` then OverLimit (or silent return for `policy: :ignore`).

Constant Summary collapse

WAIT_SLEEP =
0.05
METRIC_FIELDS =
%w[held held_time immediate waited wait_time overages reclaimed].freeze

Instance Attribute Summary

Attributes inherited from Base

#name, #options

Instance Method Summary collapse

Methods inherited from Base

#delete, #fingerprint, #initialize, #reset

Constructor Details

This class inherits a constructor from Wurk::Limiter::Base

Instance Method Details

#sizeObject



22
23
24
# File 'lib/wurk/limiter/concurrent.rb', line 22

def size
  Wurk::Limiter.redis { |c| c.call('ZCARD', state_key).to_i }
end

#statusObject

Uniform ‘{ used:, limit:, reset_at:, available? }` (#16) merged with the concurrent-only metric counters (§1.5) the dashboard already renders. Slots free on release rather than on a clock, so `reset_at` is the soonest in-flight slot expiry (a worst-case “available by”), or nil when idle.



31
32
33
34
35
# File 'lib/wurk/limiter/concurrent.rb', line 31

def status
  used = size
  build_status(used: used, limit: @options[:limit], reset_at: soonest_expiry)
    .merge(metrics)
end

#typeObject



20
# File 'lib/wurk/limiter/concurrent.rb', line 20

def type = :concurrent

#within_limit(&block) ⇒ Object

Raises:

  • (ArgumentError)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/wurk/limiter/concurrent.rb', line 37

def within_limit(&block)
  raise ArgumentError, 'block required' unless block

  started = monotime
  deadline = started + @options[:wait_timeout]
  slot = random_id
  acquired_at = nil
  loop do
    result = acquire(slot)
    if result[0].to_i == 1
      acquired_at = monotime
      break
    end

    return if @options[:policy] == :ignore

    remaining = deadline - monotime
    if remaining <= 0
      bump_counter('overages')
      raise OverLimit, self
    end

    sleep [remaining, WAIT_SLEEP].min
  end

  begin
    incr_immediate_or_waited(acquired_at - started)
    block.call
  ensure
    release(slot)
    bump_counter('held_time', (monotime - acquired_at).to_i) if acquired_at
  end
end