Class: OpenC3::QuestDBClient

Inherits: Object
Defined in:
lib/openc3/utilities/questdb_client.rb

Overview

Utility class for QuestDB data encoding and decoding. This provides a common interface for serializing/deserializing COSMOS data types when writing to and reading from QuestDB.

Defined Under Namespace

Classes: QuestDBError

Constant Summary

DB_SHARD_CACHE_TIMEOUT =
  # seconds
  60

TIMESTAMP_ITEMS =
  # Special timestamp items that are calculated from PACKET_TIMESECONDS/RECEIVED_TIMESECONDS columns rather than stored as separate columns. PACKET_TIMESECONDS and RECEIVED_TIMESECONDS are stored as timestamp_ns columns and need conversion to float seconds on read. The TIMEFORMATTED items are derived from these timestamp columns.
  {
    'PACKET_TIMEFORMATTED' => { source: 'PACKET_TIMESECONDS', format: :formatted },
    'RECEIVED_TIMEFORMATTED' => { source: 'RECEIVED_TIMESECONDS', format: :formatted }
  }.freeze

STORED_TIMESTAMP_ITEMS =
  # Stored timestamp items that are stored as timestamp_ns columns and need conversion to float seconds on read. Distinguished from the calculated items above.
  Set.new(['PACKET_TIMESECONDS', 'RECEIVED_TIMESECONDS']).freeze

FLOAT64_POS_INF_SENTINEL =
  # 64-bit double sentinels (for FLOAT 64-bit columns)
  1.7976931348623155e308

FLOAT64_NEG_INF_SENTINEL =
  -1.7976931348623155e308

FLOAT64_NAN_SENTINEL =
  -1.7976931348623153e308

FLOAT32_POS_INF_STORED =
  # 32-bit float sentinels (what we read back after 32-bit storage)
  3.4028232635611926e38

FLOAT32_NEG_INF_STORED =
  -3.4028232635611926e38

FLOAT32_NAN_STORED =
  -3.4028230607370965e38

TIMESTAMP_SELECT =
  # SQL: nanosecond-precision packet timestamp for explicit SELECT lists. The PG wire protocol truncates timestamp_ns to microseconds; CAST AS LONG preserves full precision.
  'CAST(PACKET_TIMESECONDS AS LONG) as PACKET_TIMESECONDS'

TIMESTAMP_EXTRAS =
  # SQL: nanosecond-precision timestamps for SELECT * queries (different aliases avoid column name collisions).
  'CAST(PACKET_TIMESECONDS AS LONG) as "__pkt_time_ns", CAST(RECEIVED_TIMESECONDS AS LONG) as "__rx_time_ns"'


Class Method Details

.add_timestamp_entries!(entry, timestamp_ns, prefix) ⇒ Object

Add TIMESECONDS and TIMEFORMATTED entries to a hash from a nanosecond timestamp. Used when building packet entries from CAST(timestamp AS LONG) columns.

Parameters:

  • entry (Hash)

    Entry hash to populate

  • timestamp_ns (Integer)

    Nanoseconds since epoch

  • prefix (String)

    'PACKET' or 'RECEIVED'



# File 'lib/openc3/utilities/questdb_client.rb', line 575

def self.add_timestamp_entries!(entry, timestamp_ns, prefix)
  return unless timestamp_ns
  utc_time = nsec_to_utc_time(timestamp_ns)
  entry["#{prefix}_TIMESECONDS"] = format_timestamp(utc_time, :seconds)
  entry["#{prefix}_TIMEFORMATTED"] = format_timestamp(utc_time, :formatted)
end
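A minimal standalone sketch of what this method produces. The nsec_to_utc_time and format_timestamp stand-ins below are simplified copies of the class's own helpers (documented later on this page), included only so the example runs on its own:

```ruby
# Simplified stand-ins for the class's timestamp helpers.
def nsec_to_utc_time(nsec)
  Time.at(nsec / 1_000_000_000, nsec % 1_000_000_000, :nsec, in: '+00:00')
end

def format_timestamp(utc_time, format)
  format == :seconds ? utc_time.to_f : utc_time.strftime('%Y-%m-%dT%H:%M:%S.%6NZ')
end

def add_timestamp_entries!(entry, timestamp_ns, prefix)
  return unless timestamp_ns
  utc_time = nsec_to_utc_time(timestamp_ns)
  entry["#{prefix}_TIMESECONDS"] = format_timestamp(utc_time, :seconds)
  entry["#{prefix}_TIMEFORMATTED"] = format_timestamp(utc_time, :formatted)
end

entry = {}
add_timestamp_entries!(entry, 1_700_000_000_123_456_789, 'PACKET')
# entry now holds float Unix seconds plus an ISO 8601 string:
# entry['PACKET_TIMEFORMATTED'] # => "2023-11-14T22:13:20.123456Z"
```

Note that %6N truncates the nanosecond component to microseconds in the formatted string, while the float seconds value keeps the full sub-second fraction (within Float precision).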

.build_aggregation_selects(safe_item_name, value_type, item_name: nil) ⇒ Array<String>, Hash

Build aggregation SELECT columns (min/max/avg/stddev) for a single item. Returns the SELECT fragments and a column_mapping hash.

Parameters:

  • safe_item_name (String)

    Sanitized column name

  • value_type (Symbol)

    :RAW or :CONVERTED

  • item_name (String, nil) (defaults to: nil)

    Original (unsanitized) item name for mapping values. Defaults to safe_item_name if not provided.

Returns:

  • (Array<String>, Hash)

    Two-element array: [select_fragments, column_mapping]. column_mapping maps each result column alias to [item_name, reduced_type, value_type]



# File 'lib/openc3/utilities/questdb_client.rb', line 516

def self.build_aggregation_selects(safe_item_name, value_type, item_name: nil)
  item_name ||= safe_item_name
  selects = []
  mapping = {}
  case value_type
  when :RAW
    col = safe_item_name
    { 'N' => :MIN, 'X' => :MAX, 'A' => :AVG, 'S' => :STDDEV }.each do |suffix, reduced_type|
      alias_name = "#{safe_item_name}__#{suffix}"
      selects << "#{reduced_type.to_s.downcase}(\"#{col}\") as \"#{alias_name}\""
      mapping[alias_name] = [item_name, reduced_type, :RAW]
    end
  when :CONVERTED
    col = "#{safe_item_name}__C"
    { 'CN' => :MIN, 'CX' => :MAX, 'CA' => :AVG, 'CS' => :STDDEV }.each do |suffix, reduced_type|
      alias_name = "#{safe_item_name}__#{suffix}"
      selects << "#{reduced_type.to_s.downcase}(\"#{col}\") as \"#{alias_name}\""
      mapping[alias_name] = [item_name, reduced_type, :CONVERTED]
    end
  else
    # No aggregation for FORMATTED type since it is a string
    raise QuestDBError.new("Unsupported value type for aggregation: #{value_type}")
  end
  [selects, mapping]
end
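A standalone sketch of the :RAW branch, showing the SQL fragments and column mapping it generates (the item name TEMP1 is just an illustration):

```ruby
# Sketch of the :RAW branch of build_aggregation_selects: one
# min/max/avg/stddev fragment per item, plus an alias-to-metadata mapping.
def build_aggregation_selects(safe_item_name, item_name: nil)
  item_name ||= safe_item_name
  selects = []
  mapping = {}
  { 'N' => :MIN, 'X' => :MAX, 'A' => :AVG, 'S' => :STDDEV }.each do |suffix, reduced_type|
    alias_name = "#{safe_item_name}__#{suffix}"
    selects << "#{reduced_type.to_s.downcase}(\"#{safe_item_name}\") as \"#{alias_name}\""
    mapping[alias_name] = [item_name, reduced_type, :RAW]
  end
  [selects, mapping]
end

selects, mapping = build_aggregation_selects('TEMP1')
# selects => ['min("TEMP1") as "TEMP1__N"', 'max("TEMP1") as "TEMP1__X"',
#             'avg("TEMP1") as "TEMP1__A"', 'stddev("TEMP1") as "TEMP1__S"']
# mapping['TEMP1__N'] => ['TEMP1', :MIN, :RAW]
```

The :CONVERTED branch works the same way but aggregates the `__C` column and uses the CN/CX/CA/CS alias suffixes.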

.build_item_columns_query(table_name, column_names, start_time, end_time, include_received_ts: false) ⇒ String

Build a SELECT query for specific item columns from a single table.

Parameters:

  • table_name (String)

    Sanitized QuestDB table name

  • column_names (Array<String>)

    Quoted column expressions (e.g., '"TEMP1__C"')

  • start_time (Integer)

    Start timestamp in nanoseconds

  • end_time (Integer, nil)

    End timestamp in nanoseconds

  • include_received_ts (Boolean) (defaults to: false)

    Whether to include RECEIVED_TIMESECONDS

Returns:

  • (String)

    Complete SQL query (without LIMIT clause)



# File 'lib/openc3/utilities/questdb_client.rb', line 650

def self.build_item_columns_query(table_name, column_names, start_time, end_time, include_received_ts: false)
  names = column_names.dup
  names << TIMESTAMP_SELECT
  names << "RECEIVED_TIMESECONDS" if include_received_ts
  names << "COSMOS_EXTRA"
  query = "SELECT #{names.join(', ')} FROM \"#{table_name}\""
  query += time_where_clause(start_time, end_time)
  query
end
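A runnable sketch of the SQL this assembles. The time_where_clause helper is not shown on this page, so the version below is an assumption (a simple WHERE range over PACKET_TIMESECONDS with an open-ended end time); the table and column names are illustrative only:

```ruby
TIMESTAMP_SELECT = 'CAST(PACKET_TIMESECONDS AS LONG) as PACKET_TIMESECONDS'

# ASSUMED shape of time_where_clause (not documented on this page).
def time_where_clause(start_time, end_time)
  clause = " WHERE PACKET_TIMESECONDS >= #{start_time}"
  clause += " AND PACKET_TIMESECONDS < #{end_time}" if end_time
  clause
end

def build_item_columns_query(table_name, column_names, start_time, end_time, include_received_ts: false)
  names = column_names.dup
  names << TIMESTAMP_SELECT
  names << "RECEIVED_TIMESECONDS" if include_received_ts
  names << "COSMOS_EXTRA"
  "SELECT #{names.join(', ')} FROM \"#{table_name}\"" + time_where_clause(start_time, end_time)
end

# Hypothetical table name, for illustration only
sql = build_item_columns_query('INST__HEALTH_STATUS', ['"TEMP1__C"'], 1_000, 2_000)
```

The timestamp column is appended after the requested item columns, which is why decode_item_row (below) treats fixed columns as those whose SQL index is past the item columns.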

.build_item_defs_map(packet_def) ⇒ Hash

Build a hash mapping sanitized column names to item definitions. Used for type-aware decoding of QuestDB SELECT * results.

Parameters:

  • packet_def (Hash, nil)

    Packet definition from TargetModel.packet

Returns:

  • (Hash)

    { sanitized_column_name => item_def_hash }



# File 'lib/openc3/utilities/questdb_client.rb', line 498

def self.build_item_defs_map(packet_def)
  map = {}
  return map unless packet_def
  packet_def['items']&.each do |item|
    map[sanitize_column_name(item['name'])] = item
  end
  map
end

.build_packet_query(table_name, start_time, end_time) ⇒ String

Build a SELECT * query for full packet data from a single table.

Parameters:

  • table_name (String)

    Sanitized QuestDB table name

  • start_time (Integer)

    Start timestamp in nanoseconds

  • end_time (Integer, nil)

    End timestamp in nanoseconds

Returns:

  • (String)

    Complete SQL query (without LIMIT clause)



# File 'lib/openc3/utilities/questdb_client.rb', line 666

def self.build_packet_query(table_name, start_time, end_time)
  query = "SELECT *, #{TIMESTAMP_EXTRAS} FROM \"#{table_name}\""
  query += time_where_clause(start_time, end_time)
  query
end

.build_packet_reduced_selects(packet_def, value_type) ⇒ Array<String>, Boolean

Build aggregation SELECT columns for all numeric items in a packet definition. Filters out STRING, BLOCK, and DERIVED items since they can't be aggregated.

Parameters:

  • packet_def (Hash, nil)

    Packet definition from TargetModel.packet

  • value_type (Symbol)

    :RAW or :CONVERTED

Returns:

  • (Array<String>, Boolean)

    Two-element array: [select_fragments, has_numeric_items]. select_fragments includes TIMESTAMP_SELECT as the first element.



# File 'lib/openc3/utilities/questdb_client.rb', line 549

def self.build_packet_reduced_selects(packet_def, value_type)
  selects = [TIMESTAMP_SELECT]
  has_items = false
  return [selects, false] unless packet_def && packet_def['items']

  packet_def['items'].each do |item|
    data_type = item['data_type']
    next if data_type.nil?
    next if ['STRING', 'BLOCK', 'DERIVED'].include?(data_type)
    next unless value_type == :RAW || value_type == :CONVERTED

    safe_name = sanitize_column_name(item['name'])
    agg_selects, _mapping = build_aggregation_selects(safe_name, value_type)
    selects.concat(agg_selects)
    has_items = true
  end

  [selects, has_items]
end

.build_reduced_query(table_name, select_columns, start_time, end_time, sample_interval) ⇒ String

Build a SAMPLE BY aggregation query for reduced data.

Parameters:

  • table_name (String)

    Sanitized QuestDB table name

  • select_columns (Array<String>)

    SELECT column expressions including aggregations

  • start_time (Integer)

    Start timestamp in nanoseconds

  • end_time (Integer, nil)

    End timestamp in nanoseconds

  • sample_interval (String)

    QuestDB SAMPLE BY interval ('1m', '1h', '1d')

Returns:

  • (String)

    Complete SQL query (without LIMIT clause)



# File 'lib/openc3/utilities/questdb_client.rb', line 680

def self.build_reduced_query(table_name, select_columns, start_time, end_time, sample_interval)
  query = "SELECT #{select_columns.join(', ')} FROM \"#{table_name}\""
  query += time_where_clause(start_time, end_time)
  query += " SAMPLE BY #{sample_interval}"
  query += " ALIGN TO CALENDAR"
  query += " ORDER BY PACKET_TIMESECONDS"
  query
end
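A runnable sketch showing the clause ordering QuestDB expects (WHERE, then SAMPLE BY, ALIGN TO CALENDAR, ORDER BY). The time_where_clause helper is not shown on this page, so the stand-in below is an assumption, and the table and column names are illustrative:

```ruby
# ASSUMED shape of time_where_clause (not documented on this page).
def time_where_clause(start_time, end_time)
  clause = " WHERE PACKET_TIMESECONDS >= #{start_time}"
  clause += " AND PACKET_TIMESECONDS < #{end_time}" if end_time
  clause
end

def build_reduced_query(table_name, select_columns, start_time, end_time, sample_interval)
  query = "SELECT #{select_columns.join(', ')} FROM \"#{table_name}\""
  query += time_where_clause(start_time, end_time)
  query += " SAMPLE BY #{sample_interval}"
  query += " ALIGN TO CALENDAR"
  query += " ORDER BY PACKET_TIMESECONDS"
  query
end

sql = build_reduced_query('INST__HEALTH_STATUS', ['avg("TEMP1") as "TEMP1__A"'], 0, nil, '1m')
```

ALIGN TO CALENDAR anchors the SAMPLE BY buckets to calendar boundaries rather than to the first row's timestamp, so minute/hour/day buckets line up across queries.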

.check_connection(db_shard: 0) ⇒ Object

Health check - attempt to connect and immediately close. Returns true if successful, raises on failure.



# File 'lib/openc3/utilities/questdb_client.rb', line 126

def self.check_connection(db_shard: 0)
  conn = PG::Connection.new(
    host: hostname_for_db_shard(db_shard),
    port: ENV['OPENC3_TSDB_QUERY_PORT'],
    user: ENV['OPENC3_TSDB_USERNAME'],
    password: ENV['OPENC3_TSDB_PASSWORD'],
    dbname: 'qdb'
  )
  conn.close
  true
end

.coerce_to_utc(value) ⇒ Time?

Coerce a value from QuestDB (which may be a Time, Float, Integer, String, or PG timestamp object) into a Ruby UTC Time.

Parameters:

  • value (Object)

    Timestamp value in any supported format

Returns:

  • (Time, nil)

    UTC Time object or nil



# File 'lib/openc3/utilities/questdb_client.rb', line 392

def self.coerce_to_utc(value)
  return nil unless value
  case value
  when Time
    # PG driver returns Time objects with UTC values but in local timezone,
    # so reconstruct as UTC from components rather than converting
    pg_timestamp_to_utc(value)
  when Float
    Time.at(value).utc
  when Integer
    nsec_to_utc_time(value).utc
  when String
    Time.parse(value).utc
  else
    raise QuestDBError.new("Unsupported timestamp value #{value} with type: #{value.class}")
  end
end

.column_suffix_for_value_type(value_type) ⇒ String

Return the QuestDB column suffix for a given value type.

Parameters:

  • value_type (String)

    One of 'RAW', 'CONVERTED', 'FORMATTED'

Returns:

  • (String)

    Column suffix (e.g., '__C', '__F', or '')



# File 'lib/openc3/utilities/questdb_client.rb', line 443

def self.column_suffix_for_value_type(value_type)
  case value_type
  when 'FORMATTED', 'WITH_UNITS' # WITH_UNITS is deprecated
    '__F'
  when 'CONVERTED'
    '__C'
  else
    ''
  end
end

.connection(db_shard: 0) ⇒ Object

Get or create a thread-local PG connection for the given db_shard with type mapping configured. Returns the thread-local connection - callers should not close it.



# File 'lib/openc3/utilities/questdb_client.rb', line 81

def self.connection(db_shard: 0)
  conns = @thread_conns.value
  conn = conns[db_shard]
  if conn and not conn.finished?
    begin
      conn.check_socket
      return conn
    rescue
      # Will need to reconnect
    end
  end
  conn = PG::Connection.new(
    host: hostname_for_db_shard(db_shard),
    port: ENV['OPENC3_TSDB_QUERY_PORT'],
    user: ENV['OPENC3_TSDB_USERNAME'],
    password: ENV['OPENC3_TSDB_PASSWORD'],
    dbname: 'qdb'
  )
  conn.type_map_for_results = PG::BasicTypeMapForResults.new(conn)
  conns[db_shard] = conn
  @thread_conns.value = conns
  conn
end

.db_shard_for_target(target_name, scope: "DEFAULT") ⇒ Object

Look up the db_shard number for a target from TargetModel with a 1-minute cache. Non-target-specific data (nil target_name) always returns db_shard 0.



# File 'lib/openc3/utilities/questdb_client.rb', line 50

def self.db_shard_for_target(target_name, scope: "DEFAULT")
  return 0 unless target_name

  cache_key = "#{scope}__#{target_name}"
  now = Time.now

  @db_shard_cache_mutex.synchronize do
    cached = @db_shard_cache[cache_key]
    if cached
      db_shard, cached_at = cached
      return db_shard if (now - cached_at) < DB_SHARD_CACHE_TIMEOUT
    end
  end

  # Cache miss or expired — look up from TargetModel
  begin
    model = TargetModel.get(name: target_name, scope: scope)
    db_shard = model ? model['db_shard'].to_i : 0
  rescue
    db_shard = 0
  end

  @db_shard_cache_mutex.synchronize do
    @db_shard_cache[cache_key] = [db_shard, now]
  end

  db_shard
end

.decode_float_special_values(value) ⇒ Float

Decode sentinel values back to float special values (inf, -inf, nan). Checks against both 32-bit and 64-bit sentinel values since we may not know the original column type at read time.

Parameters:

  • value (Float)

    The float value to potentially decode

Returns:

  • (Float)

    The value with sentinels replaced by special values



# File 'lib/openc3/utilities/questdb_client.rb', line 170

def self.decode_float_special_values(value)
  return value unless value.is_a?(Float)

  # Check 64-bit sentinels
  return Float::INFINITY if value == FLOAT64_POS_INF_SENTINEL
  return -Float::INFINITY if value == FLOAT64_NEG_INF_SENTINEL
  return Float::NAN if value == FLOAT64_NAN_SENTINEL

  # Check 32-bit sentinels (stored values after precision loss)
  return Float::INFINITY if value == FLOAT32_POS_INF_STORED
  return -Float::INFINITY if value == FLOAT32_NEG_INF_STORED
  return Float::NAN if value == FLOAT32_NAN_STORED

  value
end
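The sentinel comparison is a plain float equality check against the constants listed in the Constant Summary above. A self-contained version using the 64-bit sentinels:

```ruby
# 64-bit sentinel constants from the Constant Summary above.
FLOAT64_POS_INF_SENTINEL = 1.7976931348623155e308
FLOAT64_NEG_INF_SENTINEL = -1.7976931348623155e308
FLOAT64_NAN_SENTINEL = -1.7976931348623153e308

def decode_float_special_values(value)
  return value unless value.is_a?(Float)
  return Float::INFINITY if value == FLOAT64_POS_INF_SENTINEL
  return -Float::INFINITY if value == FLOAT64_NEG_INF_SENTINEL
  return Float::NAN if value == FLOAT64_NAN_SENTINEL
  value  # ordinary floats pass through unchanged
end

decode_float_special_values(FLOAT64_POS_INF_SENTINEL)  # => Infinity
decode_float_special_values(1.5)                       # => 1.5
```

The full method additionally checks the three FLOAT32_*_STORED constants, since a value written to a 32-bit column comes back with the precision-reduced sentinel rather than the 64-bit one.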

.decode_item_row(row, sql_to_local, meta) ⇒ Hash

Decode a single row from a per-table item columns query into an entry hash. Handles stored timestamps, calculated timestamps, and regular value decoding.

Parameters:

  • row (PG::Result row)

    Single row (iterable as [col_name, value] pairs)

  • sql_to_local (Array<Integer>)

    Mapping from SQL column index to meta position

  • meta (Hash)

    Per-table metadata with keys: :item_keys [Array<String>] - ordered list of item key identifiers; :item_types [Array<Hash>] - type info per position ({ 'data_type' => ..., 'array_size' => ... }); :stored_timestamp_item_keys [Hash] - { item_key => { column: col_name } }; :calculated_positions [Hash] - { local_idx => { source: col_name, format: :seconds/:formatted } }

Returns:

  • (Hash)

    Entry hash with __type, item_key => value, __time, COSMOS_EXTRA



# File 'lib/openc3/utilities/questdb_client.rb', line 700

def self.decode_item_row(row, sql_to_local, meta)
  num_sql_item_cols = sql_to_local.length

  entry = { "__type" => "ITEMS" }
  timestamp_values = {}
  time_ns = nil
  cosmos_extra = nil

  values = Array.new(meta[:item_keys].length)

  row.each_with_index do |tuple, sql_index|
    col_name = tuple[0]
    value = tuple[1]

    # Fixed columns come after item columns
    if sql_index >= num_sql_item_cols
      case col_name
      when 'PACKET_TIMESECONDS'
        time_ns = value.to_i
        timestamp_values['PACKET_TIMESECONDS'] = nsec_to_utc_time(time_ns)
      when 'RECEIVED_TIMESECONDS'
        timestamp_values['RECEIVED_TIMESECONDS'] = value if value
      when 'COSMOS_EXTRA'
        cosmos_extra = value
      # No else because we're only interested in these specific extra columns; others can be ignored
      end
      next
    end

    local_idx = sql_to_local[sql_index]

    # Track timestamp values from item columns
    if col_name == 'RECEIVED_TIMESECONDS'
      timestamp_values['RECEIVED_TIMESECONDS'] = value
    end

    next if value.nil?

    type_info = meta[:item_types][local_idx] || {}
    if meta[:stored_timestamp_item_keys].key?(meta[:item_keys][local_idx])
      ts_utc = coerce_to_utc(value)
      values[local_idx] = format_timestamp(ts_utc, :seconds) if ts_utc
    else
      values[local_idx] = decode_value(
        value,
        data_type: type_info['data_type'],
        array_size: type_info['array_size']
      )
    end
  end

  # Build ordered entry hash with calculated items in their natural position
  meta[:item_keys].each_with_index do |item_key, local_idx|
    if meta[:calculated_positions].key?(local_idx)
      calc_info = meta[:calculated_positions][local_idx]
      ts_value = timestamp_values[calc_info[:source]]
      next unless ts_value
      ts_utc = coerce_to_utc(ts_value)
      calculated_value = format_timestamp(ts_utc, calc_info[:format])
      entry[item_key] = calculated_value if calculated_value
    elsif !values[local_idx].nil?
      entry[item_key] = values[local_idx]
    end
  end

  entry['__time'] = time_ns if time_ns
  entry['COSMOS_EXTRA'] = cosmos_extra if cosmos_extra
  entry
end

.decode_packet_row(row, value_type, packet_def) ⇒ Hash

Decode a single row from a SELECT * packet query into an entry hash. Handles nanosecond timestamp CAST columns, value-type column preference, and type-aware decoding.

Parameters:

  • row (PG::Result row)

    Single row as iterable [col_name, value] pairs

  • value_type (Symbol)

    :RAW, :CONVERTED, :FORMATTED

  • packet_def (Hash, nil)

    Packet definition for type-aware decoding

Returns:

  • (Hash)

    Entry hash with item => value, __time, COSMOS_EXTRA, timestamp entries



# File 'lib/openc3/utilities/questdb_client.rb', line 778

def self.decode_packet_row(row, value_type, packet_def)
  entry = {}
  item_defs = build_item_defs_map(packet_def)

  # First pass: build a hash of all columns for value-type preference lookups
  columns = {}
  row.each do |tuple|
    columns[tuple[0]] = tuple[1]
  end

  cosmos_timestamp_ns = nil
  received_timestamp_ns = nil

  # Second pass: process columns based on value_type
  row.each do |tuple|
    column_name = tuple[0]
    raw_value = tuple[1]

    if column_name == '__pkt_time_ns'
      cosmos_timestamp_ns = raw_value.to_i
      entry['__time'] = cosmos_timestamp_ns
      next
    end

    if column_name == '__rx_time_ns'
      received_timestamp_ns = raw_value.to_i
      next
    end

    # Skip PG timestamp versions - handled via CAST AS LONG columns above
    next if column_name == 'PACKET_TIMESECONDS'
    next if column_name == 'RECEIVED_TIMESECONDS'
    next if column_name == 'COSMOS_DATA_TAG'

    if column_name == 'COSMOS_EXTRA'
      entry['COSMOS_EXTRA'] = raw_value
      next
    end

    base_name = column_name.sub(/(__C|__F|__U)$/, '')
    item_def = item_defs[base_name]

    col_value_type = value_type_for_column_suffix(column_name)
    type_info = resolve_item_type(item_def, col_value_type)
    value = decode_value(raw_value, data_type: type_info['data_type'], array_size: type_info['array_size'])

    case value_type
    when :RAW
      next if column_name.end_with?('__C', '__F', '__U')
      entry[column_name] = value
    when :CONVERTED
      if column_name.end_with?('__C')
        entry[column_name.sub(/__C$/, '')] = value
      elsif !column_name.end_with?('__F', '__U') && !columns.key?("#{column_name}__C")
        entry[column_name] = value
      end
    when :FORMATTED
      if column_name.end_with?('__F')
        entry[column_name.sub(/__F$/, '')] = value
      elsif column_name.end_with?('__C') && !columns.key?("#{column_name.sub(/__C$/, '')}__F")
        entry[column_name.sub(/__C$/, '')] = value
      elsif !column_name.end_with?('__C', '__F', '__U') && !columns.key?("#{column_name}__F") && !columns.key?("#{column_name}__C")
        entry[column_name] = value
      end
    else
      raise QuestDBError.new("Unsupported value type for packet decoding: #{value_type}")
    end
  end

  add_timestamp_entries!(entry, cosmos_timestamp_ns, 'PACKET')
  add_timestamp_entries!(entry, received_timestamp_ns, 'RECEIVED')
  entry
end

.decode_reduced_row(row) ⇒ Hash

Decode a single row from a SAMPLE BY aggregation query. All non-timestamp columns are decoded as DOUBLE (aggregation results are always numeric).

Parameters:

  • row (PG::Result row)

    Single row as iterable [col_name, value] pairs

Returns:

  • (Hash)

    { col_name => decoded_value, '__time' => ns_integer }



# File 'lib/openc3/utilities/questdb_client.rb', line 857

def self.decode_reduced_row(row)
  entry = {}
  row.each do |tuple|
    col_name = tuple[0]
    value = tuple[1]
    if col_name == 'PACKET_TIMESECONDS'
      entry['__time'] = value.to_i
    else
      entry[col_name] = decode_value(value, data_type: 'DOUBLE', array_size: nil)
    end
  end
  entry
end

.decode_value(value, data_type: nil, array_size: nil) ⇒ Object

Decode a value retrieved from QuestDB back to its original Ruby type.

QuestDB stores certain COSMOS types as encoded strings:

  • Arrays are JSON-encoded: "[1, 2, 3]" or '["a", "b"]'

  • Objects/Hashes are JSON-encoded

  • Binary data (BLOCK) is base64-encoded

  • Large integers (≥64-bit) are stored as VARCHAR strings

Parameters:

  • value (Object)

    The value to decode

  • data_type (String) (defaults to: nil)

    COSMOS data type (INT, UINT, FLOAT, STRING, BLOCK, DERIVED, etc.)

  • array_size (Integer, nil) (defaults to: nil)

    If not nil, indicates this is an array item

Returns:

  • (Object)

    The decoded value



# File 'lib/openc3/utilities/questdb_client.rb', line 198

def self.decode_value(value, data_type: nil, array_size: nil)
  # Handle BigDecimal values from legacy QuestDB DECIMAL columns
  # (pre-existing tables may still use DECIMAL; new tables use VARCHAR)
  if value.is_a?(BigDecimal)
    return value.to_i if data_type == 'INT' || data_type == 'UINT'
    return value
  end

  # Decode float sentinel values back to inf/nan
  return decode_float_special_values(value) if value.is_a?(Float)

  # Non-strings don't need decoding (already handled by PG type mapping)
  return value unless value.is_a?(String)

  # Empty strings stay as empty strings
  return value if value.empty?

  # Handle based on data type if provided
  if data_type == 'BLOCK'
    begin
      return Base64.strict_decode64(value)
    rescue ArgumentError
      return value
    end
  end

  # Arrays are JSON-encoded
  if array_size
    begin
      return JSON.parse(value, allow_nan: true, create_additions: true)
    rescue JSON::ParserError
      return value
    end
  end

  # Integer values stored as VARCHAR strings (≥64-bit integers)
  if data_type == 'INT' || data_type == 'UINT'
    begin
      return Integer(value)
    rescue ArgumentError
      return value
    end
  end

  # DERIVED items with declared converted_type are stored as typed columns
  # (float, int, etc.) and will be returned as non-strings, handled above.
  # DERIVED items without declared type or with complex types (ARRAY, OBJECT, ANY)
  # are stored as VARCHAR and JSON-encoded.
  if data_type == 'DERIVED'
    begin
      return JSON.parse(value, allow_nan: true, create_additions: true)
    rescue JSON::ParserError
      # Could be a plain string from DERIVED with converted_type=STRING
      return value
    end
  end

  # No data_type provided - fall back to heuristic decoding
  if data_type.nil?
    first_char = value[0]
    # Try JSON for arrays/objects
    if first_char == '[' || first_char == '{'
      begin
        return JSON.parse(value, allow_nan: true, create_additions: true)
      rescue JSON::ParserError
        # Not valid JSON
      end
    # Try integer conversion for numeric strings
    elsif value =~ /\A-?\d+\z/
      begin
        return Integer(value)
      rescue ArgumentError
        # Not a valid integer
      end
    end
  end

  # Return as-is (STRING type or unknown)
  value
end
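The final no-data_type fallback is worth seeing in isolation. This is a simplified sketch of just that heuristic branch (the helper name heuristic_decode is mine, and it omits the create_additions option the real method passes to JSON.parse):

```ruby
require 'json'

# Heuristic fallback when no data_type is available: try JSON for
# strings starting with '[' or '{', Integer() for purely numeric
# strings, and otherwise return the string unchanged.
def heuristic_decode(value)
  return value unless value.is_a?(String) && !value.empty?
  if value.start_with?('[', '{')
    begin
      return JSON.parse(value, allow_nan: true)
    rescue JSON::ParserError
      # Not valid JSON - fall through and return the raw string
    end
  elsif value =~ /\A-?\d+\z/
    return Integer(value)
  end
  value
end

heuristic_decode('[1, 2, 3]')  # => [1, 2, 3]
heuristic_decode('42')         # => 42
heuristic_decode('hello')      # => "hello"
```

Passing data_type and array_size whenever the packet definition is available avoids this guessing, which is why the row decoders above build item-definition maps first.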

.disconnect(db_shard: nil) ⇒ Object

Reset the connection(s) for the current thread. Used after errors. If db_shard is nil, closes all db_shard connections. Otherwise closes only the specified db_shard.



# File 'lib/openc3/utilities/questdb_client.rb', line 107

def self.disconnect(db_shard: nil)
  conns = @thread_conns.value
  if db_shard.nil?
    conns.each_value do |conn|
      conn.finish if conn && !conn.finished?
    end
    @thread_conns.value = {}
  else
    conn = conns[db_shard]
    if conn && !conn.finished?
      conn.finish
    end
    conns.delete(db_shard)
    @thread_conns.value = conns
  end
end

.fetch_packet_def(target_name, packet_name, type: :TLM, scope: "DEFAULT") ⇒ Hash?

Fetch a packet definition from TargetModel, returning nil if not found.

Parameters:

  • target_name (String)

    Target name

  • packet_name (String)

    Packet name

  • type (Symbol) (defaults to: :TLM)

    :CMD or :TLM (default :TLM)

  • scope (String) (defaults to: "DEFAULT")

    Scope name

Returns:

  • (Hash, nil)

    Packet definition or nil



# File 'lib/openc3/utilities/questdb_client.rb', line 487

def self.fetch_packet_def(target_name, packet_name, type: :TLM, scope: "DEFAULT")
  TargetModel.packet(target_name, packet_name, type: type, scope: scope)
rescue RuntimeError
  nil
end

.find_item_def(packet_def, item_name) ⇒ Hash?

Find an item definition within a packet definition by name.

Parameters:

  • packet_def (Hash, nil)

    Packet definition from TargetModel.packet

  • item_name (String)

    Item name to find

Returns:

  • (Hash, nil)

    Item definition hash or nil if not found



# File 'lib/openc3/utilities/questdb_client.rb', line 306

def self.find_item_def(packet_def, item_name)
  return nil unless packet_def
  packet_def['items']&.each do |item|
    return item if item['name'] == item_name
  end
  nil
end

.format_timestamp(utc_time, format) ⇒ Float, ...

Format a UTC timestamp according to the specified format.

Parameters:

  • utc_time (Time)

    UTC timestamp

  • format (Symbol)

    :seconds for Unix seconds (float), :formatted for ISO 8601

Returns:

  • (Float, String, nil)

    Formatted timestamp or nil if utc_time is nil



# File 'lib/openc3/utilities/questdb_client.rb', line 427

def self.format_timestamp(utc_time, format)
  return nil unless utc_time
  case format
  when :seconds
    utc_time.to_f
  when :formatted
    utc_time.strftime('%Y-%m-%dT%H:%M:%S.%6NZ')
  else
    nil
  end
end

.hostname_for_db_shard(db_shard) ⇒ Object

Resolve the hostname for a given db_shard number. If OPENC3_TSDB_HOSTNAME contains "SHARDNUM", it is replaced with the db_shard number. Otherwise, all db_shards connect to the same host (backward compatible).



# File 'lib/openc3/utilities/questdb_client.rb', line 44

def self.hostname_for_db_shard(db_shard)
  ENV['OPENC3_TSDB_HOSTNAME'].to_s.gsub("SHARDNUM", db_shard.to_s)
end
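The substitution is a plain string replace, so sharding is opt-in via the environment variable. The hostname values below are hypothetical examples, not real defaults:

```ruby
def hostname_for_db_shard(db_shard)
  ENV['OPENC3_TSDB_HOSTNAME'].to_s.gsub("SHARDNUM", db_shard.to_s)
end

# Hypothetical sharded hostname template
ENV['OPENC3_TSDB_HOSTNAME'] = 'questdb-SHARDNUM'
hostname_for_db_shard(2)   # => "questdb-2"

# No SHARDNUM placeholder: every db_shard resolves to the same host
ENV['OPENC3_TSDB_HOSTNAME'] = 'questdb'
hostname_for_db_shard(2)   # => "questdb"
```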

.nsec_to_utc_time(nsec) ⇒ Time

Convert a nanosecond integer timestamp to a UTC Time object.

Parameters:

  • nsec (Integer)

    Nanoseconds since epoch

Returns:

  • (Time)

    UTC Time object



# File 'lib/openc3/utilities/questdb_client.rb', line 382

def self.nsec_to_utc_time(nsec)
  return nil unless nsec
  Time.at(nsec / 1_000_000_000, nsec % 1_000_000_000, :nsec, in: '+00:00')
end
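Splitting the integer into whole seconds and a remainder with the :nsec unit preserves full nanosecond precision, which a single Float seconds value could not:

```ruby
def nsec_to_utc_time(nsec)
  return nil unless nsec
  # Whole seconds plus nanosecond remainder, pinned to a +00:00 offset
  Time.at(nsec / 1_000_000_000, nsec % 1_000_000_000, :nsec, in: '+00:00')
end

t = nsec_to_utc_time(1_500_000_000)  # 1.5 seconds after the epoch
# t.to_i => 1, t.nsec => 500_000_000, t.utc_offset => 0
```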

.paginate_query(query, page_size, label:, db_shard: 0) {|PG::Result| ... } ⇒ Object

Execute a paginated TSDB query, yielding each non-empty PG::Result page. Handles LIMIT pagination and retry on error.

Parameters:

  • query (String)

    Base SQL query (without LIMIT clause)

  • page_size (Integer)

    Number of rows per page

  • label (String)

    Label for log messages

Yields:

  • (PG::Result)

    Each page of results



# File 'lib/openc3/utilities/questdb_client.rb', line 625

def self.paginate_query(query, page_size, label:, db_shard: 0)
  min = 0
  max = page_size
  loop do
    query_offset = "#{query} LIMIT #{min}, #{max}"
    Logger.debug("QuestDB #{label}: #{query_offset}")
    result = query_with_retry(query_offset, label: label, db_shard: db_shard)
    min += page_size
    max += page_size
    if result.nil? or result.ntuples == 0
      return
    else
      yield result
    end
  end
end
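Note the pagination shape: QuestDB's "LIMIT lo, hi" selects a row range rather than the SQL-standard offset/count pair, so the method advances both bounds by page_size each iteration. A standalone sketch of the windows it generates:

```ruby
# Generate the LIMIT clauses paginate_query appends page by page.
def limit_windows(page_size, pages)
  min = 0
  max = page_size
  Array.new(pages) do
    window = "LIMIT #{min}, #{max}"
    min += page_size
    max += page_size
    window
  end
end

limit_windows(100, 3)  # => ["LIMIT 0, 100", "LIMIT 100, 200", "LIMIT 200, 300"]
```

Iteration stops when a page comes back empty, so the final window may return fewer than page_size rows.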

.pg_timestamp_to_utc(pg_time) ⇒ Time

Convert a PG timestamp to UTC. PG driver returns timestamps as naive Time objects that need UTC treatment. QuestDB stores timestamps in UTC, but the PG driver applies local timezone.

Parameters:

  • pg_time (Time)

    Timestamp from PG query result

Returns:

  • (Time)

    UTC timestamp



# File 'lib/openc3/utilities/questdb_client.rb', line 416

def self.pg_timestamp_to_utc(pg_time)
  return nil unless pg_time
  Time.utc(pg_time.year, pg_time.month, pg_time.day,
           pg_time.hour, pg_time.min, pg_time.sec, pg_time.usec)
end
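Rebuilding from calendar components (instead of calling .utc, which would shift the value by the local offset) keeps the wall-clock reading and simply re-labels it as UTC:

```ruby
def pg_timestamp_to_utc(pg_time)
  return nil unless pg_time
  # Reconstruct from components so the wall-clock value is preserved
  Time.utc(pg_time.year, pg_time.month, pg_time.day,
           pg_time.hour, pg_time.min, pg_time.sec, pg_time.usec)
end

local = Time.new(2024, 6, 1, 12, 30, 0, '+05:00')  # wall clock 12:30 in +05:00
utc   = pg_timestamp_to_utc(local)
# utc keeps the 12:30 wall clock but is flagged UTC: 2024-06-01 12:30:00 UTC
```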

.query_with_retry(query, params: [], max_retries: 5, label: nil, db_shard: 0) ⇒ PG::Result?

Execute a SQL query with automatic retry on connection errors. Handles PG connection management and retries up to max_retries times.

Parameters:

  • query (String)

    SQL query to execute

  • params (Array) (defaults to: [])

    Query parameters for parameterized queries (uses exec_params)

  • max_retries (Integer) (defaults to: 5)

    Maximum number of retry attempts (default 5)

  • label (String, nil) (defaults to: nil)

    Optional label for log messages

Returns:

  • (PG::Result, nil)

    Query result

Raises:

  • (RuntimeError)

    After exhausting retries



# File 'lib/openc3/utilities/questdb_client.rb', line 356

def self.query_with_retry(query, params: [], max_retries: 5, label: nil, db_shard: 0)
  retry_count = 0
  begin
    conn = connection(db_shard: db_shard)
    if params.empty?
      return conn.exec(query)
    else
      return conn.exec_params(query, params)
    end
  rescue IOError, PG::Error => e
    retry_count += 1
    if retry_count > (max_retries - 1)
      raise QuestDBError.new("Error querying TSDB#{label ? " (#{label})" : ""}: #{e.message}")
    end
    Logger.warn("TSDB#{label ? " #{label}" : ""}: Retrying due to error: #{e.message}")
    Logger.warn("TSDB#{label ? " #{label}" : ""}: Last query: #{query}")
    disconnect(db_shard: db_shard)
    sleep 0.1
    retry
  end
end

.resolve_item_type(item_def, value_type) ⇒ Hash

Resolve the data_type and array_size for a QuestDB column based on the item definition and requested value type. This encapsulates the common logic for determining how to decode a value read from QuestDB.

Parameters:

  • item_def (Hash, nil)

    Item definition from packet definition

  • value_type (String)

One of 'RAW', 'CONVERTED', or 'FORMATTED' ('WITH_UNITS' is accepted but deprecated)

Returns:

  • (Hash)

{ 'data_type' => String|nil, 'array_size' => Integer|nil }



# File 'lib/openc3/utilities/questdb_client.rb', line 321

def self.resolve_item_type(item_def, value_type)
  case value_type
  when 'FORMATTED', 'WITH_UNITS' # WITH_UNITS is deprecated
    { 'data_type' => 'STRING', 'array_size' => nil }
  when 'CONVERTED'
    if item_def
      rc = item_def['read_conversion']
      if rc && rc['converted_type']
        { 'data_type' => rc['converted_type'], 'array_size' => item_def['array_size'] }
      elsif item_def['states']
        { 'data_type' => 'STRING', 'array_size' => nil }
      else
        { 'data_type' => item_def['data_type'], 'array_size' => item_def['array_size'] }
      end
    else
      { 'data_type' => nil, 'array_size' => nil }
    end
  else # RAW or default
    if item_def
      { 'data_type' => item_def['data_type'], 'array_size' => item_def['array_size'] }
    else
      { 'data_type' => nil, 'array_size' => nil }
    end
  end
end
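The resolution rules can be seen with a few hypothetical item definitions (the hashes below are illustrative, not real packet definitions): a read conversion overrides the raw type for CONVERTED reads, states force STRING, and FORMATTED is always STRING.

```ruby
# Copied from the method above so it runs standalone.
def resolve_item_type(item_def, value_type)
  case value_type
  when 'FORMATTED', 'WITH_UNITS' # WITH_UNITS is deprecated
    { 'data_type' => 'STRING', 'array_size' => nil }
  when 'CONVERTED'
    if item_def
      rc = item_def['read_conversion']
      if rc && rc['converted_type']
        { 'data_type' => rc['converted_type'], 'array_size' => item_def['array_size'] }
      elsif item_def['states']
        { 'data_type' => 'STRING', 'array_size' => nil }
      else
        { 'data_type' => item_def['data_type'], 'array_size' => item_def['array_size'] }
      end
    else
      { 'data_type' => nil, 'array_size' => nil }
    end
  else # RAW or default
    if item_def
      { 'data_type' => item_def['data_type'], 'array_size' => item_def['array_size'] }
    else
      { 'data_type' => nil, 'array_size' => nil }
    end
  end
end

# Hypothetical item definitions
temp = { 'data_type' => 'UINT', 'array_size' => nil,
         'read_conversion' => { 'converted_type' => 'FLOAT' } }
mode = { 'data_type' => 'UINT', 'array_size' => nil, 'states' => { 'SAFE' => 0 } }

puts resolve_item_type(temp, 'RAW')['data_type']        # => UINT
puts resolve_item_type(temp, 'CONVERTED')['data_type']  # => FLOAT (from read_conversion)
puts resolve_item_type(mode, 'CONVERTED')['data_type']  # => STRING (states map to names)
puts resolve_item_type(nil, 'FORMATTED')['data_type']   # => STRING
```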

.sample_interval_for(stream_mode) ⇒ String

Returns the SAMPLE BY interval string for a given stream_mode symbol.

Parameters:

  • stream_mode (Symbol)

    :REDUCED_MINUTE, :REDUCED_HOUR, or :REDUCED_DAY

Returns:

  • (String)

    QuestDB SAMPLE BY interval string



# File 'lib/openc3/utilities/questdb_client.rb', line 593

def self.sample_interval_for(stream_mode)
  case stream_mode
  when :REDUCED_MINUTE then '1m'
  when :REDUCED_HOUR then '1h'
  when :REDUCED_DAY then '1d'
  else '1m'
  end
end

.sanitize_column_name(item_name) ⇒ String

Sanitize a column name for QuestDB. See questdb.com/docs/reference/api/ilp/advanced-settings/#name-restrictions

ILP protocol special characters that must be sanitized in column names

Parameters:

  • item_name (String)

    Item name

Returns:

  • (String)

    Sanitized column name



# File 'lib/openc3/utilities/questdb_client.rb', line 297

def self.sanitize_column_name(item_name)
  item_name.to_s.gsub(/[?\.,'"\\\/:\)\(\+=\-\*\%~;!@#\$\^&]/, '_')
end
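A quick standalone check of the substitution (the method body is copied verbatim from above). Note that spaces are not in the character class and pass through unchanged:

```ruby
# Copied from the method above; replaces ILP-reserved punctuation with '_'.
def sanitize_column_name(item_name)
  item_name.to_s.gsub(/[?\.,'"\\\/:\)\(\+=\-\*\%~;!@#\$\^&]/, '_')
end

puts sanitize_column_name('RATE (deg/s)')  # => "RATE _deg_s_"
puts sanitize_column_name(:TEMP1)          # symbols work via to_s => "TEMP1"
```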

.sanitize_table_name(target_name, packet_name, cmd_or_tlm = "TLM", scope: "DEFAULT") ⇒ String

Parameters:

  • target_name (String)

    Target name

  • packet_name (String)

    Packet name

  • cmd_or_tlm (String, Symbol) (defaults to: "TLM")

"CMD" or "TLM" prefix (default "TLM")

  • scope (String) (defaults to: "DEFAULT")

Scope name (default "DEFAULT")

Returns:

  • (String)

    Sanitized table name



# File 'lib/openc3/utilities/questdb_client.rb', line 287

def self.sanitize_table_name(target_name, packet_name, cmd_or_tlm = "TLM", scope: "DEFAULT")
  "#{scope}__#{cmd_or_tlm}__#{target_name}__#{packet_name}".gsub(/[?,'"\\\/:\)\(\+\*\%~]/, '_')
end
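Table names follow the `SCOPE__CMDORTLM__TARGET__PACKET` convention, as a standalone run shows (method body copied verbatim from above):

```ruby
# Copied from the method above; builds the table name and replaces
# problematic punctuation with '_'.
def sanitize_table_name(target_name, packet_name, cmd_or_tlm = "TLM", scope: "DEFAULT")
  "#{scope}__#{cmd_or_tlm}__#{target_name}__#{packet_name}".gsub(/[?,'"\\\/:\)\(\+\*\%~]/, '_')
end

puts sanitize_table_name('INST', 'HEALTH_STATUS')
# => "DEFAULT__TLM__INST__HEALTH_STATUS"
puts sanitize_table_name('INST', 'ABORT', 'CMD', scope: 'OPS')
# => "OPS__CMD__INST__ABORT"
```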

.table_has_data?(table_name, start_time, end_time, db_shard: 0) ⇒ Boolean

Returns true if the given TSDB table exists and has at least one row in the time range.

Parameters:

  • table_name (String)

    Sanitized table name

  • start_time (Integer)

    Nanosecond start time

  • end_time (Integer, nil)

    Nanosecond end time

  • db_shard (Integer) (defaults to: 0)

    QuestDB db_shard number to query

Returns:

  • (Boolean)


# File 'lib/openc3/utilities/questdb_client.rb', line 608

def self.table_has_data?(table_name, start_time, end_time, db_shard: 0)
  query = "SELECT 1 FROM \"#{table_name}\""
  query += time_where_clause(start_time, end_time)
  query += " LIMIT 1"
  result = query_with_retry(query, max_retries: 1, label: "table_has_data", db_shard: db_shard)
  result && result.ntuples > 0
rescue RuntimeError
  false
end

.time_where_clause(start_time, end_time, prefix: '') ⇒ String

Build a SQL WHERE clause for PACKET_TIMESECONDS range filtering.

Parameters:

  • start_time (Integer, String)

    Start timestamp (nanoseconds)

  • end_time (Integer, String, nil)

    End timestamp (nanoseconds), or nil for open-ended

  • prefix (String) (defaults to: '')

Table alias prefix (e.g. 'T0.'); defaults to the empty string

Returns:

  • (String)

    SQL WHERE clause fragment (includes leading space)



# File 'lib/openc3/utilities/questdb_client.rb', line 474

def self.time_where_clause(start_time, end_time, prefix: '')
  where = " WHERE #{prefix}PACKET_TIMESECONDS >= #{start_time}"
  where += " AND #{prefix}PACKET_TIMESECONDS < #{end_time}" if end_time
  where
end
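The clause filters a half-open interval [start_time, end_time), which keeps adjacent time-range queries from double-counting boundary rows. Standalone (method body copied verbatim from above):

```ruby
# Copied from the method above; builds a WHERE fragment with a
# required lower bound and an optional exclusive upper bound.
def time_where_clause(start_time, end_time, prefix: '')
  where = " WHERE #{prefix}PACKET_TIMESECONDS >= #{start_time}"
  where += " AND #{prefix}PACKET_TIMESECONDS < #{end_time}" if end_time
  where
end

puts time_where_clause(1700000000000000000, nil)
# => " WHERE PACKET_TIMESECONDS >= 1700000000000000000"
puts time_where_clause(1, 2, prefix: 'T0.')
# => " WHERE T0.PACKET_TIMESECONDS >= 1 AND T0.PACKET_TIMESECONDS < 2"
```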

.tsdb_lookup(items, start_time:, end_time: nil, scope: "DEFAULT") ⇒ Array, Hash

Query historical telemetry data from QuestDB for a list of items. Builds the SQL query, executes it, and decodes all results. Supports cross-db_shard queries by grouping items by db_shard, executing separate queries per db_shard, and merging results positionally.

Parameters:

  • items (Array)

Array of [target_name, packet_name, item_name, value_type, limits] tuples; item_name may be nil to indicate a placeholder (non-existent item)

  • start_time (String, Numeric)

    Start timestamp for the query

  • end_time (String, Numeric, nil) (defaults to: nil)

End timestamp, or nil for "latest single row"

  • scope (String) (defaults to: "DEFAULT")

    Scope name

Returns:

  • (Array, Hash)

    Array of [value, limits_state] pairs per row, or {} if no results. Single-row results return a flat array; multi-row results return array of arrays.



# File 'lib/openc3/utilities/questdb_client.rb', line 883

def self.tsdb_lookup(items, start_time:, end_time: nil, scope: "DEFAULT")
  # Group items by db_shard number while preserving their original positions
  db_shard_groups = {} # db_shard => { positions: [], items: [] }
  items.each_with_index do |item, pos|
    target_name = item[0]
    db_shard = db_shard_for_target(target_name, scope: scope)
    db_shard_groups[db_shard] ||= { positions: [], items: [] }
    db_shard_groups[db_shard][:positions] << pos
    db_shard_groups[db_shard][:items] << item
  end

  # Single-db_shard fast path (most common case)
  if db_shard_groups.length == 1
    db_shard, group = db_shard_groups.first
    return tsdb_lookup_single_db_shard(group[:items], start_time: start_time, end_time: end_time, scope: scope, db_shard: db_shard)
  end

  # Cross-db_shard: execute per-db_shard queries and merge results
  db_shard_results = {} # db_shard => data
  db_shard_groups.each do |db_shard, group|
    result = tsdb_lookup_single_db_shard(group[:items], start_time: start_time, end_time: end_time, scope: scope, db_shard: db_shard)
    db_shard_results[db_shard] = result
  end

  # If all db_shards returned empty, return empty
  return {} if db_shard_results.values.all? { |r| r == {} }

  # Merge results positionally back into the original item order.
  # For single-row results (no end_time), merge flat arrays.
  # For multi-row results, each db_shard may have different row counts;
  # use the maximum row count and fill missing positions with [nil, nil].
  if !end_time
    # Single-row mode: each db_shard returns a flat array of [value, limits] pairs.
    # Merge them into the original item order.
    merged = Array.new(items.length) { [nil, nil] }
    db_shard_groups.each do |db_shard, group|
      result = db_shard_results[db_shard]
      next if result == {} || !result.is_a?(Array)
      group[:positions].each_with_index do |orig_pos, db_shard_idx|
        merged[orig_pos] = result[db_shard_idx] if result[db_shard_idx]
      end
    end
    merged
  else
    # Multi-row mode: find max row count across db_shards
    max_rows = 0
    db_shard_groups.each do |db_shard, _group|
      result = db_shard_results[db_shard]
      next if result == {}
      count = result.is_a?(Array) ? result.length : 0
      max_rows = count if count > max_rows
    end
    return {} if max_rows == 0

    merged = Array.new(max_rows) { Array.new(items.length) { [nil, nil] } }
    db_shard_groups.each do |db_shard, group|
      result = db_shard_results[db_shard]
      next if result == {}
      rows = result.is_a?(Array) ? result : []
      rows.each_with_index do |row, row_num|
        next unless row.is_a?(Array)
        group[:positions].each_with_index do |orig_pos, db_shard_idx|
          merged[row_num][orig_pos] = row[db_shard_idx] if row[db_shard_idx]
        end
      end
    end
    merged
  end
end
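The single-row merge step can be illustrated in isolation: each db_shard returns [value, limits] pairs for its own items in its own order, and the recorded positions map them back to the caller's original item order. A minimal sketch with hypothetical shard results:

```ruby
# Hypothetical setup: items 0 and 2 live on shard 0, item 1 on shard 1.
items_count = 3
db_shard_groups = {
  0 => { positions: [0, 2] },
  1 => { positions: [1] }
}
# Hypothetical single-row results, one [value, limits_state] pair per item,
# in each shard's own item order.
db_shard_results = {
  0 => [[10.5, 'GREEN'], [99, nil]],
  1 => [[2.2, 'YELLOW']]
}

# Positional merge, mirroring the single-row branch above: start with
# [nil, nil] placeholders, then write each shard's pairs back into the
# original positions.
merged = Array.new(items_count) { [nil, nil] }
db_shard_groups.each do |db_shard, group|
  result = db_shard_results[db_shard]
  next unless result.is_a?(Array)
  group[:positions].each_with_index do |orig_pos, idx|
    merged[orig_pos] = result[idx] if result[idx]
  end
end

p merged  # => [[10.5, "GREEN"], [2.2, "YELLOW"], [99, nil]]
```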

.tsdb_lookup_single_db_shard(items, start_time:, end_time: nil, scope: "DEFAULT", db_shard: 0) ⇒ Object

Execute a tsdb_lookup query against a single db_shard. This contains the original ASOF JOIN logic for items that all reside on the same QuestDB instance.



# File 'lib/openc3/utilities/questdb_client.rb', line 955

def self.tsdb_lookup_single_db_shard(items, start_time:, end_time: nil, scope: "DEFAULT", db_shard: 0)
  tables = {}
  names = []
  nil_count = 0
  packet_cache = {}
  item_types = {}
  calculated_items = {}
  needed_timestamps = {}
  current_position = 0

  items.each do |item|
    target_name, packet_name, orig_item_name, value_type, limits = item
    if orig_item_name.nil?
      names << "PACKET_TIMESECONDS as __nil#{nil_count}"
      nil_count += 1
      current_position += 1
      next
    end
    table_name = sanitize_table_name(target_name, packet_name, scope: scope)
    tables[table_name] = 1
    index = tables.find_index {|k,_v| k == table_name }

    if STORED_TIMESTAMP_ITEMS.include?(orig_item_name)
      names << "\"T#{index}.#{orig_item_name}\""
      current_position += 1
      next
    end

    if TIMESTAMP_ITEMS.key?(orig_item_name)
      ts_info = TIMESTAMP_ITEMS[orig_item_name]
      calculated_items[current_position] = {
        source: ts_info[:source],
        format: ts_info[:format],
        table_index: index
      }
      needed_timestamps[index] ||= Set.new
      needed_timestamps[index] << ts_info[:source]
      current_position += 1
      next
    end

    safe_item_name = sanitize_column_name(orig_item_name)

    cache_key = [target_name, packet_name]
    unless packet_cache.key?(cache_key)
      packet_cache[cache_key] = fetch_packet_def(target_name, packet_name, scope: scope)
    end

    packet_def = packet_cache[cache_key]
    item_def = find_item_def(packet_def, orig_item_name)

    suffix = column_suffix_for_value_type(value_type)
    col_name = "T#{index}.#{safe_item_name}#{suffix}"
    names << "\"#{col_name}\""
    item_types[col_name] = resolve_item_type(item_def, value_type)
    current_position += 1
    if limits
      names << "\"T#{index}.#{safe_item_name}__L\""
    end
  end

  # Add needed timestamp columns to the SELECT for calculated items
  needed_timestamps.each do |table_index, ts_columns|
    ts_columns.each do |ts_col|
      names << "T#{table_index}.#{ts_col} as T#{table_index}___ts_#{ts_col}"
    end
  end

  # Build the SQL query
  query = "SELECT #{names.join(", ")} FROM "
  tables.each_with_index do |(table_name, _), index|
    if index == 0
      query += "\"#{table_name}\" as T#{index} "
    else
      query += "ASOF JOIN \"#{table_name}\" as T#{index} "
    end
  end
  query_params = []
  if start_time && !end_time
    query += "WHERE T0.PACKET_TIMESECONDS < $1 LIMIT -1"
    query_params << start_time
  elsif start_time && end_time
    query += "WHERE T0.PACKET_TIMESECONDS >= $1 AND T0.PACKET_TIMESECONDS < $2"
    query_params << start_time
    query_params << end_time
  end

  result = query_with_retry(query, params: query_params, label: "tsdb_lookup", db_shard: db_shard)
  if result.nil? or result.ntuples == 0
    return {}
  end

  data = []
  result.each_with_index do |tuples, row_num|
    data[row_num] ||= []
    row_index = 0
    row_timestamps = {}
    tuples.each do |tuple|
      col_name = tuple[0]
      col_value = tuple[1]
      if col_name.include?("__L")
        data[row_num][row_index - 1][1] = col_value
      elsif col_name =~ /^__nil/
        data[row_num][row_index] = [nil, nil]
        row_index += 1
      elsif col_name =~ /^T(\d+)___ts_(.+)$/
        table_idx = $1.to_i
        ts_source = $2
        row_timestamps["T#{table_idx}.#{ts_source}"] = col_value
      elsif col_name.end_with?('.PACKET_TIMESECONDS', '.RECEIVED_TIMESECONDS') || col_name == 'PACKET_TIMESECONDS' || col_name == 'RECEIVED_TIMESECONDS'
        ts_utc = coerce_to_utc(col_value)
        seconds_value = format_timestamp(ts_utc, :seconds)
        data[row_num][row_index] = [seconds_value, nil]
        row_index += 1
        if col_name.include?('.')
          row_timestamps[col_name] = col_value
        else
          row_timestamps["T0.#{col_name}"] = col_value
        end
      else
        type_info = item_types[col_name]
        unless type_info
          tables.length.times do |i|
            prefixed_name = "T#{i}.#{col_name}"
            type_info = item_types[prefixed_name]
            break if type_info
          end
          type_info ||= {}
        end
        decoded_value = decode_value(
          col_value,
          data_type: type_info['data_type'],
          array_size: type_info['array_size']
        )
        data[row_num][row_index] = [decoded_value, nil]
        row_index += 1
      end
    end

    calculated_items.keys.sort.each do |position|
      calc_info = calculated_items[position]
      ts_key = "T#{calc_info[:table_index]}.#{calc_info[:source]}"
      ts_value = row_timestamps[ts_key]
      ts_utc = coerce_to_utc(ts_value)
      calculated_value = format_timestamp(ts_utc, calc_info[:format])
      data[row_num].insert(position, [calculated_value, nil])
    end
  end
  if result.ntuples == 1
    data = data[0]
  end
  data
end

.value_type_for_column_suffix(column_name) ⇒ String

Determine the value type from a QuestDB column name’s suffix.

Parameters:

  • column_name (String)

Column name, possibly ending in __C or __F (any other suffix, including the __L limits columns, falls through to 'RAW')

Returns:

  • (String)

One of 'FORMATTED', 'CONVERTED', 'RAW'



# File 'lib/openc3/utilities/questdb_client.rb', line 458

def self.value_type_for_column_suffix(column_name)
  if column_name.end_with?('__F')
    'FORMATTED'
  elsif column_name.end_with?('__C')
    'CONVERTED'
  else
    'RAW'
  end
end
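This is the read-side inverse of the column suffix convention (__C for converted, __F for formatted, bare for raw). Standalone (method body copied verbatim from above):

```ruby
# Copied from the method above; maps a column-name suffix back to
# the COSMOS value type it encodes.
def value_type_for_column_suffix(column_name)
  if column_name.end_with?('__F')
    'FORMATTED'
  elsif column_name.end_with?('__C')
    'CONVERTED'
  else
    'RAW'
  end
end

puts value_type_for_column_suffix('TEMP1__F')  # => "FORMATTED"
puts value_type_for_column_suffix('TEMP1__C')  # => "CONVERTED"
puts value_type_for_column_suffix('TEMP1')     # => "RAW"
```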