Class: DataDrain::Record Abstract

Inherits:
Object
  • Object
show all
Extended by:
Observability
Includes:
ActiveModel::Attributes, ActiveModel::Model
Defined in:
lib/data_drain/record.rb

Overview

This class is abstract.

Subclasifica este modelo para cada tabla archivada.

Clase base que actúa como un ORM (Object-Relational Mapper) de solo lectura y purga para interactuar con el Data Lake en formato Parquet utilizando DuckDB.

Examples:

class ArchivedVersion < DataDrain::Record
  self.folder_name = 'versions'
  self.partition_keys = [:isp_id, :year, :month]
  attribute :event, :string
end

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.connectionDuckDB::Connection

Retorna la conexión persistente a DuckDB en memoria para el hilo (Thread) actual. Esto previene tener que recargar extensiones (como httpfs) en cada consulta.

rubocop:disable Metrics/AbcSize

Returns:

  • (DuckDB::Connection)

    Conexión activa a DuckDB.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/data_drain/record.rb', line 49

def self.connection
  Thread.current[:data_drain_duckdb] ||= begin
    db = DuckDB::Database.open(":memory:")
    conn = db.connect

    config = DataDrain.configuration
    conn.query("SET max_memory='#{config.limit_ram}';") if config.limit_ram.present?
    conn.query("SET temp_directory='#{config.tmp_directory}'") if config.tmp_directory.present?

    DataDrain::Storage.adapter.setup_duckdb(conn)
    { db: db, conn: conn }
  end
  Thread.current[:data_drain_duckdb][:conn]
end

.destroy_all(**partitions) ⇒ Integer

Elimina físicamente los directorios o prefijos de S3.

Parameters:

  • partitions (Hash)

    Particiones a eliminar.

Returns:

  • (Integer)

    Cantidad de particiones físicas eliminadas.



108
109
110
111
112
113
114
# File 'lib/data_drain/record.rb', line 108

def self.destroy_all(**partitions)
  adapter = DataDrain::Storage.adapter
  @logger = DataDrain.configuration.logger
  safe_log(:info, "record.destroy_all", { folder: folder_name, partitions: partitions.inspect })

  adapter.destroy_partitions(bucket, folder_name, partition_keys, partitions)
end

.disconnect!void

This method returns an undefined value.

Cierra la conexión DuckDB del thread actual y limpia Thread.current. Idempotente: llamarlo varias veces no levanta.

Útil en middlewares de Sidekiq/Puma para evitar memory leak en threads de larga vida.



34
35
36
37
38
39
40
41
42
# File 'lib/data_drain/record.rb', line 34

def self.disconnect!
  entry = Thread.current[:data_drain_duckdb]
  Thread.current[:data_drain_duckdb] = nil
  return unless entry

  entry[:conn]&.close
  entry[:db]&.close
rescue StandardError # rubocop:disable Lint/SuppressedException
end

.find(id, **partitions) ⇒ DataDrain::Record?

Busca un registro específico por su ID. Implementa sanitización básica para prevenir Inyección SQL.

Parameters:

  • id (String, Integer)

    Identificador único del registro.

  • partitions (Hash)

    Pares clave-valor de las particiones donde buscar.

Returns:



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/data_drain/record.rb', line 89

def self.find(id, **partitions)
  path = build_query_path(partitions)
  # Sanitización básica: duplicar comillas simples para anular escapes SQL
  safe_id = id.to_s.gsub("'", "''")

  sql = <<~SQL
    SELECT #{attribute_names.join(", ")}
    FROM read_parquet('#{path}')
    WHERE id = '#{safe_id}'
    LIMIT 1
  SQL

  execute_and_instantiate(sql, attribute_names).first
end

.where(limit: 50, **partitions) ⇒ Array<DataDrain::Record>

Consulta registros en el Data Lake filtrando por claves de partición.

Parameters:

  • limit (Integer) (defaults to: 50)

    Cantidad máxima de registros a retornar.

  • partitions (Hash)

    Pares clave-valor correspondientes a las particiones.

Returns:



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/data_drain/record.rb', line 70

def self.where(limit: 50, **partitions)
  path = build_query_path(partitions)

  sql = <<~SQL
    SELECT #{attribute_names.join(", ")}
    FROM read_parquet('#{path}')
    ORDER BY created_at DESC
    LIMIT #{limit}
  SQL

  execute_and_instantiate(sql, attribute_names)
end

Instance Method Details

#inspectString

Returns Representación legible en consola.

Returns:

  • (String)

    Representación legible en consola.



117
118
119
120
121
122
123
# File 'lib/data_drain/record.rb', line 117

def inspect
  inspection = attributes.map do |name, value|
    "#{name}: #{value.nil? ? "nil" : value.inspect}"
  end.compact.join(", ")

  "#<#{self.class} #{inspection}>"
end