Class: Cosmo::Job::Limit

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmo/job/limit.rb

Overview

Distributed concurrency limiter backed by NATS Key-Value with per-message TTL.

Each unit of concurrency is a numbered KV slot:

"{concurrency_key}/0", "{concurrency_key}/1", ..., "{concurrency_key}/{limit-1}"

Acquiring a slot is a single atomic ‘set` (CAS with last-revision=0). Only one worker can win a given slot; losers try the next number. When a job finishes the slot is deleted; if the worker crashes NATS expires it automatically via the per-message Nats-TTL header.

Constant Summary collapse

BUCKET =
"cosmo_jobs_limits"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeLimit

Returns a new instance of Limit.



21
22
23
# File 'lib/cosmo/job/limit.rb', line 21

def initialize
  @kv = API::KV.new(BUCKET, allow_msg_ttl: true)
end

Class Method Details

.instanceObject



17
18
19
# File 'lib/cosmo/job/limit.rb', line 17

def self.instance
  @instance ||= new
end

Instance Method Details

#acquire(key, jid:, limit:, duration:) ⇒ String?

Try to acquire one of the numbered slots for key.

Parameters:

  • key (String)

    concurrency key

  • jid (String)

    stored as the slot value for observability

  • limit (Integer)

    number of slots (0 … limit-1)

  • duration (Integer)

    seconds before the slot is auto-expired by NATS

Returns:

  • (String, nil)

    the acquired slot key, or nil when all slots are taken



32
33
34
35
36
37
38
39
40
41
# File 'lib/cosmo/job/limit.rb', line 32

def acquire(key, jid:, limit:, duration:)
  0.upto(limit - 1) do |i|
    slot = "#{key}/#{i}"
    @kv.set(slot, jid, ttl: duration)
    return slot
  rescue NATS::KeyValue::KeyWrongLastSequenceError
    next # slot is live, try the next one
  end
  nil # all slots occupied
end

#release(slot) ⇒ Object

Release a previously acquired slot.



44
45
46
47
48
# File 'lib/cosmo/job/limit.rb', line 44

def release(slot)
  @kv.delete(slot)
rescue NATS::Error
  # best effort — slot TTL will reclaim it if delete fails
end