Module: Fluent::Plugin::Buffer::Chunk::ZstdDecompressable
- Includes:
- Compressable
- Defined in:
- lib/fluent/plugin/buffer/chunk.rb
Constant Summary
Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT
Instance Method Summary
collapse
#compress, #decompress
Instance Method Details
#append(data, **kwargs) ⇒ Object
252
253
254
255
256
257
258
259
260
261
262
263
264
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 252
def append(data, **kwargs)
if kwargs[:compress] == :zstd
io = StringIO.new
stream = Zstd::StreamWriter.new(io)
data.each do |d|
stream.write(d)
end
stream.finish
concat(io.string, data.size)
else
super
end
end
|
#open(**kwargs, &block) ⇒ Object
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 266
def open(**kwargs, &block)
if kwargs[:compressed] == :zstd
super
else
super(**kwargs) do |chunk_io|
output_io = if chunk_io.is_a?(StringIO)
StringIO.new
else
Tempfile.new('decompressed-data')
end
output_io.binmode if output_io.is_a?(Tempfile)
decompress(input_io: chunk_io, output_io: output_io, type: :zstd)
output_io.seek(0, IO::SEEK_SET)
yield output_io
end
end
end
|
#read(**kwargs) ⇒ Object
284
285
286
287
288
289
290
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 284
def read(**kwargs)
if kwargs[:compressed] == :zstd
super
else
decompress(super,type: :zstd)
end
end
|
#write_to(io, **kwargs) ⇒ Object
292
293
294
295
296
297
298
299
300
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 292
def write_to(io, **kwargs)
open(compressed: :zstd) do |chunk_io|
if kwargs[:compressed] == :zstd
IO.copy_stream(chunk_io, io)
else
decompress(input_io: chunk_io, output_io: io, type: :zstd)
end
end
end
|