Class: OpenC3::SystemEventsTopic

Inherits:
Topic show all
Defined in:
lib/openc3/topics/system_events_topic.rb

Constant Summary collapse

# Topic name used by this class for every system event read, write and offset update.
PRIMARY_KEY =
"OPENC3__SYSTEM__EVENTS".freeze

Class Method Summary collapse

Methods inherited from Topic

all_same_db_shard?, clear_topics, del, get_cnt, get_last_offset, get_newest_message, get_oldest_message, group_topics_by_db_shard, method_missing, read_topics, trim_topic, write_ack, write_topic

Class Method Details

._active_db_shards ⇒ Object

Collect all unique target db_shards from TargetModel



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/openc3/topics/system_events_topic.rb', line 21

# Gathers the unique db_shard numbers referenced by stored target definitions.
#
# Every hash-typed Store key matching '*__openc3_targets' is visited (one per
# scope). Each hash value is parsed as JSON and its 'db_shard' entry, or 0 when
# that entry is missing or nil, is converted with #to_i and added to the result.
# Shard 0 is always part of the result, even when no targets are found.
#
# @return [Set<Integer>] the distinct db_shard numbers, always including 0
def self._active_db_shards
  shards = Set[0]
  Store.scan_each(match: '*__openc3_targets', type: 'hash') do |targets_key|
    Store.hgetall(targets_key).each do |_target_name, target_json|
      target = JSON.parse(target_json, allow_nan: true, create_additions: true)
      shard = target['db_shard'] || 0
      shards.add(shard.to_i)
    end
  end
  shards
end

.read ⇒ Object



46
47
48
49
50
# File 'lib/openc3/topics/system_events_topic.rb', line 46

# Reads pending messages from the system events topic and yields each decoded
# event to the caller's block.
#
# Events are serialized by .write with `allow_nan: true`, so the payload may
# contain the non-standard NaN / Infinity / -Infinity tokens. Parsing must
# allow them as well, otherwise such events raise JSON::ParserError here.
#
# @yieldparam event [Object] the value decoded from the message's 'event' field
# @return [Object] whatever Topic.read_topics returns
def self.read
  Topic.read_topics([PRIMARY_KEY]) do |_topic, _msg_id, msg_hash, _redis|
    yield JSON.parse(msg_hash['event'], allow_nan: true)
  end
end

.update_topic_offsets ⇒ Object



33
34
35
# File 'lib/openc3/topics/system_events_topic.rb', line 33

# Delegates to Topic.update_topic_offsets for the system events topic only.
#
# NOTE(review): presumably this moves the stored read offset to the current end
# of the topic so a later .read only sees new events - confirm against Topic.
#
# @return [Object] whatever Topic.update_topic_offsets returns
def self.update_topic_offsets
  topics = [PRIMARY_KEY]
  Topic.update_topic_offsets(topics)
end

.write(type, event) ⇒ Object



37
38
39
40
41
42
43
44
# File 'lib/openc3/topics/system_events_topic.rb', line 37

# Publishes a system event to the system events topic on every active db_shard.
#
# The given event is tagged in place with its type (the caller's object gains a
# 'type' entry), serialized to JSON with NaN/Infinity permitted, and written
# once per shard returned by ._active_db_shards so that every interface
# microservice can read system events inline.
#
# @param type [Object] value stored under the event's 'type' key
# @param event [Hash] event payload; mutated to include 'type'
# @return [Object] the collection returned by ._active_db_shards
def self.write(type, event)
  event['type'] = type
  serialized = JSON.generate(event, allow_nan: true)
  payload = { event: serialized }
  shards = _active_db_shards
  # NOTE(review): '*' and 1000 are passed straight through to Topic.write_topic;
  # presumably an auto-generated message id and a max topic length - confirm.
  shards.each { |shard| Topic.write_topic(PRIMARY_KEY, payload, '*', 1000, db_shard: shard) }
end