Class: OpenC3::StoreQueued
- Defined in:
- lib/openc3/utilities/store_queued.rb
Direct Known Subclasses
Defined Under Namespace
Classes: MessageStruct
Constant Summary
- @@instance_mutex = Mutex.new
  Mutex used to ensure that only one instance is created.
Instance Attribute Summary
- #update_interval ⇒ Object (readonly)
  Returns the value of attribute update_interval.
Class Method Summary
- .instance(update_interval = 1, db_shard: 0) ⇒ Object
  Gets the singleton instance for the given db_shard. The update interval defaults to 1 second.
- .method_missing(message, *args, **kwargs) ⇒ Object
  Delegates all unknown class methods to the singleton instance (db_shard 0).
Instance Method Summary
- #graceful_kill ⇒ Object
- #initialize(update_interval, db_shard: 0) ⇒ StoreQueued (constructor)
  A new instance of StoreQueued.
- #method_missing(message, *args, **kwargs, &block) ⇒ Object
  Record the message for pipelining by the thread.
- #process_queue ⇒ Object
- #set_update_interval(interval) ⇒ Object
- #shutdown ⇒ Object
- #store_instance ⇒ Object
  Returns the store we’re working with.
- #store_thread_body ⇒ Object
Constructor Details
#initialize(update_interval, db_shard: 0) ⇒ StoreQueued
Returns a new instance of StoreQueued.
    # File 'lib/openc3/utilities/store_queued.rb', line 46
    def initialize(update_interval, db_shard: 0)
      @update_interval = update_interval
      @db_shard = db_shard
      @store = store_instance()
      # Queue to hold the store requests
      @store_queue = Queue.new
      # Sleeper used to delay update thread
      @update_sleeper = Sleeper.new

      at_exit() do
        shutdown()
      end

      # Thread used to call methods on the store
      @update_thread = OpenC3.safe_thread(self.class.to_s) do
        store_thread_body()
      end
    end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(message, *args, **kwargs, &block) ⇒ Object
Records the message so that the update thread can pipeline it to the store later.

    # File 'lib/openc3/utilities/store_queued.rb', line 112
    def method_missing(message, *args, **kwargs, &block)
      @store_queue.push(MessageStruct.new(message, args, kwargs, block))
    end
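To illustrate the recording step in isolation, here is a minimal sketch that does not depend on OpenC3. `CallRecorder` and `RecordedCall` are hypothetical names invented for this example, and the struct's field names are assumed from the constructor call shown above.

```ruby
# Hypothetical stand-in for MessageStruct; field names are an assumption.
RecordedCall = Struct.new(:message, :args, :kwargs, :block)

# Hypothetical class that captures unknown method calls instead of running them.
class CallRecorder
  attr_reader :queue

  def initialize
    @queue = Queue.new # thread-safe FIFO from Ruby core
  end

  # Any method not defined on this class lands here and is queued.
  def method_missing(message, *args, **kwargs, &block)
    @queue.push(RecordedCall.new(message, args, kwargs, block))
  end
end

recorder = CallRecorder.new
recorder.hset('tlm', 'TEMP1', 42) # queued, not executed
recorder.expire('tlm', ex: 60)    # keyword arguments are kept separately
first = recorder.queue.pop
second = recorder.queue.pop
```

Nothing is sent anywhere at call time; the caller only pays for a queue push, and a consumer can replay the recorded calls later.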
Instance Attribute Details
#update_interval ⇒ Object (readonly)
Returns the value of attribute update_interval.
    # File 'lib/openc3/utilities/store_queued.rb', line 19
    def update_interval
      @update_interval
    end
Class Method Details
.instance(update_interval = 1, db_shard: 0) ⇒ Object
Gets the singleton instance for the given db_shard. The update interval defaults to 1 second.

    # File 'lib/openc3/utilities/store_queued.rb', line 29
    def self.instance(update_interval = 1, db_shard: 0) # seconds
      @instances ||= []
      the_instance = @instances[db_shard]
      return the_instance if the_instance

      @@instance_mutex.synchronize do
        @instances ||= []
        @instances[db_shard] ||= self.new(update_interval, db_shard: db_shard)
        return @instances[db_shard]
      end
    end
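The per-shard singleton lookup above can be sketched on its own as follows. `ShardRegistry` is a hypothetical class written for this example; it keeps only the lookup logic and leaves out the update interval and the background thread.

```ruby
# Hypothetical class illustrating one lazily created instance per shard index.
class ShardRegistry
  @@instance_mutex = Mutex.new

  attr_reader :db_shard

  def initialize(db_shard)
    @db_shard = db_shard
  end

  def self.instance(db_shard: 0)
    @instances ||= []
    existing = @instances[db_shard]
    return existing if existing # fast path: no lock once the instance exists

    # Slow path: take the lock and check again so only one instance is built.
    @@instance_mutex.synchronize do
      @instances[db_shard] ||= new(db_shard)
    end
  end
end

a = ShardRegistry.instance
b = ShardRegistry.instance(db_shard: 0)
c = ShardRegistry.instance(db_shard: 2)
```

Note that in the listing above an existing instance is returned unchanged, so an update_interval passed on a later call for the same db_shard appears to have no effect.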
.method_missing(message, *args, **kwargs) ⇒ Object
Delegates all unknown class methods to the singleton instance (db_shard 0).

    # File 'lib/openc3/utilities/store_queued.rb', line 42
    def self.method_missing(message, *args, **kwargs, &)
      self.instance.public_send(message, *args, **kwargs, &)
    end
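A small sketch of class-level delegation, assuming nothing from OpenC3: `Counter` is a hypothetical class, and the `respond_to_missing?` override is an addition for this example that the listing above does not include.

```ruby
# Hypothetical class whose class-level calls are forwarded to a lazily
# created default instance.
class Counter
  def self.instance
    @instance ||= new
  end

  # Forward unknown class methods to the default instance.
  def self.method_missing(message, *args, **kwargs, &block)
    instance.public_send(message, *args, **kwargs, &block)
  end

  # Keep respond_to? consistent with the forwarding above.
  def self.respond_to_missing?(message, include_private = false)
    instance.respond_to?(message) || super
  end

  def initialize
    @count = 0
  end

  def increment(by: 1)
    @count += by
  end
end

first_total = Counter.increment(by: 2)
second_total = Counter.increment
```

In StoreQueued the forwarded call then reaches the instance's own method_missing, so a call such as `StoreQueued.some_store_method(...)` would presumably be queued on the db_shard 0 instance rather than executed immediately.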
Instance Method Details
#graceful_kill ⇒ Object
    # File 'lib/openc3/utilities/store_queued.rb', line 121
    def graceful_kill
      # Do nothing
    end
#process_queue ⇒ Object
    # File 'lib/openc3/utilities/store_queued.rb', line 71
    def process_queue
      unless @store_queue.empty?
        # Pipeline the requests to redis to improve performance
        @store.redis_pool.pipelined do
          while !@store_queue.empty?
            action = @store_queue.pop()
            @store.public_send(action.message, *action.args, **action.kwargs, &action.block)
          end
        end
      end
    end
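The drain-and-replay step can be sketched without Redis. `FakeStore`, `QueuedCall`, and `drain` are hypothetical names for this example; `FakeStore#pipelined` merely counts batches and yields, and is not the real connection pool API.

```ruby
# Hypothetical record of a deferred call; field names are an assumption.
QueuedCall = Struct.new(:message, :args, :kwargs, :block)

# Hypothetical store that only logs what it is asked to do.
class FakeStore
  attr_reader :log, :batches

  def initialize
    @log = []
    @batches = 0
  end

  # Stand-in for a pipelined block: counts one batch per invocation.
  def pipelined
    @batches += 1
    yield
  end

  def set(key, value)
    @log << [:set, key, value]
  end
end

# Replay every queued call against the store inside a single batch.
def drain(queue, store)
  return if queue.empty? # skip opening a batch when there is nothing to send

  store.pipelined do
    until queue.empty?
      call = queue.pop
      store.public_send(call.message, *call.args, **call.kwargs, &call.block)
    end
  end
end

queue = Queue.new
queue.push(QueuedCall.new(:set, ['a', 1], {}, nil))
queue.push(QueuedCall.new(:set, ['b', 2], {}, nil))
store = FakeStore.new
drain(queue, store)
drain(queue, store) # second call finds an empty queue and does nothing
```

Checking for an empty queue first avoids opening a batch on every tick of the update thread when no calls are pending.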
#set_update_interval(interval) ⇒ Object
    # File 'lib/openc3/utilities/store_queued.rb', line 65
    def set_update_interval(interval)
      if interval > 0.0
        @update_interval = interval
      end
    end
#shutdown ⇒ Object
    # File 'lib/openc3/utilities/store_queued.rb', line 101
    def shutdown
      @update_sleeper.cancel if @update_sleeper
      OpenC3.kill_thread(self, @update_thread) if @update_thread
      @update_thread = nil
      # Drain the queue before shutdown
      process_queue()
    end
#store_instance ⇒ Object
Returns the store we’re working with.

    # File 'lib/openc3/utilities/store_queued.rb', line 117
    def store_instance
      Store.instance(db_shard: @db_shard)
    end
#store_thread_body ⇒ Object
    # File 'lib/openc3/utilities/store_queued.rb', line 83
    def store_thread_body
      while true
        start_time = Time.now

        begin
          process_queue()
        rescue => e
          puts "StoreQueued thread error (db_shard=#{@db_shard}):\n#{e.formatted}"
        end

        # Only check whether to update at a set interval
        run_time = Time.now - start_time
        sleep_time = @update_interval - run_time
        sleep_time = 0 if sleep_time < 0
        break if @update_sleeper.sleep(sleep_time)
      end
    end
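The sleep-time arithmetic in the loop above can be isolated into a small function. `remaining_sleep` is a hypothetical helper written for this example and is not part of StoreQueued.

```ruby
# Hypothetical helper: how long to sleep so that each loop iteration
# starts roughly update_interval seconds after the previous one.
def remaining_sleep(update_interval, run_time)
  sleep_time = update_interval - run_time
  # If processing took longer than the interval, do not sleep at all.
  sleep_time.negative? ? 0 : sleep_time
end

fast_pass = remaining_sleep(1, 0.25) # processing took a quarter second
slow_pass = remaining_sleep(1, 1.5)  # processing overran the interval
```

Subtracting the processing time keeps the period between queue flushes close to the configured interval instead of letting it drift by the time each flush takes.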