Class: DataDrain::Record Abstract
- Inherits:
-
Object
- Object
- DataDrain::Record
- Extended by:
- Observability
- Includes:
- ActiveModel::Attributes, ActiveModel::Model
- Defined in:
- lib/data_drain/record.rb
Overview
Subclasifica este modelo para cada tabla archivada.
Clase base que actúa como un ORM (Object-Relational Mapper) de solo lectura y purga para interactuar con el Data Lake en formato Parquet utilizando DuckDB.
Constant Summary
Constants included from Observability
Observability::SENSITIVE_KEY_PATTERN
Class Method Summary collapse
-
.connection ⇒ DuckDB::Connection
Retorna la conexión persistente a DuckDB en memoria para el hilo (Thread) actual.
-
.destroy_all(**partitions) ⇒ Integer
Elimina físicamente los directorios o prefijos de S3.
-
.disconnect! ⇒ void
Cierra la conexión DuckDB del thread actual y limpia Thread.current.
-
.find(id, **partitions) ⇒ DataDrain::Record?
Busca un registro específico por su ID.
-
.where(limit: 50, **partitions) ⇒ Array<DataDrain::Record>
Consulta registros en el Data Lake filtrando por claves de partición.
Instance Method Summary collapse
-
#inspect ⇒ String
Representación legible en consola.
Class Method Details
.connection ⇒ DuckDB::Connection
Retorna la conexión persistente a DuckDB en memoria para el hilo (Thread) actual. Esto previene tener que recargar extensiones (como httpfs) en cada consulta.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/data_drain/record.rb', line 49 def self.connection Thread.current[:data_drain_duckdb] ||= begin db = DuckDB::Database.open(":memory:") conn = db.connect config = DataDrain.configuration conn.query("SET max_memory='#{config.limit_ram}';") if config.limit_ram.present? conn.query("SET temp_directory='#{config.tmp_directory}'") if config.tmp_directory.present? DataDrain::Storage.adapter.setup_duckdb(conn) conn.query("SET lock_configuration=true;") { db: db, conn: conn } end Thread.current[:data_drain_duckdb][:conn] end |
.destroy_all(**partitions) ⇒ Integer
Elimina físicamente los directorios o prefijos de S3.
110 111 112 113 114 115 116 |
# File 'lib/data_drain/record.rb', line 110 def self.destroy_all(**partitions) adapter = DataDrain::Storage.adapter @logger = DataDrain.configuration.logger safe_log(:info, "record.destroy_all", { folder: folder_name, partitions: partitions.inspect }) adapter.destroy_partitions(bucket, folder_name, partition_keys, partitions) end |
.disconnect! ⇒ void
This method returns an undefined value.
Cierra la conexión DuckDB del thread actual y limpia Thread.current. Idempotente: llamarlo varias veces no levanta.
Útil en middlewares de Sidekiq/Puma para evitar memory leak en threads de larga vida.
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/data_drain/record.rb', line 34 def self.disconnect! entry = Thread.current[:data_drain_duckdb] Thread.current[:data_drain_duckdb] = nil return unless entry entry[:conn]&.close entry[:db]&.close rescue StandardError nil end |
.find(id, **partitions) ⇒ DataDrain::Record?
Busca un registro específico por su ID. Implementa sanitización básica para prevenir Inyección SQL.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/data_drain/record.rb', line 91 def self.find(id, **partitions) path = build_query_path(partitions) # Sanitización básica: duplicar comillas simples para anular escapes SQL safe_id = id.to_s.gsub("'", "''") sql = <<~SQL SELECT #{attribute_names.join(", ")} FROM read_parquet('#{path}') WHERE id = '#{safe_id}' LIMIT 1 SQL execute_and_instantiate(sql, attribute_names).first end |
.where(limit: 50, **partitions) ⇒ Array<DataDrain::Record>
Consulta registros en el Data Lake filtrando por claves de partición.
72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/data_drain/record.rb', line 72 def self.where(limit: 50, **partitions) path = build_query_path(partitions) sql = <<~SQL SELECT #{attribute_names.join(", ")} FROM read_parquet('#{path}') ORDER BY created_at DESC LIMIT #{limit} SQL execute_and_instantiate(sql, attribute_names) end |
Instance Method Details
#inspect ⇒ String
Returns Representación legible en consola.
119 120 121 122 123 124 125 |
# File 'lib/data_drain/record.rb', line 119 def inspect inspection = attributes.map do |name, value| "#{name}: #{value.nil? ? "nil" : value.inspect}" end.compact.join(", ") "#<#{self.class} #{inspection}>" end |