Class: OpenC3::StoreQueued

Inherits:
Object show all
Defined in:
lib/openc3/utilities/store_queued.rb

Direct Known Subclasses

EphemeralStoreQueued

Defined Under Namespace

Classes: MessageStruct

Constant Summary collapse

@@instance_mutex =

Mutex used to ensure that only one instance is created

Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(update_interval, db_shard: 0) ⇒ StoreQueued

Returns a new instance of StoreQueued.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/openc3/utilities/store_queued.rb', line 46

def initialize(update_interval, db_shard: 0)
  @update_interval = update_interval
  @db_shard = db_shard
  @store = store_instance()
  # Queue to hold the store requests
  @store_queue = Queue.new
  # Sleeper used to delay update thread
  @update_sleeper = Sleeper.new

  at_exit() do
    shutdown()
  end

  # Thread used to call methods on the store
  @update_thread = OpenC3.safe_thread(self.class.to_s) do
    store_thread_body()
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(message, *args, **kwargs, &block) ⇒ Object

Record the message for pipelining by the thread



112
113
114
# File 'lib/openc3/utilities/store_queued.rb', line 112

def method_missing(message, *args, **kwargs, &block)
  @store_queue.push(MessageStruct.new(message, args, kwargs, block))
end

Instance Attribute Details

#update_intervalObject (readonly)

Returns the value of attribute update_interval.



19
20
21
# File 'lib/openc3/utilities/store_queued.rb', line 19

def update_interval
  @update_interval
end

Class Method Details

.instance(update_interval = 1, db_shard: 0) ⇒ Object

Get the singleton instance for the given db_shard Sets the update interval to 1 second by default



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/openc3/utilities/store_queued.rb', line 29

def self.instance(update_interval = 1, db_shard: 0) # seconds
  @instances ||= []
  the_instance = @instances[db_shard]
  return the_instance if the_instance

  @@instance_mutex.synchronize do
    @instances ||= []
    @instances[db_shard] ||= self.new(update_interval, db_shard: db_shard)
    return @instances[db_shard]
  end
end

.method_missing(message, *args, **kwargs) ⇒ Object

Delegate all unknown class methods to delegate to the instance (db_shard 0)



42
43
44
# File 'lib/openc3/utilities/store_queued.rb', line 42

def self.method_missing(message, *args, **kwargs, &)
  self.instance.public_send(message, *args, **kwargs, &)
end

Instance Method Details

#graceful_killObject



121
122
123
# File 'lib/openc3/utilities/store_queued.rb', line 121

def graceful_kill
  # Do nothing
end

#process_queueObject



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/openc3/utilities/store_queued.rb', line 71

def process_queue
  unless @store_queue.empty?
    # Pipeline the requests to redis to improve performance
    @store.redis_pool.pipelined do
      while !@store_queue.empty?
        action = @store_queue.pop()
        @store.public_send(action.message, *action.args, **action.kwargs, &action.block)
      end
    end
  end
end

#set_update_interval(interval) ⇒ Object



65
66
67
68
69
# File 'lib/openc3/utilities/store_queued.rb', line 65

def set_update_interval(interval)
  if interval > 0.0
    @update_interval = interval
  end
end

#shutdownObject



101
102
103
104
105
106
107
# File 'lib/openc3/utilities/store_queued.rb', line 101

def shutdown
  @update_sleeper.cancel if @update_sleeper
  OpenC3.kill_thread(self, @update_thread) if @update_thread
  @update_thread = nil
  # Drain the queue before shutdown
  process_queue()
end

#store_instanceObject

Returns the store we’re working with



117
118
119
# File 'lib/openc3/utilities/store_queued.rb', line 117

def store_instance
  Store.instance(db_shard: @db_shard)
end

#store_thread_bodyObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/openc3/utilities/store_queued.rb', line 83

def store_thread_body
  while true
    start_time = Time.now

    begin
      process_queue()
    rescue => e
      puts "StoreQueued thread error (db_shard=#{@db_shard}):\n#{e.formatted}"
    end

    # Only check whether to update at a set interval
    run_time = Time.now - start_time
    sleep_time = @update_interval - run_time
    sleep_time = 0 if sleep_time < 0
    break if @update_sleeper.sleep(sleep_time)
  end
end