Class: ArrowFormat::StreamingWriter

Inherits:
Object
Includes:
BufferAlignable
Defined in:
lib/arrow-format/streaming-writer.rb

Direct Known Subclasses

FileWriter

Constant Summary

ALIGNMENT_SIZE =
IO::Buffer.size_of(:u64)
CONTINUATION =
"\xFF\xFF\xFF\xFF".b.freeze
EOS =
"\xFF\xFF\xFF\xFF\x00\x00\x00\x00".b.freeze
METADATA_LARGEST_PADDING =
"\x00" * 7

Constants included from BufferAlignable

BufferAlignable::BUFFER_ALIGNMENT_SIZE

Instance Method Summary

Constructor Details

#initialize(output) ⇒ StreamingWriter

Returns a new instance of StreamingWriter.



29
30
31
32
33
34
35
# File 'lib/arrow-format/streaming-writer.rb', line 29

# Creates a streaming writer bound to +output+.
#
# All bookkeeping starts empty: no blocks recorded, no dictionaries written,
# and the offset counter at zero.
#
# @param output the destination object; the same object is what #finish returns
def initialize(output)
  # Destination and offset counter (presumably a running byte offset into
  # +output+ maintained by the write helpers -- TODO confirm).
  @output, @offset = output, 0
  # Per-message-kind block lists, kept separately for dictionaries and batches.
  @fb_dictionary_blocks, @fb_record_batch_blocks = [], []
  # Tracks which dictionaries have already been emitted.
  @written_dictionary_offsets = {}
end

Instance Method Details

#finish ⇒ Object



54
55
56
57
# File 'lib/arrow-format/streaming-writer.rb', line 54

# Ends the stream.
#
# Passes the EOS constant (0xFFFFFFFF continuation bytes followed by four
# zero bytes) to write_data, then returns @output.
#
# @return [Object] @output, i.e. the object given to #initialize unless a
#   helper reassigned it
def finish
  # Write the terminator first; @output is read only after it is written.
  write_data(EOS)
  @output
end

#start(schema) ⇒ Object



37
38
39
40
# File 'lib/arrow-format/streaming-writer.rb', line 37

# Begins the stream with a message derived from +schema+.
#
# The message is built from schema.to_flatbuffers plus a +custom_metadata:+
# value taken from the schema, and is handed to write_message.
#
# NOTE(review): this rendering of the source is truncated. The method name
# before the inner "(" and the attribute name after "schema." are missing,
# probably because hyperlinked identifiers were stripped when the page was
# scraped. As shown, this is not valid Ruby. Restore the exact call from
# lib/arrow-format/streaming-writer.rb; do not guess the missing names.
#
# @param schema the schema to announce; must respond to #to_flatbuffers
def start(schema)
  write_message((schema.to_flatbuffers,
                               custom_metadata: schema.))
end

#write_record_batch(record_batch) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/arrow-format/streaming-writer.rb', line 42

# Writes one record batch to the stream.
#
# Every column whose schema field has a DictionaryType is first passed to
# write_dictionary together with that type's id. Only after all such
# columns are handled is the record batch message itself written, and its
# block is recorded in @fb_record_batch_blocks.
#
# @param record_batch the batch to write; must expose #schema, #columns
#   and #to_flatbuffers
# @return whatever write_record_batch_based_message returns
def write_record_batch(record_batch)
  fields = record_batch.schema.fields
  fields.each.with_index do |field, column_index|
    # Only dictionary-encoded columns need a preceding dictionary message.
    next unless field.type.is_a?(DictionaryType)

    write_dictionary(field.type.id, record_batch.columns[column_index])
  end

  fb_record_batch = record_batch.to_flatbuffers
  write_record_batch_based_message(record_batch,
                                   fb_record_batch,
                                   @fb_record_batch_blocks)
end