Class: OpenC3::Store
Direct Known Subclasses
Constant Summary collapse
- DB_SHARD_CACHE_TIMEOUT =
seconds
60- @@db_shard_cache =
DB_Shard cache: { “scope__target_name” => [db_shard_number, Time] }
{}
- @@db_shard_cache_mutex =
Mutex.new
- @@instance_mutex =
Mutex used to ensure that only one instance is created
Mutex.new
Instance Attribute Summary collapse
-
#redis_pool ⇒ Object
readonly
Returns the value of attribute redis_pool.
-
#redis_url ⇒ Object
readonly
Returns the value of attribute redis_url.
Class Method Summary collapse
-
.db_shard_for_target(target_name, scope: "DEFAULT") ⇒ Object
Look up the db_shard number for a target with a 1-minute cache.
-
.instance(pool_size = 100, db_shard: 0) ⇒ Object
Get the singleton instance.
-
.method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown class methods to delegate to the instance.
Instance Method Summary collapse
- #build_redis ⇒ Object
- #get_last_offset(topic) ⇒ Object
- #get_newest_message(topic) ⇒ Object
-
#get_oldest_message(topic) ⇒ Object
Stream APIs.
-
#initialize(pool_size = 10, db_shard: 0) ⇒ Store
constructor
A new instance of Store.
-
#method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown methods to redis through the @redis_pool.
- #read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil) ⇒ Object
-
#trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer
Trims older entries of the redis stream if needed.
- #update_topic_offsets(topics) ⇒ Object
-
#write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true') ⇒ String
Add new entry to the redis stream.
Constructor Details
#initialize(pool_size = 10, db_shard: 0) ⇒ Store
Returns a new instance of Store.
118 119 120 121 122 123 124 |
# File 'lib/openc3/utilities/store_autoload.rb', line 118 def initialize(pool_size = 10, db_shard: 0) @redis_username = ENV['OPENC3_REDIS_USERNAME'] @redis_key = ENV['OPENC3_REDIS_PASSWORD'] hostname = ENV['OPENC3_REDIS_HOSTNAME'].to_s.gsub("SHARDNUM", db_shard.to_s) @redis_url = "redis://#{hostname}:#{ENV.fetch('OPENC3_REDIS_PORT', 6379)}" @redis_pool = StoreConnectionPool.new(size: pool_size) { build_redis() } end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown methods to redis through the @redis_pool
114 115 116 |
# File 'lib/openc3/utilities/store_autoload.rb', line 114 def method_missing(, *args, **kwargs, &block) @redis_pool.with { |redis| redis.public_send(, *args, **kwargs, &block) } end |
Instance Attribute Details
#redis_pool ⇒ Object (readonly)
Returns the value of attribute redis_pool.
61 62 63 |
# File 'lib/openc3/utilities/store_autoload.rb', line 61 def redis_pool @redis_pool end |
#redis_url ⇒ Object (readonly)
Returns the value of attribute redis_url.
60 61 62 |
# File 'lib/openc3/utilities/store_autoload.rb', line 60 def redis_url @redis_url end |
Class Method Details
.db_shard_for_target(target_name, scope: "DEFAULT") ⇒ Object
Look up the db_shard number for a target with a 1-minute cache. Reads directly from Redis db_shard 0 to avoid circular deps with TargetModel. Non-target-specific data (nil target_name) always returns db_shard 0.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/openc3/utilities/store_autoload.rb', line 66 def self.db_shard_for_target(target_name, scope: "DEFAULT") return 0 unless target_name cache_key = "#{scope}__#{target_name}" now = Time.now @@db_shard_cache_mutex.synchronize do cached = @@db_shard_cache[cache_key] if cached db_shard, cached_at = cached return db_shard if (now - cached_at) < DB_SHARD_CACHE_TIMEOUT end end begin json = Store.instance(db_shard: 0).hget("#{scope}__openc3_targets", target_name) db_shard = json ? JSON.parse(json)['db_shard'].to_i : 0 rescue db_shard = 0 end @@db_shard_cache_mutex.synchronize do @@db_shard_cache[cache_key] = [db_shard, now] end db_shard end |
.instance(pool_size = 100, db_shard: 0) ⇒ Object
Get the singleton instance
95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/openc3/utilities/store_autoload.rb', line 95 def self.instance(pool_size = 100, db_shard: 0) # Logger.level = Logger::DEBUG @instances ||= [] the_instance = @instances[db_shard] return the_instance if the_instance @@instance_mutex.synchronize do @instances ||= [] @instances[db_shard] ||= self.new(pool_size, db_shard: db_shard) return @instances[db_shard] end end |
.method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown class methods to delegate to the instance
109 110 111 |
# File 'lib/openc3/utilities/store_autoload.rb', line 109 def self.method_missing(, *args, **kwargs, &block) self.instance.public_send(, *args, **kwargs, &block) end |
Instance Method Details
#build_redis ⇒ Object
126 127 128 |
# File 'lib/openc3/utilities/store_autoload.rb', line 126 def build_redis return Redis.new(url: @redis_url, username: @redis_username, password: @redis_key) end |
#get_last_offset(topic) ⇒ Object
159 160 161 162 163 164 165 166 167 168 |
# File 'lib/openc3/utilities/store_autoload.rb', line 159 def get_last_offset(topic) @redis_pool.with do |redis| result = redis.xrevrange(topic, count: 1) if result and result[0] and result[0][0] result[0][0] else "0-0" end end end |
#get_newest_message(topic) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/openc3/utilities/store_autoload.rb', line 145 def (topic) @redis_pool.with do |redis| # Default in xrevrange is range end '+', start '-' which means get all # elements from higher ID to lower ID and since we're limiting to 1 # we get the last element. See https://redis.io/commands/xrevrange. result = redis.xrevrange(topic, count: 1) if result and result.length > 0 return result[0] else return nil end end end |
#get_oldest_message(topic) ⇒ Object
Stream APIs
134 135 136 137 138 139 140 141 142 143 |
# File 'lib/openc3/utilities/store_autoload.rb', line 134 def (topic) @redis_pool.with do |redis| result = redis.xrange(topic, count: 1) if result and result.length > 0 return result[0] else return nil end end end |
#read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil) ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/openc3/utilities/store_autoload.rb', line 190 def read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil) return {} if topics.empty? Thread.current[:topic_offsets] ||= {} topic_offsets = Thread.current[:topic_offsets] begin # Logger.debug "read_topics: #{topics}, #{offsets} pool:#{@redis_pool}" @redis_pool.with do |redis| offsets = update_topic_offsets(topics) unless offsets result = redis.xread(topics, offsets, block: timeout_ms, count: count) if result and result.length > 0 result.each do |topic, | .each do |msg_id, msg_hash| topic_offsets[topic] = msg_id yield topic, msg_id, msg_hash, redis if block_given? end end end # Logger.debug "result:#{result}" if result and result.length > 0 return result end rescue Redis::TimeoutError return {} # Should return an empty hash not array - xread returns a hash end end |
#trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer
Trims older entries of the redis stream if needed. > www.rubydoc.info/github/redis/redis-rb/Redis:xtrim
253 254 255 256 257 |
# File 'lib/openc3/utilities/store_autoload.rb', line 253 def trim_topic(topic, minid, approximate = true, limit: 0) @redis_pool.with do |redis| return redis.xtrim_minid(topic, minid, approximate: approximate, limit: limit) end end |
#update_topic_offsets(topics) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/openc3/utilities/store_autoload.rb', line 170 def update_topic_offsets(topics) offsets = [] topics.each do |topic| # Normally we will just be grabbing the topic offset # this allows xread to get everything past this point Thread.current[:topic_offsets] ||= {} topic_offsets = Thread.current[:topic_offsets] last_id = topic_offsets[topic] if last_id offsets << last_id else # If there is no topic offset this is the first call. # Get the last offset ID so we'll start getting everything from now on offsets << get_last_offset(topic) topic_offsets[topic] = offsets[-1] end end return offsets end |
#write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true') ⇒ String
Add new entry to the redis stream. > www.rubydoc.info/github/redis/redis-rb/Redis:xadd
232 233 234 235 236 237 |
# File 'lib/openc3/utilities/store_autoload.rb', line 232 def write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true') id = '*' if id.nil? @redis_pool.with do |redis| return redis.xadd(topic, msg_hash, id: id, maxlen: maxlen, approximate: approximate) end end |