Class: OpenC3::Topic

Inherits:
Object show all
Defined in:
lib/openc3/topics/topic.rb

Class Method Summary collapse

Class Method Details

.all_same_db_shard?(db_shard_groups) ⇒ Boolean

Check if all db_shard groups resolve to a single db_shard (fast path).

Returns:

  • (Boolean)


96
97
98
# File 'lib/openc3/topics/topic.rb', line 96

# Fast-path check used to decide whether every topic lives on one db_shard.
#
# @param db_shard_groups [#length] grouping to inspect; only its +length+ is
#   consulted (presumably the Hash keyed by db_shard built elsewhere in this
#   class - confirm against callers)
# @return [Boolean] true when the grouping has zero or one entries
def self.all_same_db_shard?(db_shard_groups)
  group_count = db_shard_groups.length
  group_count < 2
end

.clear_topics(topics, maxlen = 0, db_shard: 0) ⇒ Object



27
28
29
30
# File 'lib/openc3/topics/topic.rb', line 27

# Trim each of the given topics with xtrim on the store for one db_shard.
#
# @param topics [Array<String>] topics to trim
# @param maxlen [Integer] value forwarded to xtrim for every topic (default 0)
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever +topics.each+ returns (the receiver for an Array)
def self.clear_topics(topics, maxlen = 0, db_shard: 0)
  # Resolve the store once, before iterating, so all topics share one instance.
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  topics.each do |name|
    shard_store.xtrim(name, maxlen)
  end
end

.del(topic, db_shard: 0) ⇒ Object



68
69
70
# File 'lib/openc3/topics/topic.rb', line 68

# Forward a +del+ for the topic to the store of the given db_shard.
#
# @param topic [String] topic to delete
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +del+ returns
def self.del(topic, db_shard: 0)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.del(topic)
end

.get_cnt(topic, db_shard: 0) ⇒ Object



32
33
34
35
# File 'lib/openc3/topics/topic.rb', line 32

# Read the "received_count" field of the newest message on a topic.
#
# @param topic [String] topic to inspect
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Integer] the newest message's "received_count" converted with
#   +to_i+, or 0 when the store reports no message
def self.get_cnt(topic, db_shard: 0)
  newest = EphemeralStore.instance(db_shard: db_shard).get_newest_message(topic)
  # The store result is destructured as a pair; only the second element is used.
  _id, message = newest
  return 0 unless message

  message["received_count"].to_i
end

.get_last_offset(topic, db_shard: 0) ⇒ Object



56
57
58
# File 'lib/openc3/topics/topic.rb', line 56

# Forward +get_last_offset+ for the topic to the store of the given db_shard.
#
# @param topic [String] topic to query
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +get_last_offset+ returns
def self.get_last_offset(topic, db_shard: 0)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.get_last_offset(topic)
end

.get_newest_message(topic, db_shard: 0) ⇒ Object



48
49
50
# File 'lib/openc3/topics/topic.rb', line 48

# Forward +get_newest_message+ for the topic to the store of the given db_shard.
#
# @param topic [String] topic to query
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +get_newest_message+ returns
def self.get_newest_message(topic, db_shard: 0)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.get_newest_message(topic)
end

.get_oldest_message(topic, db_shard: 0) ⇒ Object



52
53
54
# File 'lib/openc3/topics/topic.rb', line 52

# Forward +get_oldest_message+ for the topic to the store of the given db_shard.
#
# @param topic [String] topic to query
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +get_oldest_message+ returns
def self.get_oldest_message(topic, db_shard: 0)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.get_oldest_message(topic)
end

.group_topics_by_db_shard(topics, target_pattern:, scope:) ⇒ Hash

Group topics by db_shard. Each topic’s target name is extracted and looked up. Topics matching target_pattern are db_sharded; others go to db_shard 0.

Parameters:

  • topics (Array<String>)

    List of topic strings

  • target_pattern (String)

    Substring to identify target-specific topics (e.g. 'CMD}TARGET__', '__TELEMETRY__')

  • scope (String)

    Scope name for db_shard lookup

Returns:

  • (Hash)

    { db_shard => [topic, ...] }



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/openc3/topics/topic.rb', line 78

# Group topics by the db_shard their target is assigned to.
#
# Topics that contain +target_pattern+ have a target name extracted and looked
# up through Store.db_shard_for_target; every other topic is placed on
# db_shard 0. A topic whose target name cannot be extracted, or whose lookup
# returns nil, is also placed on db_shard 0.
#
# Target name extraction tries, in order:
# 1. a brace-delimited segment preceded by "__", e.g. "__{INST}__" or a
#    trailing "__{INST}" (names may contain underscores);
# 2. the legacy "__NAME__" pattern, kept for topics without a braced segment;
# 3. when +target_pattern+ contains "TARGET__", the text after "TARGET__".
#
# @param topics [Array<String>] list of topic strings
# @param target_pattern [String] substring identifying target-specific topics
#   (e.g. 'CMD}TARGET__', '__TELEMETRY__')
# @param scope [String] scope name passed to the db_shard lookup
# @return [Hash{Integer => Array<String>}] { db_shard => [topic, ...] }, with
#   topics kept in input order inside each group
def self.group_topics_by_db_shard(topics, target_pattern:, scope:)
  topics.each_with_object({}) do |topic, groups|
    db_shard = 0
    if topic.include?(target_pattern)
      # Try the braced segment first: the legacy pattern matches the leftmost
      # "__WORD__", which in "SCOPE__TELEMETRY__{INST}__PKT" is the topic type
      # ("TELEMETRY") rather than the target.
      target_name = topic[/__\{([^}]+)\}(?:__|\z)/, 1]
      target_name ||= topic[/__\{?([^}_]+)\}?__/, 1]
      # Handle CMD}TARGET__ pattern where target is after TARGET__
      target_name ||= topic.split('TARGET__')[1] if target_pattern.include?('TARGET__')
      # Without a target name there is nothing to look up; stay on db_shard 0.
      db_shard = (Store.db_shard_for_target(target_name, scope: scope) || 0).to_i if target_name
    end
    (groups[db_shard] ||= []) << topic
  end
end

.method_missing(message, *args, **kwargs, &block) ⇒ Object

Delegate all unknown class methods to EphemeralStore db_shard 0 (system-level topics)



23
24
25
# File 'lib/openc3/topics/topic.rb', line 23

# Delegate all unknown class methods to the EphemeralStore class, forwarding
# positional arguments, keyword arguments and the block unchanged.
# Per the original note this reaches db_shard 0 (system-level topics); that
# routing happens inside EphemeralStore and is not visible here.
#
# @param message [Symbol] name of the method that was not found
# @return [Object] whatever EphemeralStore returns for the call
def self.method_missing(message, *args, **kwargs, &block)
  EphemeralStore.public_send(message, *args, **kwargs, &block)
end

# Companion to method_missing so respond_to? and method(...) agree with the
# delegation above. Only methods EphemeralStore itself reports via
# respond_to? are advertised; calls use public_send, so private methods are
# never counted.
#
# @param message [Symbol] method name being queried
# @param include_private [Boolean] passed through to super only
# @return [Boolean]
def self.respond_to_missing?(message, include_private = false)
  EphemeralStore.respond_to?(message) || super
end

.read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil, db_shard: 0, &block) ⇒ Object



44
45
46
# File 'lib/openc3/topics/topic.rb', line 44

# Forward +read_topics+ to the store of the given db_shard, passing the
# positional arguments and the block through unchanged.
#
# @param topics [Array<String>] topics to read
# @param offsets [Object, nil] forwarded as-is (default nil)
# @param timeout_ms [Integer] forwarded as-is (default 1000)
# @param count [Object, nil] forwarded as-is (default nil)
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +read_topics+ returns
def self.read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil, db_shard: 0, &block)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.read_topics(topics, offsets, timeout_ms, count, &block)
end

.trim_topic(topic, minid, approximate = true, limit: 0, db_shard: 0) ⇒ Object



64
65
66
# File 'lib/openc3/topics/topic.rb', line 64

# Forward +trim_topic+ to the store of the given db_shard.
#
# @param topic [String] topic to trim
# @param minid [Object] forwarded as-is
# @param approximate [Boolean] forwarded as-is (default true)
# @param limit [Integer] forwarded as the +limit:+ keyword (default 0)
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +trim_topic+ returns
def self.trim_topic(topic, minid, approximate = true, limit: 0, db_shard: 0)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.trim_topic(topic, minid, approximate, limit: limit)
end

.update_topic_offsets(topics, db_shard: 0) ⇒ Object



60
61
62
# File 'lib/openc3/topics/topic.rb', line 60

# Forward +update_topic_offsets+ to the store of the given db_shard.
#
# @param topics [Array<String>] topics forwarded as-is
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +update_topic_offsets+ returns
def self.update_topic_offsets(topics, db_shard: 0)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.update_topic_offsets(topics)
end

.write_ack(topic, result, msg_id, db_shard: 0) ⇒ Object

Build the ACK topic from a command/router topic and write the ack.



101
102
103
104
105
106
# File 'lib/openc3/topics/topic.rb', line 101

# Build the ACK topic from a command/router topic and write the ack.
#
# The topic is split on "__" and "ACK" is prepended to its second segment,
# e.g. "{DEFAULT__CMD}TARGET__INST" becomes "{DEFAULT__ACKCMD}TARGET__INST".
# A topic with no second segment raises TypeError from the concatenation.
#
# @param topic [String] source topic
# @param result [Object] stored under the 'result' key of the ack message
# @param msg_id [Object] stored under the 'id' key of the ack message
# @param db_shard [Integer] forwarded to Topic.write_topic
# @return [Object] whatever Topic.write_topic returns
def self.write_ack(topic, result, msg_id, db_shard: 0)
  head, kind, *rest = topic.split("__")
  ack_topic = [head, 'ACK' + kind, *rest].join("__")
  payload = { 'result' => result, 'id' => msg_id }
  # id '*' and maxlen 100 are passed positionally to write_topic.
  Topic.write_topic(ack_topic, payload, '*', 100, db_shard: db_shard)
end

.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true', db_shard: 0) ⇒ Object

db_shard-aware topic methods for target-specific streams. These methods explicitly route to the correct EphemeralStore db_shard.



40
41
42
# File 'lib/openc3/topics/topic.rb', line 40

# Forward +write_topic+ to the store of the given db_shard.
#
# @param topic [String] topic to write to
# @param msg_hash [Hash] message forwarded as-is
# @param id [String] forwarded as-is (default '*')
# @param maxlen [Integer, nil] forwarded as-is (default nil)
# @param approximate [String] forwarded as-is (default is the String 'true')
# @param db_shard [Integer] selects the EphemeralStore instance to use
# @return [Object] whatever the store's +write_topic+ returns
def self.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true', db_shard: 0)
  shard_store = EphemeralStore.instance(db_shard: db_shard)
  shard_store.write_topic(topic, msg_hash, id, maxlen, approximate)
end