Module: IOStreams

Defined in:
lib/iostreams.rb,
lib/io_streams/pgp.rb,
lib/io_streams/path.rb,
lib/io_streams/utils.rb,
lib/io_streams/errors.rb,
lib/io_streams/reader.rb,
lib/io_streams/stream.rb,
lib/io_streams/writer.rb,
lib/io_streams/builder.rb,
lib/io_streams/tabular.rb,
lib/io_streams/version.rb,
lib/io_streams/paths/s3.rb,
lib/io_streams/io_streams.rb,
lib/io_streams/paths/file.rb,
lib/io_streams/paths/http.rb,
lib/io_streams/paths/sftp.rb,
lib/io_streams/pgp/reader.rb,
lib/io_streams/pgp/writer.rb,
lib/io_streams/row/reader.rb,
lib/io_streams/row/writer.rb,
lib/io_streams/zip/reader.rb,
lib/io_streams/zip/writer.rb,
lib/io_streams/gzip/reader.rb,
lib/io_streams/gzip/writer.rb,
lib/io_streams/line/reader.rb,
lib/io_streams/line/writer.rb,
lib/io_streams/xlsx/reader.rb,
lib/io_streams/bzip2/reader.rb,
lib/io_streams/bzip2/writer.rb,
lib/io_streams/encode/reader.rb,
lib/io_streams/encode/writer.rb,
lib/io_streams/paths/matcher.rb,
lib/io_streams/record/reader.rb,
lib/io_streams/record/writer.rb,
lib/io_streams/tabular/header.rb,
lib/io_streams/tabular/parser/csv.rb,
lib/io_streams/tabular/parser/psv.rb,
lib/io_streams/tabular/parser/base.rb,
lib/io_streams/tabular/parser/hash.rb,
lib/io_streams/tabular/parser/json.rb,
lib/io_streams/tabular/parser/array.rb,
lib/io_streams/tabular/parser/fixed.rb,
lib/io_streams/symmetric_encryption/reader.rb,
lib/io_streams/symmetric_encryption/writer.rb

Overview

Streaming library for Ruby

Stream types / extensions supported:

.zip       Zip File                                   [ :zip ]
.gz, .gzip GZip File                                  [ :gzip ]
.enc       File Encrypted using symmetric encryption  [ :enc ]
etc...
other      All other extensions will be returned as:  []

When a file is encrypted, it may also be compressed:

.zip.enc  [ :zip, :enc ]
.gz.enc   [ :gz,  :enc ]
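
As a rough illustration of how trailing extensions could translate into a list of streams, the sketch below walks a file name's extensions from right to left and stops at the first one it does not recognise. `KNOWN_EXTENSIONS` and `streams_for` are hypothetical names, not part of IOStreams, and the sketch simply reports each recognised extension as a symbol rather than reproducing the library's own lookup.

```ruby
# Hypothetical set of recognised extensions (an assumption for this sketch).
KNOWN_EXTENSIONS = %w[zip gz gzip enc].freeze

# Returns the recognised trailing extensions of file_name as symbols,
# in the order they appear in the name.
def streams_for(file_name)
  parts = File.basename(file_name).split(".")
  parts.shift # discard the base name
  streams = []
  parts.reverse_each do |extension|
    extension = extension.downcase
    # Stop at the first extension that is not a known stream type.
    break unless KNOWN_EXTENSIONS.include?(extension)

    streams.unshift(extension.to_sym)
  end
  streams
end

p streams_for("report.csv.zip.enc") # zip and enc are recognised
p streams_for("notes.txt")          # nothing is recognised
```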

Defined Under Namespace

Modules: Bzip2, Encode, Errors, Gzip, Line, Paths, Pgp, Record, Row, SymmetricEncryption, Utils, Xlsx, Zip

Classes: Builder, Extension, Path, Reader, Stream, Tabular, Writer

Constant Summary

VERSION =
"2.0.0".freeze

Class Method Details

.add_root(root, *elements, **args) ⇒ Object

Add a named root path

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 234

def self.add_root(root, *elements, **args)
  raise(ArgumentError, "Invalid characters in root name #{root.inspect}") unless root.to_s =~ /\A\w+\Z/

  @root_paths[root.to_sym] = path(*elements, **args)
end

.deregister_extension(extension) ⇒ Object

De-Register a file extension

Returns [Extension] the reader and writer classes that were registered for the removed extension, or nil if the extension was not registered

Example:

deregister_extension(:xls)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 296

def self.deregister_extension(extension)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.to_s =~ /\A\w+\Z/

  @extensions.delete(extension.to_sym)
end

.each_child(pattern, case_sensitive: false, directories: false, hidden: false, &block) ⇒ Object

Yields each Path that matches the supplied pattern.

Examples:

# Return all children in a complete path:
IOStreams.each_child("/exports/files/customer/*") { |path| puts path }

# Return all children in a complete path on S3:
IOStreams.each_child("s3://my_bucket/exports/files/customer/*") { |path| puts path }

# Case Insensitive file name lookup:
IOStreams.each_child("/exports/files/customer/R*") { |path| puts path }

# Case Sensitive file name lookup:
IOStreams.each_child("/exports/files/customer/R*", case_sensitive: true) { |path| puts path }

# Case Insensitive recursive file name lookup:
IOStreams.each_child("source_files/*/fast.rb") { |name| puts name }

Parameters:

pattern [String]
  The pattern is not a regexp, it is a string that may contain the following metacharacters:
  `*`      Matches all regular files.
  `c*`     Matches all regular files beginning with `c`.
  `*c`     Matches all regular files ending with `c`.
  `*c*`    Matches all regular files that have `c` in them.

  `**`     Matches recursively into subdirectories.

  `?`      Matches any one character.

  `[set]`  Matches any one character in the supplied `set`.
  `[^set]` Matches any one character that is not in the supplied `set`.

  `\`      Escapes the next metacharacter.

  `{a,b}`  Matches on either pattern `a` or pattern `b`.

case_sensitive [true|false]
  Whether the pattern is case-sensitive.

directories [true|false]
  Whether to yield directory names.

hidden [true|false]
  Whether to yield hidden paths.

Examples:

Pattern:     File name:       match?  Reason                                               Options
===========  ===============  ======  ===================================================  ====================
"cat"        "cat"            true    # Match entire string
"cat"        "category"       false   # Only match partial string

"c{at,ub}s"  "cats"           true    # { } is supported

"c?t"        "cat"            true    # "?" match only 1 character
"c??t"       "cat"            false   # ditto
"c*"         "cats"           true    # "*" match 0 or more characters
"c*t"        "c/a/b/t"        true    # ditto
"ca[a-z]"    "cat"            true    # inclusive bracket expression
"ca[^t]"     "cat"            false   # exclusive bracket expression ("^" or "!")

"cat"        "CAT"            false   # case sensitive                                     case_sensitive: true
"cat"        "CAT"            true    # case insensitive

"\?"         "?"              true    # escaped wildcard becomes ordinary
"\a"         "a"              true    # escaped ordinary remains ordinary
"[\?]"       "?"              true    # can escape inside bracket expression

"*"          ".profile"       false   # wildcard doesn't match leading period by default
"*"          ".profile"       true    # unless hidden is enabled                           hidden: true
".*"         ".profile"       true    # leading period is explicit

"**/*.rb"    "main.rb"        false
"**/*.rb"    "./main.rb"      false
"**/*.rb"    "lib/song.rb"    true
"**.rb"      "main.rb"        true
"**.rb"      "./main.rb"      false
"**.rb"      "lib/song.rb"    true
"*"          "dave/.profile"  true



# File 'lib/io_streams/io_streams.rb', line 217

def self.each_child(pattern, case_sensitive: false, directories: false, hidden: false, &block)
  matcher = Paths::Matcher.new(nil, pattern, case_sensitive: case_sensitive, hidden: hidden)

  # When the pattern includes an exact file name without any pattern characters
  if matcher.pattern.nil?
    block.call(matcher.path) if matcher.path.exist?
    return
  end
  matcher.path.each_child(matcher.pattern, case_sensitive: case_sensitive, directories: directories, hidden: hidden, &block)
end
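
The pattern metacharacters documented for `each_child` closely resemble those of Ruby's `File.fnmatch`. Assuming the semantics are comparable (the listing above does not show how `Paths::Matcher` evaluates patterns), the following sketch maps the `case_sensitive` and `hidden` options onto `File.fnmatch` flags. `glob_match?` is a hypothetical helper, not an IOStreams method.

```ruby
# Hypothetical helper: approximates the documented pattern rules with
# File.fnmatch. This is a sketch, not the Paths::Matcher implementation.
def glob_match?(pattern, name, case_sensitive: false, hidden: false)
  flags = File::FNM_EXTGLOB                         # enable {a,b} alternation
  flags |= File::FNM_CASEFOLD unless case_sensitive # ignore case by default
  flags |= File::FNM_DOTMATCH if hidden             # let "*" match a leading period
  File.fnmatch(pattern, name, flags)
end

p glob_match?("c{at,ub}s", "cats")
p glob_match?("cat", "CAT")
p glob_match?("cat", "CAT", case_sensitive: true)
p glob_match?("*", ".profile")
p glob_match?("*", ".profile", hidden: true)
```

Mapping each keyword option to a single flag keeps the default behaviour (case insensitive, hidden files skipped) identical to the defaults in the `each_child` signature.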

.extensions ⇒ Object

Registered file extensions



# File 'lib/io_streams/io_streams.rb', line 303

def self.extensions
  @extensions.dup
end

.home(username = nil) ⇒ Object

Returns [IOStreams::Paths::File] the home path of the current user, or of the named user when a username is supplied



# File 'lib/io_streams/io_streams.rb', line 127

def self.home(username = nil)
  IOStreams::Paths::File.new(Dir.home(username))
end

.join(*elements, root: :default) ⇒ Object

Join the supplied path elements to a root path.

Roots allow paths to reference a particular root directory, so that all path names are appended to that root. Use `IOStreams.join` instead of `IOStreams.path` so that the exact same code can run in production and development, yet use completely different data sources in each. For example, in production the root can point to an S3 bucket, while in development it points to the local file system.

Roots are configured via an initializer at startup. Multiple roots can be set up, for example one for input files, another for output files, another for reports, etc. The `:default` root is used whenever a root is not supplied when calling `IOStreams.join`.

Example:

IOStreams.add_root(:default, "tmp/export")
IOStreams.add_root(:ftp, "tmp/ftp")

IOStreams.join('file.xls')
# => #<IOStreams::Paths::File:0x00007fec70391bd8 @path="tmp/export/file.xls">

IOStreams.join('file.xls').to_s
# => "tmp/export/file.xls"

IOStreams.join('sample', 'file.xls', root: :ftp)
# => #<IOStreams::Paths::File:0x00007fec6ee329b8 @path="tmp/ftp/sample/file.xls">

IOStreams.join('sample', 'file.xls', root: :ftp).to_s
# => "tmp/ftp/sample/file.xls"

Notes:

  • Add the root path first against which this path is permitted to operate.

    `IOStreams.add_root(:default, "/usr/local/var/files")`
    


# File 'lib/io_streams/io_streams.rb', line 106

def self.join(*elements, root: :default)
  root(root).join(*elements)
end
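
To make the production versus development idea concrete, here is a minimal stand-in for the root registry built only on `File.join`. `RootRegistry` is a hypothetical class for illustration; it returns plain strings instead of Path objects and is not the IOStreams implementation. The bucket name is likewise made up.

```ruby
# Hypothetical stand-in for the root registry; returns Strings, not Paths.
class RootRegistry
  def initialize
    @root_paths = {}
  end

  # Registers a named root, validating the name as the listing above does.
  def add_root(root, *elements)
    raise(ArgumentError, "Invalid characters in root name #{root.inspect}") unless root.to_s =~ /\A\w+\Z/

    @root_paths[root.to_sym] = File.join(*elements)
  end

  # Appends the elements to the named root, defaulting to :default.
  def join(*elements, root: :default)
    base = @root_paths[root.to_sym] || raise(ArgumentError, "Root: #{root.inspect} has not been registered.")
    File.join(base, *elements)
  end
end

# The calling code is identical; only the configured root differs.
development = RootRegistry.new
development.add_root(:default, "tmp/export")

production = RootRegistry.new
production.add_root(:default, "s3://my_bucket/export")

puts development.join("sample", "file.xls")
puts production.join("sample", "file.xls")
```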

.logger ⇒ Object

Returns [Logger] the logger used by IOStreams for debug logging.

When SemanticLogger is loaded a SemanticLogger instance is used by default, otherwise no logging is performed unless a logger is assigned via #logger=.



# File 'lib/io_streams/io_streams.rb', line 266

def self.logger
  @logger
end

.logger=(logger) ⇒ Object

Replace the logger used by IOStreams.

Set to nil to disable logging.



# File 'lib/io_streams/io_streams.rb', line 273

def self.logger=(logger)
  @logger = logger
end

.new(file_name_or_io) ⇒ Object

Returns a Path when supplied a file name (String), otherwise wraps the supplied open IO stream in a Stream. An existing Stream is returned unchanged.



# File 'lib/io_streams/io_streams.rb', line 69

def self.new(file_name_or_io)
  return file_name_or_io if file_name_or_io.is_a?(Stream)

  file_name_or_io.is_a?(String) ? path(file_name_or_io) : stream(file_name_or_io)
end

.path(*elements, **args) ⇒ Object

Returns [Path] instance for the supplied complete path with optional scheme.

Example:

IOStreams.path("/usr", "local", "sample")
# => #<IOStreams::Paths::File:0x00007fec66e59b60 @path="/usr/local/sample">

IOStreams.path("/usr", "local", "sample").to_s
# => "/usr/local/sample"

IOStreams.path("s3://mybucket/path/file.xls")
# => #<IOStreams::Paths::S3:0x00007fec66e3a288 @path="s3://mybucket/path/file.xls">

IOStreams.path("s3://mybucket/path/file.xls").to_s
# => "s3://mybucket/path/file.xls"

IOStreams.path("file.xls")
# => #<IOStreams::Paths::File:0x00007fec6be6aaf0 @path="file.xls">

IOStreams.path("files", "file.xls").to_s
# => "files/file.xls"

For Files:

IOStreams.path('blah.zip').option(:encode, encoding: 'BINARY').each(:line) { |line| puts line }
IOStreams.path('blah.zip').option(:encode, encoding: 'UTF-8').each(:line) { |line| puts line }
IOStreams.path('blah.zip').option(:encode, encoding: 'UTF-8').each(:hash) { |hash| p hash }
IOStreams.path('blah.zip').option(:encode, encoding: 'UTF-8').read
IOStreams.path('blah.csv.zip').each(:line) { |line| puts line }
IOStreams.path('blah.zip').option(:pgp, passphrase: 'receiver_passphrase').read
IOStreams.path('blah.zip').stream(:zip).stream(:pgp, passphrase: 'receiver_passphrase').read
IOStreams.path('blah.zip').stream(:zip).stream(:encode, encoding: 'BINARY').read



# File 'lib/io_streams/io_streams.rb', line 47

def self.path(*elements, **args)
  return elements.first if (elements.size == 1) && args.empty? && elements.first.is_a?(IOStreams::Path)

  elements         = elements.collect(&:to_s)
  path             = ::File.join(*elements)
  extracted_scheme = path.include?("://") ? Utils::URI.new(path).scheme : nil
  klass            = scheme(extracted_scheme)
  args.empty? ? klass.new(path) : klass.new(path, **args)
end
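
The scheme dispatch in `.path` can be sketched without the library: join the elements, pull out a scheme only when the result contains `://`, then look the scheme up in a table. `SCHEME_HANDLERS` and `resolve_path` are hypothetical names, the table contents are illustrative rather than the library's registered defaults, symbols stand in for the path classes, and a regular expression replaces `Utils::URI`.

```ruby
# Illustrative scheme table (an assumption); symbols stand in for path classes.
SCHEME_HANDLERS = {
  nil   => :file, # no scheme: treat as a local file
  :file => :file,
  :s3   => :s3,
  :http => :http,
  :sftp => :sftp
}.freeze

# Returns [handler, path] for the joined elements.
def resolve_path(*elements)
  path    = File.join(*elements.map(&:to_s))
  # Only look for a scheme when the path looks like a URI.
  scheme  = path.include?("://") ? path[/\A(\w+):\/\//, 1] : nil
  handler = SCHEME_HANDLERS[scheme&.to_sym] ||
            raise(ArgumentError, "Unknown Scheme type: #{scheme.inspect}")
  [handler, path]
end

p resolve_path("/usr", "local", "sample")
p resolve_path("s3://mybucket/path/file.xls")
```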

.register_extension(extension, reader_class, writer_class) ⇒ Object

Register a file extension and the reader and writer streaming classes

Example:

# MyXls::Reader and MyXls::Writer must implement .open
register_extension(:xls, MyXls::Reader, MyXls::Writer)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 284

def self.register_extension(extension, reader_class, writer_class)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.nil? || extension.to_s =~ /\A\w+\Z/

  @extensions[extension&.to_sym] = Extension.new(reader_class, writer_class)
end
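
The register and deregister pair amounts to a validated hash of reader and writer classes. The sketch below models that with a `Struct`; `ExtensionPair` and `ExtensionRegistry` are hypothetical names, and the empty `MyXls` classes are placeholders for real streaming classes. It also shows that removing an entry hands back the stored pair, since `Hash#delete` returns the deleted value.

```ruby
# Hypothetical model of the extension registry, for illustration only.
ExtensionPair = Struct.new(:reader_class, :writer_class)

class ExtensionRegistry
  def initialize
    @extensions = {}
  end

  # Stores the reader and writer classes under the extension name.
  def register(extension, reader_class, writer_class)
    raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.nil? || extension.to_s =~ /\A\w+\Z/

    @extensions[extension&.to_sym] = ExtensionPair.new(reader_class, writer_class)
  end

  # Removes the extension and returns the stored pair, or nil if absent.
  def deregister(extension)
    raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.to_s =~ /\A\w+\Z/

    @extensions.delete(extension.to_sym)
  end

  # Returns a copy so callers cannot mutate the registry directly.
  def extensions
    @extensions.dup
  end
end

# Placeholder classes; real ones must implement .open.
module MyXls
  class Reader; end
  class Writer; end
end

registry = ExtensionRegistry.new
registry.register(:xls, MyXls::Reader, MyXls::Writer)
p registry.extensions.keys
p registry.deregister(:xls)
```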

.register_scheme(scheme, klass) ⇒ Object

Register a URI scheme and the path class that handles it

Example:

register_scheme(:gcs, MyGoogleCloudStoragePath)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 311

def self.register_scheme(scheme, klass)
  raise(ArgumentError, "Invalid scheme #{scheme.inspect}") unless scheme.nil? || scheme.to_s =~ /\A\w+\Z/

  @schemes[scheme&.to_sym] = klass
end

.root(root = :default) ⇒ Object

Returns [IOStreams::Paths::File] the default root path, or the named root path



# File 'lib/io_streams/io_streams.rb', line 229

def self.root(root = :default)
  @root_paths[root.to_sym] || raise(ArgumentError, "Root: #{root.inspect} has not been registered.")
end

.roots ⇒ Object

Returns a copy of the registered root paths.



# File 'lib/io_streams/io_streams.rb', line 240

def self.roots
  @root_paths.dup
end

.scheme(scheme_name) ⇒ Object

Returns the path class registered for the supplied scheme name.

Raises:

  • (ArgumentError)



# File 'lib/io_streams/io_streams.rb', line 321

def self.scheme(scheme_name)
  @schemes[scheme_name&.to_sym] || raise(ArgumentError, "Unknown Scheme type: #{scheme_name.inspect}")
end

.schemes ⇒ Object

Returns a copy of the registered schemes and their path classes.



# File 'lib/io_streams/io_streams.rb', line 317

def self.schemes
  @schemes.dup
end

.stream(io_stream) ⇒ Object

For an existing IO Stream:

IOStreams.stream(io).file_name('blah.zip').encoding('BINARY').read
IOStreams.stream(io).file_name('blah.zip').encoding('BINARY').each(:line){ … }
IOStreams.stream(io).file_name('blah.csv.zip').each(:line) { … }
IOStreams.stream(io).stream(:zip).stream(:pgp, passphrase: 'receiver_passphrase').read



# File 'lib/io_streams/io_streams.rb', line 62

def self.stream(io_stream)
  return io_stream if io_stream.is_a?(Stream)

  Stream.new(io_stream)
end

.temp_dir ⇒ Object

Returns the temporary path used when creating local temp files.

Default:

ENV['TMPDIR'], or ENV['TMP'], or ENV['TEMP'], or `Etc.systmpdir`, or '/tmp', otherwise '.'


# File 'lib/io_streams/io_streams.rb', line 256

def self.temp_dir
  @temp_dir ||= Dir.tmpdir
end

.temp_dir=(temp_dir) ⇒ Object

Set the temporary path to use when creating local temp files.



# File 'lib/io_streams/io_streams.rb', line 245

def self.temp_dir=(temp_dir)
  temp_dir = File.expand_path(temp_dir)
  FileUtils.mkdir_p(temp_dir)

  @temp_dir = temp_dir
end

.temp_file(basename, extension = "") ⇒ Object

Yields a path to a temporary file. The temporary file is deleted when the block completes, if it still exists.

Parameters:

basename: [String]
  Base file name to include in the temp file name.

extension: [String]
  Optional extension to add to the tempfile.

Example:

IOStreams.temp_file("export", ".csv") { |path| path.write("Hello World") }


# File 'lib/io_streams/io_streams.rb', line 122

def self.temp_file(basename, extension = "")
  Utils.temp_file_name(basename, extension) { |file_name| yield(Paths::File.new(file_name).stream(:none)) }
end
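
The block-scoped lifetime of the temp file can be illustrated with the standard library alone. `with_temp_file_name` below is a hypothetical helper, not `Utils.temp_file_name`; it assumes a random suffix is enough to avoid name collisions and yields a plain file name rather than a Path.

```ruby
require "tmpdir"
require "securerandom"

# Hypothetical helper: yields a unique temp file name and removes the file
# after the block finishes, whether or not the block raised.
def with_temp_file_name(basename, extension = "")
  file_name = File.join(Dir.tmpdir, "#{basename}#{SecureRandom.hex(8)}#{extension}")
  yield(file_name)
ensure
  # The block may never have created the file, so check before deleting.
  File.unlink(file_name) if file_name && File.exist?(file_name)
end

contents = with_temp_file_name("export", ".csv") do |file_name|
  File.write(file_name, "Hello World")
  File.read(file_name)
end
puts contents
```

Placing the cleanup in `ensure` means the file is removed even when the block raises, and the "if present" check covers blocks that never write to the path.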

.working_path ⇒ Object

Returns [IOStreams::Paths::File] the current working path for this process.



# File 'lib/io_streams/io_streams.rb', line 132

def self.working_path
  IOStreams::Paths::File.new(Dir.pwd)
end