Class: Feedx::Format::Parquet::Encoder
- Inherits: Abstract::Encoder
  - Object
    - Abstract::Wrapper
      - Abstract::Encoder
        - Feedx::Format::Parquet::Encoder
- Defined in: lib/feedx/format/parquet.rb
Instance Attribute Summary
- #schema ⇒ Object (readonly)
  Returns the value of attribute schema.
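Instance Method Summary
- #close ⇒ Object
  Flushes any buffered rows, closes the Parquet writer and temporary file, and copies the result to the wrapped IO.
- #encode(msg, **opts) ⇒ Object
  Buffers a message (converting it with #to_parquet when it responds to that method) and flushes the batch once batch_size messages are buffered.
- #initialize(io, schema:, buffer_size: 1 << 20, batch_size: 10_000) ⇒ Encoder (constructor)
  Returns a new instance of Encoder.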
Methods inherited from Abstract::Wrapper
Constructor Details
#initialize(io, schema:, buffer_size: 1 << 20, batch_size: 10_000) ⇒ Encoder
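Returns a new instance of Encoder. Messages are buffered in batches of batch_size and written through Parquet::ArrowFileWriter to a temporary file, which is copied to io when #close is called. buffer_size is converted to an integer and stored, but none of the methods shown on this page reads it.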
# File 'lib/feedx/format/parquet.rb', line 57

def initialize(io, schema:, buffer_size: 1 << 20, batch_size: 10_000)
  super(io)
  @schema = schema
  @batch_size = batch_size.to_i
  @buffer_size = buffer_size.to_i
  @tmpname = ::Dir::Tmpname.create('feedx-parquet') {|path, *| path }
  @output = Arrow::FileOutputStream.new(@tmpname, append: false)
  @writer = Parquet::ArrowFileWriter.new(@schema, @output)
  @batch = []
end
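The constructor does not create the temporary file itself: the block passed to Dir::Tmpname.create simply returns the generated path, and the file is first opened by Arrow::FileOutputStream. The sketch below reproduces that step with the standard library only; replacing Arrow::FileOutputStream with a plain File.open is an assumption made so the example runs without the red-arrow gem. Note that Dir::Tmpname is marked :nodoc: in Ruby's tmpdir library, so its behavior is not a documented public contract.

```ruby
require "tmpdir"

# Same call as in the constructor: the block returns the generated path
# without creating anything, so only a candidate file name comes back.
tmpname = Dir::Tmpname.create("feedx-parquet") { |path, *| path }
exists_before = File.exist?(tmpname)

# Stand-in (assumption) for Arrow::FileOutputStream.new(tmpname, append: false):
# opening in "wb" mode creates the file or truncates an existing one.
File.open(tmpname, "wb") { |f| f.write("data") }
exists_after = File.exist?(tmpname)

puts "before open: #{exists_before}"    # prints: before open: false
puts "after open:  #{exists_after}"     # prints: after open:  true
puts "default buffer_size: #{1 << 20}"  # prints: default buffer_size: 1048576

# The real encoder removes its temporary file in #close (via unlink!).
File.unlink(tmpname)
```

Because no file is created when the name is generated, uniqueness appears to rest on the random component of the name rather than on an exclusive create.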
Instance Attribute Details
#schema ⇒ Object (readonly)
Returns the value of attribute schema.
# File 'lib/feedx/format/parquet.rb', line 55

def schema
  @schema
end
Instance Method Details
#close ⇒ Object
# File 'lib/feedx/format/parquet.rb', line 78

def close
  flush_table unless @batch.empty?
  @writer.close
  @output.close
  IO.copy_stream(@tmpname, @io)
ensure
  unlink!
end
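#close follows a stage-then-copy pattern: the finished file is copied to the destination IO with IO.copy_stream, and cleanup sits in an ensure clause so it runs even when the copy fails. The sketch below isolates that pattern. StagedWriter and BrokenIO are hypothetical classes, not part of feedx, and since unlink! is not shown on this page, the sketch assumes it deletes the temporary file and uses File.unlink in its place.

```ruby
require "stringio"
require "tmpdir"

# Hypothetical stand-in for the encoder's staging behaviour: bytes go to a
# temporary file and reach the destination IO only when #close runs.
class StagedWriter
  attr_reader :tmpname

  def initialize(io)
    @io = io
    @tmpname = Dir::Tmpname.create("staged-writer") { |path, *| path }
    @output = File.open(@tmpname, "wb")
  end

  def write(bytes)
    @output.write(bytes)
  end

  # Mirrors the shape of Encoder#close: finish the staged file, copy it to the
  # target IO, and remove the temp file in `ensure` so cleanup also runs on error.
  def close
    @output.close
    IO.copy_stream(@tmpname, @io)
  ensure
    File.unlink(@tmpname) if File.exist?(@tmpname)
  end
end

# Successful close: the destination receives everything that was staged.
dest = StringIO.new
ok = StagedWriter.new(dest)
ok.write("hello ")
ok.write("parquet")
ok.close
puts dest.string # prints: hello parquet

# Hypothetical destination whose writes always fail.
class BrokenIO
  def write(*)
    raise IOError, "destination closed"
  end
end

failing = StagedWriter.new(BrokenIO.new)
failing.write("data")
begin
  failing.close
rescue IOError => e
  puts "close raised: #{e.message}" # prints: close raised: destination closed
end
puts File.exist?(failing.tmpname) # prints: false
```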
#encode(msg, **opts) ⇒ Object
# File 'lib/feedx/format/parquet.rb', line 70

def encode(msg, **opts)
  msg = msg.to_parquet(@schema, **opts) if msg.respond_to?(:to_parquet)

  res = @batch.push(msg)
  flush_table if @batch.size >= @batch_size
  res
end
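Taken together, #encode and #close implement size-triggered batching with a final partial flush, plus a duck-typed conversion hook: any message that responds to to_parquet is converted before it is buffered. The sketch below models only that control flow. Row, its to_parquet method, and the Batcher class are hypothetical, and the sketch assumes that flush_table (not shown on this page) writes the buffered rows and then empties the batch.

```ruby
# Hypothetical message type: its #to_parquet projects the struct onto the
# column order given by the (hypothetical) schema, an array of column names.
Row = Struct.new(:id, :name) do
  def to_parquet(schema, **_opts)
    schema.map { |col| public_send(col) }
  end
end

# Hypothetical batcher mirroring the encode/close logic: messages are buffered
# and handed to the flush callback once the batch reaches batch_size.
class Batcher
  def initialize(schema, batch_size:, &on_flush)
    @schema = schema
    @batch_size = batch_size.to_i
    @on_flush = on_flush
    @batch = []
  end

  def encode(msg, **opts)
    msg = msg.to_parquet(@schema, **opts) if msg.respond_to?(:to_parquet)
    @batch.push(msg)
    flush if @batch.size >= @batch_size
  end

  # Like Encoder#close, emit the final partial batch only if it is non-empty.
  def close
    flush unless @batch.empty?
  end

  private

  # Assumed flush_table behaviour: hand off the rows, then start a fresh batch.
  def flush
    @on_flush.call(@batch)
    @batch = []
  end
end

flushed = []
batcher = Batcher.new(%i[id name], batch_size: 3) { |rows| flushed << rows }
(1..7).each { |i| batcher.encode(Row.new(i, "row#{i}")) }
batcher.close

flushed.each { |rows| p rows.map(&:first) }
# prints:
# [1, 2, 3]
# [4, 5, 6]
# [7]
```

With batch_size: 3 and seven messages, two full batches are flushed from #encode and the remaining row is flushed by #close, which is why the real #close checks @batch.empty? before calling flush_table.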