Class: Rapidity::Share::Producer

Inherits:
Base
  • Object
show all
Defined in:
lib/rapidity/share/producer.rb

Constant Summary collapse

LUA_SCRIPTS =
[:init, :check_queue, :acquire_queue]

Constants inherited from Base

Base::BASE_SCRIPTS, Base::DEFAULT_KEY_TTL

Instance Method Summary collapse

Methods inherited from Base

#build_limit, #delete, #get_name, #handle_response, #info, #list, #reset, #wrap_executed_script

Constructor Details

#initialize(*args, **kwargs) ⇒ Producer

Returns a new instance of Producer.



7
8
9
# File 'lib/rapidity/share/producer.rb', line 7

def initialize(*args, **kwargs)
  super(*args, **kwargs)
end

Instance Method Details

#acquire_queue(list_limits_or_str, count: 1, ttl: @ttl) ⇒ OpenStruct

Acquires tokens from the rate limit semaphore for ‘Feedback-Driven Flow Control`

Attempts to acquire the specified number of tokens from the semaphore. If tokens are available, they are reserved for the caller.

Parameters:

  • list_limits_or_str (Array<Limit>, Array<String>)

    array of limit objects or limit names

  • count (Integer) (defaults to: 1)

    number of tokens to acquire

  • ttl (Integer) (defaults to: @ttl)

    time-to-live for the acquired tokens

Returns:

  • (OpenStruct)

    result of the acquisition attempt

Raises:

  • (ArgumentError)


56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rapidity/share/producer.rb', line 56

def acquire_queue(list_limits_or_str, count: 1, ttl: @ttl)
  list_limits_or_str = [list_limits_or_str] unless list_limits_or_str.is_a?(Array)
  
  raise ArgumentError, "limits list is empty" if list_limits_or_str.empty?
  raise ArgumentError, "count must be positive" unless count > 0


  limits = if list_limits_or_str[0].is_a?(Limit)
    list_limits_or_str.map {|it| it.name}
  else
    list_limits_or_str
  end

  response = wrap_executed_script do |r|
    r.evalsha(@lua_acquire_queue, keys: [*limits], argv: [count, ttl])
  end
  handle_response(response)
end

#check_queue(limit_or_str, ttl: @ttl) ⇒ OpenStruct

Checks the current state of the rate limit semaphore for ‘Feedback-Driven Flow Control`

Retrieves information about the semaphore status for a specific limit

Parameters:

  • limit_or_str (Limit, String)

    limit object or its name

  • ttl (Integer) (defaults to: @ttl)

    time-to-live for the semaphore check

Returns:

  • (OpenStruct)

    semaphore status information



39
40
41
42
43
44
45
# File 'lib/rapidity/share/producer.rb', line 39

def check_queue(limit_or_str, ttl: @ttl)
  response = wrap_executed_script do |r|
    r.evalsha(@lua_check_queue, keys: [get_name(limit_or_str)], argv: [ttl])
  end

  handle_response(response)
end

#init(limit, ttl: @ttl) ⇒ Boolean

Initializes a new rate limit in Redis

Creates or updates a rate limit with the specified parameters. If the limit already exists, it will be updated with new parameters.

Parameters:

  • limit (Limit)

    limit object to initialize

  • ttl (Integer) (defaults to: @ttl)

    time-to-live for the limit key in seconds

Returns:

  • (Boolean)

    true if the limit was successfully initialized and persisted



19
20
21
22
23
24
25
26
# File 'lib/rapidity/share/producer.rb', line 19

def init(limit, ttl: @ttl)
  result = wrap_executed_script do |r|
    r.evalsha(@lua_init, keys: [limit.name], argv: [*limit.base_params, ttl])
  end

  limit = build_limit(result)
  limit.valid? && limit.persisted?
end

#update(*args, **kwargs) ⇒ Object



28
29
30
# File 'lib/rapidity/share/producer.rb', line 28

def update(*args, **kwargs)
  init(*args, **kwargs)
end