Class: DataDrain::Storage::S3

Inherits:
Base
  • Object
Defined in:
lib/data_drain/storage/s3.rb

Overview

Storage adapter implementation for Amazon S3.

Instance Attribute Summary

Attributes inherited from Base

#config

Instance Method Summary

Methods inherited from Base

#initialize, #prepare_export_path

Constructor Details

This class inherits a constructor from DataDrain::Storage::Base

Instance Method Details

#build_path(bucket, folder_name, partition_path) ⇒ String

Parameters:

  • bucket (String)
  • folder_name (String)
  • partition_path (String, nil)

Returns:

  • (String)


# File 'lib/data_drain/storage/s3.rb', line 20

def build_path(bucket, folder_name, partition_path)
  # In S3, the base_path acts as the bucket name
  base = File.join(bucket, folder_name)
  base = File.join(base, partition_path) if partition_path && !partition_path.empty?
  "s3://#{base}/**/*.parquet"
end
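
The path logic above can be tried in isolation. The following is a minimal sketch, assuming a standalone function named `build_s3_glob` (a hypothetical name, not part of DataDrain) and illustrative bucket and folder names; it copies the body of `build_path` so the resulting glob can be inspected without instantiating the adapter.

```ruby
# Hypothetical standalone copy of the build_path logic, for illustration only.
def build_s3_glob(bucket, folder_name, partition_path)
  base = File.join(bucket, folder_name)
  # The partition path is appended only when it is present and non-empty.
  base = File.join(base, partition_path) if partition_path && !partition_path.empty?
  "s3://#{base}/**/*.parquet"
end

# Bucket and folder names below are made up for the example.
puts build_s3_glob("my-bucket", "events", nil)
# prints s3://my-bucket/events/**/*.parquet
puts build_s3_glob("my-bucket", "events", "year=2024/month=01")
# prints s3://my-bucket/events/year=2024/month=01/**/*.parquet
```

A `nil` or empty `partition_path` yields a glob over the whole folder, while a partition path narrows the glob to that subtree.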

#destroy_partitions(bucket, folder_name, partition_keys, partitions) ⇒ Integer

Parameters:

  • bucket (String)
  • folder_name (String)
  • partition_keys (Array<Symbol>)
  • partitions (Hash)

Returns:

  • (Integer)


# File 'lib/data_drain/storage/s3.rb', line 32

def destroy_partitions(bucket, folder_name, partition_keys, partitions)
  client = Aws::S3::Client.new(
    region: @config.aws_region,
    access_key_id: @config.aws_access_key_id,
    secret_access_key: @config.aws_secret_access_key
  )

  regex_parts = partition_keys.map do |key|
    val = partitions[key]
    val.nil? || val.to_s.empty? ? "#{key}=[^/]+" : "#{key}=#{val}"
  end
  pattern_regex = Regexp.new("^#{folder_name}/#{regex_parts.join('/')}")

  objects_to_delete = []
  prefix = "#{folder_name}/"
  first_key = partition_keys.first
  prefix += "#{first_key}=#{partitions[first_key]}/" if partitions[first_key]

  client.list_objects_v2(bucket: bucket, prefix: prefix).each do |response|
    response.contents.each do |obj|
      objects_to_delete << { key: obj.key } if obj.key.match?(pattern_regex)
    end
  end

  delete_in_batches(client, bucket, objects_to_delete)
end
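
To see which object keys `destroy_partitions` would select, the pattern and prefix construction can be reproduced without an S3 client. This is a sketch under stated assumptions: `partition_matcher` and `listing_prefix` are hypothetical helper names that mirror the corresponding lines of the method, and the sample keys are invented for the example.

```ruby
# Hypothetical helper: mirrors how destroy_partitions builds its key pattern.
# A nil or empty partition value becomes a wildcard for that path segment.
def partition_matcher(folder_name, partition_keys, partitions)
  parts = partition_keys.map do |key|
    val = partitions[key]
    val.nil? || val.to_s.empty? ? "#{key}=[^/]+" : "#{key}=#{val}"
  end
  Regexp.new("^#{folder_name}/#{parts.join('/')}")
end

# Hypothetical helper: mirrors how the listing prefix is narrowed
# by the first partition key when a value is given for it.
def listing_prefix(folder_name, partition_keys, partitions)
  prefix = "#{folder_name}/"
  first_key = partition_keys.first
  prefix += "#{first_key}=#{partitions[first_key]}/" if partitions[first_key]
  prefix
end

keys = %i[year month]
partitions = { year: 2024, month: nil } # month left open, so it acts as a wildcard

matcher = partition_matcher("events", keys, partitions)
prefix = listing_prefix("events", keys, partitions)

# Invented object keys standing in for an S3 listing.
sample_keys = [
  "events/year=2024/month=01/part-0.parquet",
  "events/year=2024/month=02/part-0.parquet",
  "events/year=2023/month=01/part-0.parquet"
]

selected = sample_keys.select { |k| k.start_with?(prefix) && k.match?(matcher) }
puts selected
```

With these inputs the two `year=2024` keys are selected and the `year=2023` key is not. Note that partition values are interpolated into the pattern unescaped, so a value containing regular-expression metacharacters would be interpreted as such; wrapping values in `Regexp.escape` is one way to avoid that.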

#setup_duckdb(connection) ⇒ Object

Loads the httpfs extension into DuckDB and injects the AWS credentials.

Parameters:

  • connection (DuckDB::Connection)


# File 'lib/data_drain/storage/s3.rb', line 9

def setup_duckdb(connection)
  connection.query("INSTALL httpfs; LOAD httpfs;")
  connection.query("SET s3_region='#{@config.aws_region}';")
  connection.query("SET s3_access_key_id='#{@config.aws_access_key_id}';")
  connection.query("SET s3_secret_access_key='#{@config.aws_secret_access_key}';")
end
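
The statements sent by `setup_duckdb` can be previewed as plain strings. The sketch below is not the library's implementation: `sql_quote` and `duckdb_s3_statements` are hypothetical helpers, the credential values are placeholders, and the quote doubling is an added precaution that the method above does not perform (it interpolates the configuration values directly).

```ruby
# Hypothetical helper: wraps a value in single quotes for SQL,
# doubling any embedded single quotes.
def sql_quote(value)
  "'#{value.to_s.gsub("'", "''")}'"
end

# Hypothetical helper: builds the same kind of statements that
# setup_duckdb sends, returned as an array instead of being executed.
def duckdb_s3_statements(region:, access_key_id:, secret_access_key:)
  [
    "INSTALL httpfs;",
    "LOAD httpfs;",
    "SET s3_region=#{sql_quote(region)};",
    "SET s3_access_key_id=#{sql_quote(access_key_id)};",
    "SET s3_secret_access_key=#{sql_quote(secret_access_key)};"
  ]
end

# Placeholder values, not real credentials.
statements = duckdb_s3_statements(
  region: "us-east-1",
  access_key_id: "EXAMPLE_KEY_ID",
  secret_access_key: "EXAMPLE_SECRET"
)
puts statements

# With an open connection, each statement could then be run in order:
#   statements.each { |sql| connection.query(sql) }
```

Building the statements separately from executing them makes it possible to inspect what will be sent without a live DuckDB connection.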