Class: Feedx::Format::Parquet::Encoder

Inherits:
Abstract::Encoder
Defined in:
lib/feedx/format/parquet.rb

Instance Attribute Summary

Instance Method Summary

Methods inherited from Abstract::Wrapper

open

Constructor Details

#initialize(io, schema:, buffer_size: 1 << 20, batch_size: 10_000) ⇒ Encoder

Returns a new instance of Encoder.



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/feedx/format/parquet.rb', line 57

# Creates an encoder that buffers rows in memory and writes them as Parquet
# into a temporary file; the file's contents are copied to +io+ on #close.
#
# @param io [IO] destination the finished Parquet data is copied to
# @param schema [Object] schema handed to Parquet::ArrowFileWriter
#   (presumably an Arrow::Schema -- confirm against callers)
# @param buffer_size [Integer] coerced with #to_i and stored as @buffer_size;
#   not used by this constructor itself
# @param batch_size [Integer] number of buffered rows that triggers a flush
#   (coerced with #to_i)
# @raise re-raises any StandardError from Parquet::ArrowFileWriter.new after
#   releasing the temporary output stream and file
def initialize(io, schema:, buffer_size: 1 << 20, batch_size: 10_000)
  super(io)

  @schema = schema
  @batch_size = batch_size.to_i
  @buffer_size = buffer_size.to_i

  @tmpname = ::Dir::Tmpname.create('feedx-parquet') {|path, *| path }
  @output  = Arrow::FileOutputStream.new(@tmpname, append: false)
  begin
    @writer = Parquet::ArrowFileWriter.new(@schema, @output)
  rescue StandardError
    # The caller never receives an instance (so #close never runs); release
    # the already-open stream and remove the temp file before propagating.
    begin
      @output.close
    ensure
      ::File.unlink(@tmpname) if ::File.exist?(@tmpname)
    end
    raise
  end
  @batch   = []
end

Instance Attribute Details

#schema ⇒ Object (readonly)

Returns the value of attribute schema.



55
56
57
# File 'lib/feedx/format/parquet.rb', line 55

# The schema given to the constructor. It is the schema the
# Parquet::ArrowFileWriter is built with, and it is passed to +to_parquet+
# for every encoded message that supports that conversion.
#
# @return [Object] the value passed as +schema:+ to #initialize
attr_reader :schema

Instance Method Details

#close ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/feedx/format/parquet.rb', line 78

# Flushes any rows still buffered, finalizes the Parquet writer and the
# temporary output stream, then copies the temp file's contents to the
# wrapped IO. The temp file is removed via +unlink!+ whether or not any of
# this succeeds.
#
# @return [Integer] number of bytes copied (the result of IO.copy_stream)
def close
  begin
    flush_table unless @batch.empty?
    @writer.close
  ensure
    # Always release the temp-file stream, even when flushing or finalizing
    # fails; otherwise the handle stays open after unlink! removes the path.
    @output.close
  end
  IO.copy_stream(@tmpname, @io)
ensure
  unlink!
end

#encode(msg, **opts) ⇒ Object



70
71
72
73
74
75
76
# File 'lib/feedx/format/parquet.rb', line 70

# Buffers one message for the next Parquet table. A message that responds to
# +to_parquet+ is converted first (with the encoder's schema and any +opts+);
# anything else is buffered as-is. Once at least +batch_size+ rows are
# buffered, the buffer is flushed via +flush_table+.
#
# @param msg [Object] the record to encode
# @param opts [Hash] keyword options forwarded to +msg.to_parquet+
# @return [Array] the batch array itself (Array#push returns its receiver);
#   whether it is still populated after a flush depends on +flush_table+
def encode(msg, **opts)
  row = msg.respond_to?(:to_parquet) ? msg.to_parquet(@schema, **opts) : msg
  @batch.push(row).tap do
    flush_table unless @batch.size < @batch_size
  end
end