Class: OpenC3::ScopeCleanupMicroservice

Inherits:
CleanupMicroservice show all
Defined in:
lib/openc3/microservices/scope_cleanup_microservice.rb

Constant Summary collapse

# SQL run against each QuestDB db_shard to report per-table WAL health.
# For every row returned by tables() where walEnabled is true it yields:
#   table_name, table_row_count, wal_pending_row_count,
#   status        - 'SUSPENDED' when table_suspended, 'BACKOFF' / 'PRESSURE' for
#                   table_memory_pressure_level 2 / 1, otherwise 'OK'
#   lag_txns      - wal_txn minus table_txn
#   write_amp     - table_write_amp_p50
#   slowest_merge - table_merge_rate_p99
# Rows are ordered so suspended tables come first, then higher memory pressure,
# then larger pending row counts.
TSDB_HEALTH_QUERY =
"SELECT
table_name,
table_row_count,
wal_pending_row_count,
CASE
    WHEN table_suspended THEN 'SUSPENDED'
    WHEN table_memory_pressure_level = 2 THEN 'BACKOFF'
    WHEN table_memory_pressure_level = 1 THEN 'PRESSURE'
    ELSE 'OK'
END AS status,
wal_txn - table_txn AS lag_txns,
table_write_amp_p50 AS write_amp,
table_merge_rate_p99 AS slowest_merge
FROM tables()
WHERE walEnabled
ORDER BY
table_suspended DESC,
table_memory_pressure_level DESC,
wal_pending_row_count DESC;"
# Number of consecutive sample-to-sample increases that must be observed in a
# tracked metric (wal_pending_row_count or lag_txns) before #cleanup logs it as
# growing. #cleanup keeps this many samples per table between checks.
GROWTH_NUM_SAMPLE_PERIODS =
4

Instance Attribute Summary

Attributes inherited from Microservice

#count, #custom, #error, #logger, #microservice_status_thread, #name, #scope, #secrets, #state

Instance Method Summary collapse

Methods inherited from CleanupMicroservice

#run, #shutdown

Methods inherited from Microservice

#as_json, #microservice_cmd, run, #run, #setup_microservice_topic, #shutdown

Constructor Details

#initialize(*args) ⇒ ScopeCleanupMicroservice

Returns a new instance of ScopeCleanupMicroservice.



43
44
45
46
47
48
49
50
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 43

# Builds the microservice and resets the bookkeeping used by #cleanup.
#
# @param args [Array] forwarded unchanged to the parent constructor
def initialize(*args)
  super(*args)

  # Timing state: nothing has run yet and the poll interval is unknown until
  # #get_areas_and_poll_time assigns it.
  @run_time = @cleanup_poll_time = nil
  @delta_time = 0.0

  # Per-table sample histories, keyed by "s<db_shard>__<table_name>".
  @lag_txns = {}
  @wal_pending_row_count = {}
end

Instance Method Details

#cleanup(areas, bucket) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 52

# Runs one cleanup cycle.
#
# The parent cleanup is only invoked once the time accumulated between calls
# exceeds @cleanup_poll_time (assigned by #get_areas_and_poll_time). The TSDB
# health check runs on every call, but only for the 'DEFAULT' scope.
#
# @param areas [Array] passed through to the parent cleanup
# @param bucket [Object] passed through to the parent cleanup
def cleanup(areas, bucket)
  current_time = Time.now
  if @run_time
    delta = current_time - @run_time
    # Ignore negative deltas (e.g. the wall clock stepping backwards)
    @delta_time += delta if delta > 0.0
  end
  @run_time = current_time
  if @delta_time > @cleanup_poll_time
    @delta_time = 0.0
    super(areas, bucket)
  end

  # Always check TSDB health across all db_shards
  return unless @scope == 'DEFAULT'

  # Collect all unique db_shard numbers from targets
  db_shards = Set.new([0]) # Always check db_shard 0
  begin
    targets = OpenC3::TargetModel.all(scope: @scope)
    targets.each_value { |target| db_shards << target['db_shard'].to_i if target['db_shard'] }
  rescue => e
    # Best effort: fall back to whatever shards were collected so far
    @logger.error("QuestDB: Error getting target db_shards: #{e.formatted}")
  end

  db_shards.each { |db_shard| check_tsdb_shard_health(db_shard) }
end

# Internal helper for #cleanup: runs TSDB_HEALTH_QUERY against one db_shard,
# logs tables in a bad state, tries to resume suspended tables and records the
# growth samples for every table.
#
# @param db_shard [Integer] shard number to check
def check_tsdb_shard_health(db_shard)
  conn = OpenC3::QuestDBClient.connection(db_shard: db_shard)
  result = conn.exec(TSDB_HEALTH_QUERY)
  columns = result.fields

  table_name_column = columns.index("table_name")
  wal_pending_row_count_column = columns.index("wal_pending_row_count")
  status_column = columns.index("status")
  lag_txns_column = columns.index("lag_txns")

  resume_failed = false
  result.values.each do |values|
    table_name = values[table_name_column]
    # Prefix with db_shard to avoid key collisions across db_shards
    tracking_key = "s#{db_shard}__#{table_name}"
    wal_pending_row_count = values[wal_pending_row_count_column].to_i
    status = values[status_column]
    lag_txns = values[lag_txns_column].to_i

    if status != 'OK'
      @logger.error("QuestDB db_shard #{db_shard}: #{table_name} in bad state: #{status}")

      if status == 'SUSPENDED'
        # Try to automatically unsuspend
        @logger.info("QuestDB db_shard #{db_shard}: Attempting to unsuspend: #{table_name}")
        begin
          conn.exec("ALTER TABLE \"#{table_name}\" RESUME WAL;")
        rescue => e
          # Suspended tables are returned first by the query, so letting this
          # propagate would skip monitoring of every other table in the shard.
          resume_failed = true
          @logger.error("QuestDB db_shard #{db_shard}: Failed to unsuspend #{table_name}: #{e.formatted}")
        end
      end
    end

    if record_growth_sample(@wal_pending_row_count, tracking_key, wal_pending_row_count)
      # Crossed threshold of sample periods of growth
      @logger.error("QuestDB db_shard #{db_shard}: #{table_name} has growing wal_pending_row_count: #{wal_pending_row_count}")
    end

    if record_growth_sample(@lag_txns, tracking_key, lag_txns)
      # Crossed threshold of sample periods of growth
      @logger.error("QuestDB db_shard #{db_shard}: #{table_name} has growing lag_txns: #{lag_txns}")
    end
  end

  # Same recovery as for any other error: drop the connection after a failure
  OpenC3::QuestDBClient.disconnect(db_shard: db_shard) if resume_failed
rescue => e
  OpenC3::QuestDBClient.disconnect(db_shard: db_shard)
  @logger.error("QuestDB db_shard #{db_shard} Error: #{e.formatted}")
end

# Internal helper for #cleanup: appends a sample to a per-table history and
# reports whether the history shows sustained growth.
#
# Once more than GROWTH_NUM_SAMPLE_PERIODS samples are stored, the history is
# evaluated with #detect_growth and then trimmed to the most recent
# GROWTH_NUM_SAMPLE_PERIODS samples.
#
# @param history [Hash{String => Array<Integer>}] sample store to update
# @param tracking_key [String] key identifying the shard and table
# @param sample [Integer] newest value of the tracked metric
# @return [Boolean] true when growth was detected on this sample
def record_growth_sample(history, tracking_key, sample)
  samples = (history[tracking_key] ||= [])
  samples << sample
  return false unless samples.length > GROWTH_NUM_SAMPLE_PERIODS

  growing = detect_growth(samples, GROWTH_NUM_SAMPLE_PERIODS)
  # Leave the last GROWTH_NUM_SAMPLE_PERIODS samples
  history[tracking_key] = samples.last(GROWTH_NUM_SAMPLE_PERIODS)
  growing
end

#detect_growth(array, num_samples) ⇒ Object



140
141
142
143
144
145
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 140

# Reports whether the first num_samples + 1 entries of array are strictly
# increasing, i.e. whether num_samples consecutive periods of growth occurred.
#
# @param array [Array<Numeric>] samples ordered oldest to newest
# @param num_samples [Integer] number of consecutive increases required
# @return [Boolean] true when every compared sample is greater than the one
#   before it; false on any flat or decreasing step, or when array holds
#   fewer than num_samples + 1 samples
def detect_growth(array, num_samples)
  # Nothing to compare: vacuously true
  return true unless num_samples.positive?

  window = array.first(num_samples + 1)
  # Not enough history yet to claim sustained growth
  return false if window.length <= num_samples

  window.each_cons(2).all? { |previous, current| current > previous }
end

#get_areas_and_poll_time ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/openc3/microservices/scope_cleanup_microservice.rb', line 147

# Builds the list of log areas to clean for this scope and refreshes the
# cleanup poll interval from the scope model.
#
# Each area is a [path, retain_time] pair. The 'DEFAULT' scope additionally
# covers the matching NOSCOPE areas.
#
# @return [Array] two values: the area list and the run period of 60
def get_areas_and_poll_time
  model = ScopeModel.get_model(name: @scope)

  prefixes = [@scope]
  prefixes << 'NOSCOPE' if @scope == 'DEFAULT'

  areas = prefixes.flat_map do |prefix|
    [
      ["#{prefix}/text_logs/openc3_log_messages", model.text_log_retain_time],
      ["#{prefix}/tool_logs/sr", model.tool_log_retain_time],
    ]
  end

  # Consumed by #cleanup to decide when the parent cleanup actually runs
  @cleanup_poll_time = model.cleanup_poll_time

  # Run every 1 minute for TSDB checks
  [areas, 60]
end