DataDrain
Ruby micro-framework for extracting, archiving, and purging historical PostgreSQL data into a Data Lake (S3 or local disk) in Parquet format, using in-memory DuckDB.
Features
- High-throughput ETL: millions of Postgres rows to Parquet without loading objects into Ruby RAM.
- File ingestion: converts local CSV, JSON, or Parquet files into partitioned Parquet (ZSTD) and uploads them to S3.
- Hive partitioning: organizes files as key=val/key=val/... for efficient prefix scans (see the path sketch after this list).
- Storage adapters: transparent support for local disk and AWS S3.
- Guaranteed integrity: mathematical Postgres-vs-Parquet verification before any DELETE.
- Analytical ORM: DataDrain::Record base class (ActiveModel-compatible) for querying and purging historical partitions.
- Structured observability: key=value logs compatible with Datadog, CloudWatch, and exis_ray. Logger failures never interrupt the main flow.
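For illustration, with partition_keys of isp_id, year, and month (as in the examples below), archived files land under Hive-style prefixes such as the following; the bucket and folder names come from this README, while the Parquet file name is hypothetical:

s3://my-bucket-store/versions/isp_id=42/year=2026/month=3/data_0.parquet

A query scoped to isp_id: 42, year: 2026, month: 3 then only needs to scan that single prefix.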
Installation
# Gemfile
gem 'data_drain', git: 'https://github.com/gedera/data_drain.git', branch: 'main'
bundle install
Configuration
# config/initializers/data_drain.rb
DataDrain.configure do |config|
config.storage_mode = ENV.fetch('STORAGE_MODE', 'local').to_sym # :local or :s3
# AWS S3 (only when storage_mode == :s3)
config.aws_region = ENV['AWS_REGION']
config.aws_access_key_id = ENV['AWS_ACCESS_KEY_ID']
config.aws_secret_access_key = ENV['AWS_SECRET_ACCESS_KEY']
# Source PostgreSQL (Engine only)
config.db_host = ENV.fetch('DB_HOST', '127.0.0.1')
config.db_port = ENV.fetch('DB_PORT', '5432')
config.db_user = ENV.fetch('DB_USER', 'postgres')
config.db_pass = ENV.fetch('DB_PASS', '')
config.db_name = ENV.fetch('DB_NAME', 'core_production')
# Purge tuning
config.batch_size = 5000 # rows per DELETE
config.throttle_delay = 0.5 # seconds between batches
config.idle_in_transaction_session_timeout = 0 # 0 = DISABLED (mandatory for massive purges)
# DuckDB tuning
config.limit_ram = '2GB' # avoids OOM in containers
config.tmp_directory = '/tmp/duckdb_work' # spill-to-disk (prefer SSD/NVMe)
config.logger = Rails.logger
end
Usage
Raw file ingestion (FileIngestor)
DataDrain::FileIngestor.new(
bucket: 'my-bucket-store',
source_path: '/tmp/netflow_metrics.csv',
folder_name: 'netflow',
partition_keys: %w[isp_id year month],
select_sql: "*, EXTRACT(YEAR FROM timestamp) AS year, EXTRACT(MONTH FROM timestamp) AS month",
delete_after_upload: true
).call
Extraction and purge (Engine)
Rolling retention windows: archive data from 6 months back and purge the source.
DataDrain::Engine.new(
bucket: 'my-bucket-store',
start_date: 6.months.ago.beginning_of_month,
end_date: 6.months.ago.end_of_month,
table_name: 'versions',
partition_keys: %w[year month]
).call
skip_export mode (delegating the export to Glue/EMR)
DataDrain only verifies integrity and purges; the export has already been done by another tool.
DataDrain::Engine.new(
bucket: 'my-bucket-store',
start_date: 6.months.ago.beginning_of_month,
end_date: 6.months.ago.end_of_month,
table_name: 'versions',
partition_keys: %w[year month],
skip_export: true
).call
Orchestration with AWS Glue (1TB+ tables)
DataDrain::GlueRunner.run_and_wait(
"my-glue-export-job",
{
"--start_date" => start_date.to_fs(:db),
"--end_date" => end_date.to_fs(:db),
"--s3_bucket" => bucket,
"--s3_folder" => table,
"--db_url" => "jdbc:postgresql://#{config.db_host}:#{config.db_port}/#{config.db_name}",
"--db_user" => config.db_user,
"--db_password" => config.db_pass,
"--db_table" => table,
"--partition_by" => "isp_id,year,month"
}
)
DataDrain::Engine.new(
bucket:, folder_name: table, start_date:, end_date:,
table_name: table, partition_keys: %w[isp_id year month],
skip_export: true
).call
Querying the Data Lake (Record)
class ArchivedVersion < DataDrain::Record
self.bucket = 'my-bucket-storage'
self.folder_name = 'versions'
self.partition_keys = [:isp_id, :year, :month] # order = Hive hierarchy
attribute :id, :string
attribute :item_type, :string
attribute :event, :string
attribute :created_at, :datetime
attribute :object, :json
attribute :object_changes, :json
end
# Point lookup isolating the exact partition
ArchivedVersion.find("uuid", isp_id: 42, year: 2026, month: 3)
# Collections
ArchivedVersion.where(limit: 10, isp_id: 42, year: 2026, month: 3)
# Deletion (retention and compliance)
ArchivedVersion.destroy_all(isp_id: 42) # a customer's entire history
ArchivedVersion.destroy_all(year: 2024, month: 3) # one month globally
Critical conventions
- Semi-open date ranges: always created_at >= START AND created_at < END_BOUNDARY, never <= end_of_day (see the sketch after this list).
- partition_keys order: must match between writes (Engine/FileIngestor) and reads (Record). A mismatch makes DuckDB return empty results without raising an error.
- Changing storage_mode at runtime: call DataDrain::Storage.reset_adapter! afterwards.
- verify_integrity is the only safeguard before purging. If it fails, the flow returns false and the DELETE is aborted.
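A minimal sketch of the semi-open window and the runtime adapter switch, assuming ActiveSupport date helpers are available; re-invoking DataDrain.configure to change a single setting is an assumption for illustration, not documented API:

# Semi-open monthly window: the upper bound is exclusive.
window_start = 6.months.ago.beginning_of_month
window_end   = window_start + 1.month
# Resulting predicate: created_at >= window_start AND created_at < window_end

# Switching storage_mode at runtime requires resetting the adapter.
DataDrain.configure { |config| config.storage_mode = :s3 }
DataDrain::Storage.reset_adapter!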
Observability
component=data_drain event=engine.complete table=versions duration_s=12.4 export_duration_s=8.1 purge_duration_s=3.9 count=150000
component=data_drain event=engine.purge_heartbeat table=versions batches_processed_count=100 rows_deleted_count=500000
component=data_drain event=glue_runner.failed job=my-export-job run_id=jr_abc123 status=FAILED duration_s=301.0
key=value format. Durations use the _s suffix (Float). Counters use the _count suffix (Integer). No units inside values. Internal logger failures never interrupt the main flow.
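As an illustration of this convention only (not the library's internal logger), the kv_log helper below is hypothetical:

# Hypothetical helper emitting key=value lines in the format above.
def kv_log(logger, event, **fields)
  line = { component: 'data_drain', event: event }
           .merge(fields)
           .map { |k, v| "#{k}=#{v}" }
           .join(' ')
  logger.info(line)
rescue StandardError
  # Swallow logger errors: logging must never interrupt the main flow.
end

kv_log(Rails.logger, 'engine.complete', table: 'versions', duration_s: 12.4, count: 150_000)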
Contributing
bundle install
bundle exec rspec # tests
bundle exec rubocop # linting
bin/console # REPL
License
MIT.