Class: Rapidity::Share::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/rapidity/share/base.rb

Direct Known Subclasses

Producer, Sender

Constant Summary collapse

LUA_SCRIPTS =
[]
BASE_SCRIPTS =
[:list, :info, :reset, :delete]
DEFAULT_KEY_TTL =
6000

Instance Method Summary collapse

Constructor Details

#initialize(pool, ttl: DEFAULT_KEY_TTL.to_i, logger: nil) ⇒ Base

Returns a new instance of Base.



11
12
13
14
15
16
17
# File 'lib/rapidity/share/base.rb', line 11

def initialize(pool, ttl: DEFAULT_KEY_TTL.to_i, logger: nil)
  @pool = pool
  @ttl = ttl
  @logger = logger || Logger.new(STDOUT)
  @logger.level = Logger::DEBUG
  # load_redis_scripts
end

Instance Method Details

#build_limit(redis_data) ⇒ Object



94
95
96
97
98
# File 'lib/rapidity/share/base.rb', line 94

def build_limit(redis_data)
  name = redis_data[0]
  params = redis_data[1].each_slice(2).to_h
  Limit.from_hash(name, **params.symbolize_keys)
end

#delete(limit_or_str) ⇒ OpenStruct

Deletes a limit from Redis

Parameters:

  • limit_or_str (Limit, String)

    limit object or its name

Returns:

  • (OpenStruct)

    operation result with success field



56
57
58
59
60
61
62
# File 'lib/rapidity/share/base.rb', line 56

def delete(limit_or_str)
  response = wrap_executed_script do |r|
    r.evalsha(@lua_delete, keys: [get_name(limit_or_str)])
  end

  handle_response(response)
end

#get_name(limit_or_str) ⇒ Object



90
91
92
# File 'lib/rapidity/share/base.rb', line 90

def get_name(limit_or_str)
  limit_or_str.is_a?(Limit) ? limit_or_str.name : limit_or_str
end

#handle_response(response, with_limit: false) ⇒ OpenStruct

Processes Redis response and converts it to OpenStruct

Parameters:

  • response (Array)

    raw Redis response

  • with_limit (Boolean) (defaults to: false)

    whether to include limit object in result

Returns:

  • (OpenStruct)

    structured response



82
83
84
85
86
87
88
# File 'lib/rapidity/share/base.rb', line 82

def handle_response(response, with_limit: false)
  response = response.each_slice(2).to_h
  success = response["result"] == "true"
  result_data = { success: success, **response }
  result_data[:limit] = build_limit(response["info"]) if success && with_limit
  OpenStruct.new(result_data)
end

#info(limit_or_str, ttl: @ttl) ⇒ OpenStruct

Retrieves information about a limit

Parameters:

  • limit_or_str (Limit, String)

    limit object or its name

  • ttl (Integer) (defaults to: @ttl)

    key TTL

Returns:

  • (OpenStruct)

    operation result with success and limit fields



69
70
71
72
73
74
75
# File 'lib/rapidity/share/base.rb', line 69

def info(limit_or_str, ttl: @ttl)
  response = wrap_executed_script do |r|
    r.evalsha(@lua_info, keys: [get_name(limit_or_str)], argv: [ttl])
  end

  handle_response(response, with_limit: true)
end

#list(match_pattern, max_count: 1000) ⇒ Array<Limit>

Returns a list of limits matching the pattern

Parameters:

  • match_pattern (String)

    pattern for key matching

  • max_count (Integer) (defaults to: 1000)

    maximum number of records to return

Returns:

  • (Array<Limit>)

    array of Limit objects or empty array



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/rapidity/share/base.rb', line 24

def list(match_pattern, max_count: 1000)
  response = wrap_executed_script do |r|
    r.evalsha(@lua_list, argv: [match_pattern, max_count])
  end

  response = response.each_slice(2).to_h
  if response["count"].to_i > 0
    response["limits"].map do |data|
      build_limit(data)
    end
  else
    []
  end
end

#reset(limit_or_str, ttl: @ttl) ⇒ OpenStruct

Resets limit values

Parameters:

  • limit_or_str (Limit, String)

    limit object or its name

  • ttl (Integer) (defaults to: @ttl)

    key TTL after reset

Returns:

  • (OpenStruct)

    operation result with success and limit fields



44
45
46
47
48
49
50
# File 'lib/rapidity/share/base.rb', line 44

def reset(limit_or_str, ttl: @ttl)
  response = wrap_executed_script do |r|
    r.evalsha(@lua_reset, keys: [get_name(limit_or_str)], argv: [ttl])
  end

  handle_response(response, with_limit: true)
end

#wrap_executed_script(max_retries: 5, delay: 0.1, &block) ⇒ Array

Wrapper for Redis script execution with retry logic

Parameters:

  • max_retries (Integer) (defaults to: 5)

    maximum number of retry attempts

  • delay (Float) (defaults to: 0.1)

    delay between retries in seconds

  • block (Proc)

    block containing Redis operations

Returns:

  • (Array)

    Redis response or error response



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/rapidity/share/base.rb', line 106

def wrap_executed_script(max_retries: 5, delay: 0.1, &block)
  retries_count = 0
  begin
    @pool.with do |conn|
      conn.with do |r|
        yield r
      end
    end
  rescue Redis::CannotConnectError, Redis::TimeoutError, Errno::ECONNREFUSED => e
    retries_count += 1
    if retries_count < max_retries
      @logger.warn("Redis connection error: #{e.message}.")
      sleep(delay)
      retry
    else
      @logger.error("Redis is not available: #{e.message}")
      raise e
    end
  rescue ::Redis::CommandError => e
    if e.message.include?('NOSCRIPT')
      retries_count += 1
      if retries_count < max_retries
        @logger.warn("Get not script error from redis: #{e.message}. Reload lua scripts")
        # существует вероятность что сервер мог быть перезагружен
        # и нужно заново загрузить скрипты
        load_redis_scripts
        retry
      end
    end
    raise e
  rescue TypeError => e
    if e.message.include?('Unsupported command argument type: NilClass')
      retries_count += 1
      if retries_count < max_retries
        @logger.info("First time load lua scripts")
        # При первом запуске instance_variable с lua скриптами не инициализированы, при этом
        # evalsha райзит эту ошибку. Загружаем скрипты в redis
        load_redis_scripts
        retry
      end
    end
    raise e
  end
end