Class: OpenC3::Topic
Direct Known Subclasses
AutonomicTopic, CalendarTopic, CommandDecomTopic, CommandTopic, ConfigTopic, DecomInterfaceTopic, InterfaceTopic, LimitsEventTopic, NotebookTopic, QueueTopic, RouterTopic, SystemEventsTopic, TelemetryDecomTopic, TelemetryTopic, TimelineTopic
Class Method Summary
collapse
-
.all_same_db_shard?(db_shard_groups) ⇒ Boolean
Check if all db_shard groups resolve to a single db_shard (fast path).
-
.clear_topics(topics, maxlen = 0, db_shard: 0) ⇒ Object
-
.del(topic, db_shard: 0) ⇒ Object
-
.get_cnt(topic, db_shard: 0) ⇒ Object
-
.get_last_offset(topic, db_shard: 0) ⇒ Object
-
.get_newest_message(topic, db_shard: 0) ⇒ Object
-
.get_oldest_message(topic, db_shard: 0) ⇒ Object
-
.group_topics_by_db_shard(topics, target_pattern:, scope:) ⇒ Hash
Group topics by db_shard.
-
.method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown class methods to EphemeralStore db_shard 0 (system-level topics).
-
.read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil, db_shard: 0, &block) ⇒ Object
-
.trim_topic(topic, minid, approximate = true, limit: 0, db_shard: 0) ⇒ Object
-
.update_topic_offsets(topics, db_shard: 0) ⇒ Object
-
.write_ack(topic, result, msg_id, db_shard: 0) ⇒ Object
Build the ACK topic from a command/router topic and write the ack.
-
.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true', db_shard: 0) ⇒ Object
DB_Shard-aware topic methods for target-specific streams.
Class Method Details
.all_same_db_shard?(db_shard_groups) ⇒ Boolean
Check if all db_shard groups resolve to a single db_shard (fast path).
96
97
98
|
# File 'lib/openc3/topics/topic.rb', line 96
def self.all_same_db_shard?(db_shard_groups)
db_shard_groups.length <= 1
end
|
.clear_topics(topics, maxlen = 0, db_shard: 0) ⇒ Object
27
28
29
30
|
# File 'lib/openc3/topics/topic.rb', line 27
def self.clear_topics(topics, maxlen = 0, db_shard: 0)
store = EphemeralStore.instance(db_shard: db_shard)
topics.each { |topic| store.xtrim(topic, maxlen) }
end
|
.del(topic, db_shard: 0) ⇒ Object
68
69
70
|
# File 'lib/openc3/topics/topic.rb', line 68
def self.del(topic, db_shard: 0)
EphemeralStore.instance(db_shard: db_shard).del(topic)
end
|
.get_cnt(topic, db_shard: 0) ⇒ Object
32
33
34
35
|
# File 'lib/openc3/topics/topic.rb', line 32
def self.get_cnt(topic, db_shard: 0)
_, packet = EphemeralStore.instance(db_shard: db_shard).get_newest_message(topic)
packet ? packet["received_count"].to_i : 0
end
|
.get_last_offset(topic, db_shard: 0) ⇒ Object
56
57
58
|
# File 'lib/openc3/topics/topic.rb', line 56
def self.get_last_offset(topic, db_shard: 0)
EphemeralStore.instance(db_shard: db_shard).get_last_offset(topic)
end
|
.get_newest_message(topic, db_shard: 0) ⇒ Object
48
49
50
|
# File 'lib/openc3/topics/topic.rb', line 48
def self.get_newest_message(topic, db_shard: 0)
EphemeralStore.instance(db_shard: db_shard).get_newest_message(topic)
end
|
.get_oldest_message(topic, db_shard: 0) ⇒ Object
52
53
54
|
# File 'lib/openc3/topics/topic.rb', line 52
def self.get_oldest_message(topic, db_shard: 0)
EphemeralStore.instance(db_shard: db_shard).get_oldest_message(topic)
end
|
.group_topics_by_db_shard(topics, target_pattern:, scope:) ⇒ Hash
Group topics by db_shard. Each topic’s target name is extracted and looked up. Topics matching target_pattern are db_sharded; others go to db_shard 0.
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/openc3/topics/topic.rb', line 78
def self.group_topics_by_db_shard(topics, target_pattern:, scope:)
groups = {}
topics.each do |topic|
if topic.include?(target_pattern)
target_name = topic.match(/__\{?([^}_]+)\}?__/)[1] rescue nil
target_name = topic.split('TARGET__')[1] if target_pattern.include?('TARGET__') && target_name.nil?
db_shard = (Store.db_shard_for_target(target_name, scope: scope) || 0).to_i
else
db_shard = 0
end
groups[db_shard] ||= []
groups[db_shard] << topic
end
groups
end
|
.method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown class methods to EphemeralStore db_shard 0 (system-level topics)
23
24
25
|
# File 'lib/openc3/topics/topic.rb', line 23
def self.method_missing(message, *args, **kwargs, &block)
EphemeralStore.public_send(message, *args, **kwargs, &block)
end
|
.read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil, db_shard: 0, &block) ⇒ Object
44
45
46
|
# File 'lib/openc3/topics/topic.rb', line 44
def self.read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil, db_shard: 0, &block)
EphemeralStore.instance(db_shard: db_shard).read_topics(topics, offsets, timeout_ms, count, &block)
end
|
.trim_topic(topic, minid, approximate = true, limit: 0, db_shard: 0) ⇒ Object
64
65
66
|
# File 'lib/openc3/topics/topic.rb', line 64
def self.trim_topic(topic, minid, approximate = true, limit: 0, db_shard: 0)
EphemeralStore.instance(db_shard: db_shard).trim_topic(topic, minid, approximate, limit: limit)
end
|
.update_topic_offsets(topics, db_shard: 0) ⇒ Object
60
61
62
|
# File 'lib/openc3/topics/topic.rb', line 60
def self.update_topic_offsets(topics, db_shard: 0)
EphemeralStore.instance(db_shard: db_shard).update_topic_offsets(topics)
end
|
.write_ack(topic, result, msg_id, db_shard: 0) ⇒ Object
Build the ACK topic from a command/router topic and write the ack.
101
102
103
104
105
106
|
# File 'lib/openc3/topics/topic.rb', line 101
def self.write_ack(topic, result, msg_id, db_shard: 0)
ack_topic = topic.split("__")
ack_topic[1] = 'ACK' + ack_topic[1]
ack_topic = ack_topic.join("__")
Topic.write_topic(ack_topic, { 'result' => result, 'id' => msg_id }, '*', 100, db_shard: db_shard)
end
|
.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true', db_shard: 0) ⇒ Object
DB_Shard-aware topic methods for target-specific streams. These explicitly route to the correct EphemeralStore db_shard.
40
41
42
|
# File 'lib/openc3/topics/topic.rb', line 40
def self.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true', db_shard: 0)
EphemeralStore.instance(db_shard: db_shard).write_topic(topic, msg_hash, id, maxlen, approximate)
end
|