Class: DataDrain::Record Abstract

Inherits:
Object
  • Object
show all
Includes:
ActiveModel::Attributes, ActiveModel::Model
Defined in:
lib/data_drain/record.rb

Overview

This class is abstract.

Subclasifica este modelo para cada tabla archivada.

Clase base que actúa como un ORM (Object-Relational Mapper) de solo lectura y purga para interactuar con el Data Lake en formato Parquet utilizando DuckDB.

Examples:

class ArchivedVersion < DataDrain::Record
  self.folder_name = 'versions'
  self.partition_keys = [:year, :month, :isp_id]
  attribute :event, :string
end

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.connectionDuckDB::Connection

Retorna la conexión persistente a DuckDB en memoria para el hilo (Thread) actual. Esto previene tener que recargar extensiones (como httpfs) en cada consulta.

Returns:

  • (DuckDB::Connection)

    Conexión activa a DuckDB.



29
30
31
32
33
34
35
36
# File 'lib/data_drain/record.rb', line 29

def self.connection
  Thread.current[:data_drain_duckdb_conn] ||= begin
    db = DuckDB::Database.open(":memory:")
    conn = db.connect
    DataDrain::Storage.adapter.setup_duckdb(conn)
    conn
  end
end

.destroy_all(**partitions) ⇒ Integer

Elimina físicamente los directorios o prefijos de S3.

Parameters:

  • partitions (Hash)

    Particiones a eliminar.

Returns:

  • (Integer)

    Cantidad de particiones físicas eliminadas.



81
82
83
84
85
86
# File 'lib/data_drain/record.rb', line 81

def self.destroy_all(**partitions)
  adapter = DataDrain::Storage.adapter
  DataDrain.configuration.logger.info "[DataDrain] 🗑️ Ejecutando destroy_all en #{folder_name} con: #{partitions.inspect}"

  adapter.destroy_partitions(bucket, folder_name, partition_keys, partitions)
end

.find(id, **partitions) ⇒ DataDrain::Record?

Busca un registro específico por su ID. Implementa sanitización básica para prevenir Inyección SQL.

Parameters:

  • id (String, Integer)

    Identificador único del registro.

  • partitions (Hash)

    Pares clave-valor de las particiones donde buscar.

Returns:



62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/data_drain/record.rb', line 62

def self.find(id, **partitions)
  path = build_query_path(partitions)
  # Sanitización básica: duplicar comillas simples para anular escapes SQL
  safe_id = id.to_s.gsub("'", "''")

  sql = <<~SQL
    SELECT #{attribute_names.join(', ')}
    FROM read_parquet('#{path}')
    WHERE id = '#{safe_id}'
    LIMIT 1
  SQL

  execute_and_instantiate(sql, attribute_names).first
end

.where(limit: 50, **partitions) ⇒ Array<DataDrain::Record>

Consulta registros en el Data Lake filtrando por claves de partición.

Parameters:

  • limit (Integer) (defaults to: 50)

    Cantidad máxima de registros a retornar.

  • partitions (Hash)

    Pares clave-valor correspondientes a las particiones.

Returns:



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/data_drain/record.rb', line 43

def self.where(limit: 50, **partitions)
  path = build_query_path(partitions)

  sql = <<~SQL
    SELECT #{attribute_names.join(', ')}
    FROM read_parquet('#{path}')
    ORDER BY created_at DESC
    LIMIT #{limit}
  SQL

  execute_and_instantiate(sql, attribute_names)
end

Instance Method Details

#inspectString

Returns Representación legible en consola.

Returns:

  • (String)

    Representación legible en consola.



89
90
91
92
93
94
95
# File 'lib/data_drain/record.rb', line 89

def inspect
  inspection = attributes.map do |name, value|
    "#{name}: #{value.nil? ? 'nil' : value.inspect}"
  end.compact.join(", ")

  "#<#{self.class} #{inspection}>"
end