Class: DataDrain::Record Abstract
- Inherits:
-
Object
- Object
- DataDrain::Record
- Extended by:
- Observability
- Includes:
- ActiveModel::Attributes, ActiveModel::Model
- Defined in:
- lib/data_drain/record.rb
Overview
Subclasifica este modelo para cada tabla archivada.
Clase base que actúa como un ORM (Object-Relational Mapper) de solo lectura y purga para interactuar con el Data Lake en formato Parquet utilizando DuckDB.
Constant Summary
Constants included from Observability
Observability::SENSITIVE_KEY_PATTERN
Class Method Summary collapse
-
.connection ⇒ DuckDB::Connection
Retorna la conexión persistente a DuckDB en memoria para el hilo (Thread) actual.
-
.destroy_all(**partitions) ⇒ Integer
Elimina físicamente los directorios o prefijos de S3.
-
.disconnect! ⇒ void
Cierra la conexión DuckDB del thread actual y limpia Thread.current.
-
.find(id, **partitions) ⇒ DataDrain::Record?
Busca un registro específico por su ID.
-
.where(limit: 50, **partitions) ⇒ Array<DataDrain::Record>
Consulta registros en el Data Lake filtrando por claves de partición.
Instance Method Summary collapse
-
#inspect ⇒ String
Representación legible en consola.
Class Method Details
.connection ⇒ DuckDB::Connection
Retorna la conexión persistente a DuckDB en memoria para el hilo (Thread) actual. Esto previene tener que recargar extensiones (como httpfs) en cada consulta.
rubocop:disable Metrics/AbcSize
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/data_drain/record.rb', line 50 def self.connection Thread.current[:data_drain_duckdb] ||= begin db = DuckDB::Database.open(":memory:") conn = db.connect config = DataDrain.configuration conn.query("SET max_memory='#{config.limit_ram}';") if config.limit_ram.present? conn.query("SET temp_directory='#{config.tmp_directory}'") if config.tmp_directory.present? DataDrain::Storage.adapter.setup_duckdb(conn) { db: db, conn: conn } end Thread.current[:data_drain_duckdb][:conn] end |
.destroy_all(**partitions) ⇒ Integer
Elimina físicamente los directorios o prefijos de S3.
109 110 111 112 113 114 115 |
# File 'lib/data_drain/record.rb', line 109 def self.destroy_all(**partitions) adapter = DataDrain::Storage.adapter @logger = DataDrain.configuration.logger safe_log(:info, "record.destroy_all", { folder: folder_name, partitions: partitions.inspect }) adapter.destroy_partitions(bucket, folder_name, partition_keys, partitions) end |
.disconnect! ⇒ void
This method returns an undefined value.
Cierra la conexión DuckDB del thread actual y limpia Thread.current. Idempotente: llamarlo varias veces no levanta.
Útil en middlewares de Sidekiq/Puma para evitar memory leak en threads de larga vida.
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/data_drain/record.rb', line 34 def self.disconnect! entry = Thread.current[:data_drain_duckdb] Thread.current[:data_drain_duckdb] = nil return unless entry entry[:conn]&.close entry[:db]&.close rescue StandardError nil end |
.find(id, **partitions) ⇒ DataDrain::Record?
Busca un registro específico por su ID. Implementa sanitización básica para prevenir Inyección SQL.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/data_drain/record.rb', line 90 def self.find(id, **partitions) path = build_query_path(partitions) # Sanitización básica: duplicar comillas simples para anular escapes SQL safe_id = id.to_s.gsub("'", "''") sql = <<~SQL SELECT #{attribute_names.join(", ")} FROM read_parquet('#{path}') WHERE id = '#{safe_id}' LIMIT 1 SQL execute_and_instantiate(sql, attribute_names).first end |
.where(limit: 50, **partitions) ⇒ Array<DataDrain::Record>
Consulta registros en el Data Lake filtrando por claves de partición.
71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/data_drain/record.rb', line 71 def self.where(limit: 50, **partitions) path = build_query_path(partitions) sql = <<~SQL SELECT #{attribute_names.join(", ")} FROM read_parquet('#{path}') ORDER BY created_at DESC LIMIT #{limit} SQL execute_and_instantiate(sql, attribute_names) end |
Instance Method Details
#inspect ⇒ String
Returns Representación legible en consola.
118 119 120 121 122 123 124 |
# File 'lib/data_drain/record.rb', line 118 def inspect inspection = attributes.map do |name, value| "#{name}: #{value.nil? ? "nil" : value.inspect}" end.compact.join(", ") "#<#{self.class} #{inspection}>" end |