Module: Fluent::Plugin::Extractor

Defined in:
lib/fluent/plugin/extractor.rb

Defined Under Namespace

Classes: SizeLimitError

Constant Summary collapse

BYTES_TO_READ =
64 * 1024
INFLATE_BYTES_TO_READ =
1024

Class Method Summary collapse

Class Method Details

.decompress_deflate(compressed_data, limit:) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/extractor.rb', line 70

def self.decompress_deflate(compressed_data, limit:)
  io = StringIO.new(compressed_data)
  out = ''
  begin
    zstream = Zlib::Inflate.new
    while (chunk = io.read(INFLATE_BYTES_TO_READ))
      out << zstream.inflate(chunk)
      if out.bytesize > limit
        raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
      end
    end
    out << zstream.finish
    if out.bytesize > limit
      raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
    end
  ensure
    zstream&.close
  end
  out
end

.decompress_gzip(compressed_data, limit:) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/extractor.rb', line 30

def self.decompress_gzip(compressed_data, limit:)
  io = StringIO.new(compressed_data)
  out = ''
  loop do
    reader = Zlib::GzipReader.new(io)
    while (chunk = reader.read(BYTES_TO_READ))
      out << chunk
      if out.bytesize > limit
        raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
      end
    end

    unused = reader.unused
    reader.finish
    unless unused.nil?
      adjust = unused.length
      io.pos -= adjust
    end
    break if io.eof?
  end
  out
end

.decompress_zstd(compressed_data, limit:) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/extractor.rb', line 53

def self.decompress_zstd(compressed_data, limit:)
  io = StringIO.new(compressed_data)
  reader = Zstd::StreamReader.new(io)
  out = ''
  loop do
    # Zstd::StreamReader needs to specify the size of the buffer
    out << reader.read(BYTES_TO_READ)
    if out.bytesize > limit
      raise SizeLimitError, "Decompressed data exceeds limit of #{limit} bytes"
    end

    # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
    break if io.eof?
  end
  out
end

.io_decompress_gzip(input, output) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/extractor.rb', line 91

def self.io_decompress_gzip(input, output)
  loop do
    reader = Zlib::GzipReader.new(input)
    while (chunk = reader.read(BYTES_TO_READ))
      output.write(chunk)
    end
    unused = reader.unused
    reader.finish
    unless unused.nil?
      adjust = unused.length
      input.pos -= adjust
    end
    break if input.eof?
  end
  output
end

.io_decompress_zstd(input, output) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
# File 'lib/fluent/plugin/extractor.rb', line 108

def self.io_decompress_zstd(input, output)
  reader = Zstd::StreamReader.new(input)
  loop do
    # Zstd::StreamReader needs to specify the size of the buffer
    chunk = reader.read(BYTES_TO_READ)
    output.write(chunk)
    # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position
    break if input.eof?
  end
  output
end