Module: Fluent::Plugin::Extractor
- Defined in:
- lib/fluent/plugin/extractor.rb
Defined Under Namespace
Classes: SizeLimitError
Constant Summary collapse
- BYTES_TO_READ =
64 * 1024
- INFLATE_BYTES_TO_READ =
1024
Class Method Summary collapse
- .decompress_deflate(compressed_data, limit:) ⇒ Object
- .decompress_gzip(compressed_data, limit:) ⇒ Object
- .decompress_zstd(compressed_data, limit:) ⇒ Object
- .io_decompress_gzip(input, output) ⇒ Object
- .io_decompress_zstd(input, output) ⇒ Object
Class Method Details
.decompress_deflate(compressed_data, limit:) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/extractor.rb', line 70
#
# Inflates +compressed_data+ with Zlib::Inflate, refusing to return more than
# +limit+ bytes of output.
#
# @param compressed_data [String] the complete compressed payload
# @param limit [Integer] maximum allowed size, in bytes, of the decompressed result
# @return [String] the decompressed bytes
# @raise [SizeLimitError] if the decompressed output grows beyond +limit+ bytes
def self.decompress_deflate(compressed_data, limit:)
  io = StringIO.new(compressed_data)
  # Unary plus yields an unfrozen buffer even when string literals are frozen
  # (or "chilled" on Ruby 3.4+), so the appends below are always legal.
  out = +''
  zstream = Zlib::Inflate.new
  # Feed the inflater INFLATE_BYTES_TO_READ bytes at a time and re-check the
  # limit after every slice, so oversized output is rejected mid-stream.
  while (chunk = io.read(INFLATE_BYTES_TO_READ))
    out << zstream.inflate(chunk)
    raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" if out.bytesize > limit
  end
  # #finish flushes whatever the inflater still holds; that tail counts too.
  out << zstream.finish
  raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" if out.bytesize > limit

  out
ensure
  # Release the zlib stream on every exit path (nil if construction failed).
  zstream&.close
end
.decompress_gzip(compressed_data, limit:) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/extractor.rb', line 30
#
# Decompresses gzip data held in memory, refusing to return more than
# +limit+ bytes. A new Zlib::GzipReader is started on the remaining input
# until the input is exhausted, so back-to-back gzip members are all decoded
# and their output concatenated.
#
# @param compressed_data [String] the complete gzip payload
# @param limit [Integer] maximum allowed size, in bytes, of the decompressed result
# @return [String] the decompressed bytes
# @raise [SizeLimitError] if the decompressed output grows beyond +limit+ bytes
def self.decompress_gzip(compressed_data, limit:)
  io = StringIO.new(compressed_data)
  # Unary plus yields an unfrozen buffer even when string literals are frozen
  # (or "chilled" on Ruby 3.4+), so the appends below are always legal.
  out = +''
  loop do
    reader = Zlib::GzipReader.new(io)
    begin
      while (chunk = reader.read(BYTES_TO_READ))
        out << chunk
        raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" if out.bytesize > limit
      end
      # Input the reader pulled from +io+ but did not consume; it has to be
      # captured before #finish and handed back to +io+ afterwards.
      unused = reader.unused
      reader.finish
    ensure
      # Still open here only when something above raised. Release the zlib
      # stream right away, and do not let a secondary error from #finish
      # replace the exception that is already propagating.
      unless reader.closed?
        begin
          reader.finish
        rescue Zlib::Error
          nil
        end
      end
    end
    # Rewind over the unconsumed bytes so the next reader starts on them.
    io.pos -= unused.length unless unused.nil?
    break if io.eof?
  end
  out
end
.decompress_zstd(compressed_data, limit:) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/extractor.rb', line 53
#
# Decompresses Zstandard data held in memory through Zstd::StreamReader,
# refusing to return more than +limit+ bytes.
#
# @param compressed_data [String] the complete zstd payload
# @param limit [Integer] maximum allowed size, in bytes, of the decompressed result
# @return [String] the decompressed bytes
# @raise [SizeLimitError] if the decompressed output grows beyond +limit+ bytes
def self.decompress_zstd(compressed_data, limit:)
  io = StringIO.new(compressed_data)
  reader = Zstd::StreamReader.new(io)
  # Unary plus yields an unfrozen buffer even when string literals are frozen
  # (or "chilled" on Ruby 3.4+), so the appends below are always legal.
  out = +''
  loop do
    # Zstd::StreamReader#read requires an explicit size argument.
    # NOTE(review): if that size counts compressed input consumed per call
    # rather than output produced, a single call could emit far more than
    # +limit+ bytes before the check below runs - confirm against zstd-ruby.
    out << reader.read(BYTES_TO_READ)
    raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes" if out.bytesize > limit

    # Zstd::StreamReader exposes no unused-data accessor, so completion is
    # decided by the underlying IO reaching EOF.
    break if io.eof?
  end
  out
end
.io_decompress_gzip(input, output) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/extractor.rb', line 91
#
# Streams gzip data from +input+ to +output+ in chunks of at most
# BYTES_TO_READ decompressed bytes. A new Zlib::GzipReader is started on the
# remaining input until +input+ reports EOF, so back-to-back gzip members are
# all decoded.
#
# @param input [#read, #pos, #pos=, #eof?] source of compressed bytes; it is
#   rewound via +pos=+, so it needs to support repositioning
# @param output [#write] destination for the decompressed bytes
# @return [Object] +output+
def self.io_decompress_gzip(input, output)
  loop do
    reader = Zlib::GzipReader.new(input)
    begin
      while (chunk = reader.read(BYTES_TO_READ))
        output.write(chunk)
      end
      # Input the reader pulled from +input+ but did not consume; it has to be
      # captured before #finish and handed back to +input+ afterwards.
      unused = reader.unused
      reader.finish
    ensure
      # Still open here only when something above raised (for example a
      # failing +output.write+). Release the zlib stream right away, and do
      # not let a secondary error from #finish replace the exception that is
      # already propagating.
      unless reader.closed?
        begin
          reader.finish
        rescue Zlib::Error
          nil
        end
      end
    end
    # Rewind over the unconsumed bytes so the next reader starts on them.
    input.pos -= unused.length unless unused.nil?
    break if input.eof?
  end
  output
end
.io_decompress_zstd(input, output) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/fluent/plugin/extractor.rb', line 108
#
# Streams Zstandard data from +input+ to +output+ through Zstd::StreamReader,
# one BYTES_TO_READ-sized read at a time, until +input+ reports EOF.
#
# @param input [#eof?] source of compressed bytes, handed to Zstd::StreamReader
# @param output [#write] destination for the decompressed bytes
# @return [Object] +output+
def self.io_decompress_zstd(input, output)
  decoder = Zstd::StreamReader.new(input)
  loop do
    # Zstd::StreamReader#read requires an explicit size argument.
    output.write(decoder.read(BYTES_TO_READ))
    # Zstd::StreamReader exposes no unused-data accessor, so completion is
    # decided by the underlying IO reaching EOF.
    break if input.eof?
  end
  output
end