Class: Multiwoven::Integrations::Destination::Sftp::Client
- Inherits: DestinationConnector
  - Object
  - DestinationConnector
  - Multiwoven::Integrations::Destination::Sftp::Client
- Includes: Core::Fullrefresher, Core::RateLimiter
- Defined in: lib/multiwoven/integrations/destination/sftp/client.rb
Overview
rubocop:disable Metrics/ClassLength
Instance Method Summary
- #check_connection(connection_config) ⇒ Object
- #clear_all_records(sync_config) ⇒ Object
- #discover(_connection_config = nil) ⇒ Object
- #write(sync_config, records, _action = "insert") ⇒ Object
- #write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
- #write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
Instance Method Details
#check_connection(connection_config) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/multiwoven/integrations/destination/sftp/client.rb', line 10
#
# Probes the SFTP destination described by +connection_config+.
#
# Opens a session through +with_sftp_client+, builds a probe path of the form
# "<destination_path>/<random uuid>" and hands it to +test_file_operations+.
#
# @param connection_config [Hash] connection settings; +:destination_path+ is
#   read from it after conversion with +with_indifferent_access+
# @return [Object] +success_status+ when the probe completes; otherwise the
#   result of +failure_status+ for the rescued error
def check_connection(connection_config)
  settings = connection_config.with_indifferent_access

  with_sftp_client(settings) do |session|
    probe_name = SecureRandom.uuid
    probe_path = "#{settings[:destination_path]}/#{probe_name}"
    test_file_operations(session, probe_path)
    return success_status
  end
rescue StandardError => error
  # Any failure is reported through the shared handler, then converted into
  # a failure status for the caller.
  handle_exception("SFTP:CHECK_CONNECTION:EXCEPTION", "error", error)
  failure_status(error)
end
#clear_all_records(sync_config) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/multiwoven/integrations/destination/sftp/client.rb', line 96
#
# Deletes every entry matched by the "*" glob under the configured
# destination path and reports whether the directory ended up empty.
#
# NOTE(review): the rendered source lost the identifier in front of each
# ("...", "...") argument list and after `e.`; `control_message` and
# `e.message` are restored here - confirm against client.rb.
#
# @param sync_config [Object] must respond to
#   +destination.connection_specification+, which yields the connection hash
# @return [Object] whatever +control_message+ builds: "succeeded" when the
#   directory listing has at most two entries after deletion, "failed"
#   otherwise or when any StandardError is raised
def clear_all_records(sync_config)
  connection_specification = sync_config.destination.connection_specification.with_indifferent_access
  destination_path = connection_specification[:destination_path]

  with_sftp_client(connection_specification) do |sftp|
    sftp.dir.glob(destination_path, "*").each do |file|
      sftp.remove!(File.join(destination_path, file.name))
    end

    # Presumably the two tolerated entries are "." and ".." - TODO confirm
    # against the SFTP server's directory listing behaviour.
    remaining = sftp.dir.entries(destination_path).size
    return control_message("Successfully cleared data.", "succeeded") if remaining <= 2

    return control_message("Failed to clear data.", "failed")
  end
rescue StandardError => e
  control_message(e.message, "failed")
end
#discover(_connection_config = nil) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/multiwoven/integrations/destination/sftp/client.rb', line 23
#
# Loads the catalog definition from CATALOG_SPEC_PATH, builds a catalog from
# it and returns the catalog's message form.
#
# NOTE(review): the rendered source ends the third statement at `catalog.`
# with the method name missing; `to_multiwoven_message` is restored here -
# confirm against client.rb.
#
# @param _connection_config [Object, nil] unused
# @return [Object] the catalog message, or the result of +handle_exception+
#   when reading or building the catalog raises a StandardError
def discover(_connection_config = nil)
  catalog_json = read_json(CATALOG_SPEC_PATH)
  catalog = build_catalog(catalog_json)
  catalog.to_multiwoven_message
rescue StandardError => e
  handle_exception(
    "SFTP:DISCOVER:EXCEPTION",
    "error",
    e
  )
end
#write(sync_config, records, _action = "insert") ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/multiwoven/integrations/destination/sftp/client.rb', line 37
#
# Serialises +records+ to CSV and uploads the result, zipped or plain
# depending on the configured compression type, then reports how many
# records were written and how many were not.
#
# NOTE(review): the rendered source lost the identifier in front of
# (write_success, write_failure); `tracking_message` is restored here -
# confirm against client.rb.
#
# @param sync_config [Object] must respond to
#   +destination.connection_specification+; also passed to the file-path and
#   file-name generators
# @param records [#size] records to write; passed to +generate_csv_content+
# @param _action [String] unused
# @return [Object] the tracking message for (success, failure) counts, or the
#   result of +handle_exception+ when a StandardError is raised - including
#   the ArgumentError raised here for an unsupported compression type
def write(sync_config, records, _action = "insert")
  connection_config = sync_config.destination.connection_specification.with_indifferent_access
  file_path = generate_file_path(sync_config)
  local_file_name = generate_local_file_name(sync_config)
  csv_content = generate_csv_content(records)
  records_size = records.size

  # Both writers return the number of records uploaded (0 on failure).
  write_success =
    case connection_config[:format][:compression_type]
    when CompressionType.enum("zip")
      write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
    when CompressionType.enum("un_compressed")
      write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
    else
      raise ArgumentError, "Unsupported compression type: #{connection_config[:format][:compression_type]}"
    end

  write_failure = records_size - write_success
  tracking_message(write_success, write_failure)
rescue StandardError => e
  handle_exception(
    "SFTP:WRITE:EXCEPTION",
    "error",
    e
  )
end
#write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/multiwoven/integrations/destination/sftp/client.rb', line 63
#
# Packs +csv_content+ into a temporary zip archive as "<local_file_name>.csv"
# and uploads that archive to +file_path+ over SFTP.
#
# @param connection_config [Hash] passed through to +with_sftp_client+
# @param file_path [String] remote path the archive is uploaded to
# @param local_file_name [String] prefix of the temp file and base name of
#   the CSV entry inside the archive
# @param csv_content [String] data written into the archive entry
# @param records_size [Integer] value returned when the upload succeeds
# @return [Integer] +records_size+ on success, 0 when the upload raised
def write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
  uploaded_count = 0

  Tempfile.create([local_file_name, ".zip"]) do |archive|
    Zip::File.open(archive.path, Zip::File::CREATE) do |zip|
      zip.get_output_stream("#{local_file_name}.csv") do |entry|
        entry.write(csv_content)
      end
    end

    with_sftp_client(connection_config) do |session|
      session.upload!(archive.path, file_path)
      uploaded_count = records_size
    rescue StandardError => error
      # Only failures raised inside the session block are handled here;
      # the count is reset so the caller sees nothing as written.
      handle_exception("SFTP:RECORD:WRITE:EXCEPTION", "error", error)
      uploaded_count = 0
    end
  end

  uploaded_count
end
#write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/multiwoven/integrations/destination/sftp/client.rb', line 80
#
# Writes +csv_content+ to a temporary ".csv" file and uploads that file to
# +file_path+ over SFTP.
#
# @param connection_config [Hash] passed through to +with_sftp_client+
# @param file_path [String] remote path the file is uploaded to
# @param local_file_name [String] prefix used for the temp file name
# @param csv_content [String] data written to the temp file
# @param records_size [Integer] value returned when the upload succeeds
# @return [Integer] +records_size+ on success, 0 when the upload raised
def write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
  uploaded_count = 0

  Tempfile.create([local_file_name, ".csv"]) do |staging_file|
    staging_file.write(csv_content)
    # Closed before the upload, which reads the file by path.
    staging_file.close

    with_sftp_client(connection_config) do |session|
      session.upload!(staging_file.path, file_path)
      uploaded_count = records_size
    rescue StandardError => error
      # Only failures raised inside the session block are handled here;
      # the count is reset so the caller sees nothing as written.
      handle_exception("SFTP:RECORD:WRITE:EXCEPTION", "error", error)
      uploaded_count = 0
    end
  end

  uploaded_count
end