Class: Cosmo::Job::Limit
- Inherits:
-
Object
- Object
- Cosmo::Job::Limit
- Defined in:
- lib/cosmo/job/limit.rb
Overview
Distributed concurrency limiter backed by NATS Key-Value with per-message TTL.
Each unit of concurrency is a numbered KV slot:
"{concurrency_key}/0", "{concurrency_key}/1", ..., "{concurrency_key}/{limit-1}"
Acquiring a slot is a single atomic ‘set` (CAS with last-revision=0). Only one worker can win a given slot; losers try the next number. When a job finishes the slot is deleted; if the worker crashes NATS expires it automatically via the per-message Nats-TTL header.
Constant Summary collapse
- BUCKET =
"cosmo_jobs_limits"
Class Method Summary collapse
Instance Method Summary collapse
-
#acquire(key, jid:, limit:, duration:) ⇒ String?
Try to acquire one of the numbered slots for
key. -
#initialize ⇒ Limit
constructor
A new instance of Limit.
-
#release(slot) ⇒ Object
Release a previously acquired slot.
Constructor Details
Class Method Details
.instance ⇒ Object
17 18 19 |
# File 'lib/cosmo/job/limit.rb', line 17 def self.instance @instance ||= new end |
Instance Method Details
#acquire(key, jid:, limit:, duration:) ⇒ String?
Try to acquire one of the numbered slots for key.
32 33 34 35 36 37 38 39 40 41 |
# File 'lib/cosmo/job/limit.rb', line 32 def acquire(key, jid:, limit:, duration:) 0.upto(limit - 1) do |i| slot = "#{key}/#{i}" @kv.set(slot, jid, ttl: duration) return slot rescue NATS::KeyValue::KeyWrongLastSequenceError next # slot is live, try the next one end nil # all slots occupied end |
#release(slot) ⇒ Object
Release a previously acquired slot.
44 45 46 47 48 |
# File 'lib/cosmo/job/limit.rb', line 44 def release(slot) @kv.delete(slot) rescue NATS::Error # best effort — slot TTL will reclaim it if delete fails end |