Module: SparkConnect::ArrowConverter
- Defined in: lib/spark_connect/arrow.rb
Overview
Decodes the Apache Arrow IPC stream payloads returned by the server into Ruby Rows. Each `ExecutePlanResponse.arrow_batch.data` chunk is a complete, self-contained Arrow IPC stream (schema + record batches); this converter reads each chunk and flattens all record batches into rows.
Named `ArrowConverter` (not `Arrow`) so that references to the `red-arrow` top-level Arrow constant inside the gem are unambiguous.
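The naming note comes down to plain Ruby constant lookup. The sketch below is illustrative only: `Demo`, `Caller`, and the `origin` methods are hypothetical, and the top-level `Arrow` module defined here is merely a stand-in for the one `red-arrow` provides, so it is best run in a process where `red-arrow` is not loaded.

```ruby
# Stand-in for the top-level constant that red-arrow defines (assumption for this demo).
module Arrow
  def self.origin
    "top-level Arrow"
  end
end

module Demo
  # Hypothetical: what the namespace would look like if the converter were named Arrow.
  module Arrow
    def self.origin
      "Demo::Arrow"
    end
  end

  module Caller
    # A bare reference is resolved lexically, so Demo::Arrow shadows the top-level one.
    def self.bare
      Arrow.origin
    end

    # A leading :: always starts the lookup at the top level.
    def self.rooted
      ::Arrow.origin
    end
  end
end

puts Demo::Caller.bare
puts Demo::Caller.rooted
```

With a distinct module name such as `ArrowConverter`, a bare `Arrow` inside the gem's namespace can only resolve to the top-level constant, which is the ambiguity the overview describes avoiding.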
Class Method Summary
- .arrow_field_type(data_type) ⇒ Object
  Map a Spark type to the corresponding `red-arrow` data type used when building local relations.
- .build_arrow_schema(schema) ⇒ Object
- .extract_value(row, name, idx) ⇒ Object
- .from_rows(rows, schema) ⇒ String
  Serialize an array of Ruby hashes into a single Arrow IPC stream given a Spark Types::StructType.
- .to_rows(batches) ⇒ Array<Row>
  Decode IPC stream chunks into rows.
- .to_table(batches) ⇒ Arrow::Table?
  Decode IPC stream chunks into a single Arrow Table (for advanced/columnar consumers who want zero-copy access to the underlying data).
Class Method Details
.arrow_field_type(data_type) ⇒ Object
Map a Spark type to the corresponding `red-arrow` data type used when building local relations.
    # File 'lib/spark_connect/arrow.rb', line 94

    def arrow_field_type(data_type)
      case data_type
      when Types::BooleanType then :boolean
      when Types::ByteType then :int8
      when Types::ShortType then :int16
      when Types::IntegerType then :int32
      when Types::LongType then :int64
      when Types::FloatType then :float
      when Types::DoubleType then :double
      when Types::StringType, Types::CharType, Types::VarcharType then :string
      when Types::BinaryType then :binary
      when Types::DateType then :date32
      # TimestampType is an instant: tag it UTC so the server reads the epoch
      # micros as a point in time rather than session-local wall-clock. The NTZ
      # variant stays zone-less (wall-clock) to match its semantics.
      when Types::TimestampType then ::Arrow::TimestampDataType.new(:micro, GLib::TimeZone.new("UTC"))
      when Types::TimestampNTZType then { type: :timestamp, unit: :micro }
      when Types::ArrayType
        { type: :list, field: { name: "element", type: arrow_field_type(data_type.element_type) } }
      when Types::StructType
        { type: :struct, fields: data_type.fields.map { |f| { name: f.name, type: arrow_field_type(f.data_type) } } }
      else :string # rubocop:disable Lint/DuplicateBranch -- string default for unsupported-locally types
      end
    end
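To make the recursive shape of this mapping easier to see, here is a minimal sketch that runs without `red-arrow` or the gem. The classes `IntegerType`, `StringType`, `ArrayType`, `StructField`, and `StructType` are hypothetical stand-ins for the real `Types::*` classes, `sketch_field_type` is an illustrative name, and only a few branches are reproduced.

```ruby
# Hypothetical stand-ins for the Spark type classes (the real ones live under Types).
IntegerType = Class.new
StringType  = Class.new
ArrayType   = Struct.new(:element_type)
StructField = Struct.new(:name, :data_type)
StructType  = Struct.new(:fields)

# Mirrors the recursion of the mapping for a handful of types only.
def sketch_field_type(data_type)
  case data_type
  when IntegerType then :int32
  when StringType  then :string
  when ArrayType
    # Lists describe their element type by recursing.
    { type: :list, field: { name: "element", type: sketch_field_type(data_type.element_type) } }
  when StructType
    # Structs recurse once per field.
    { type: :struct,
      fields: data_type.fields.map { |f| { name: f.name, type: sketch_field_type(f.data_type) } } }
  else :string # same fallback as the listing: unknown types become strings
  end
end

SKETCH_SCHEMA = StructType.new([
  StructField.new("id", IntegerType.new),
  StructField.new("tags", ArrayType.new(StringType.new))
])

pp sketch_field_type(SKETCH_SCHEMA)
```

Nested types come out as nested hashes, while scalar types collapse to a single symbol.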
.build_arrow_schema(schema) ⇒ Object
    # File 'lib/spark_connect/arrow.rb', line 73

    def build_arrow_schema(schema)
      fields = schema.fields.map do |f|
        ::Arrow::Field.new(f.name, arrow_field_type(f.data_type))
      end
      ::Arrow::Schema.new(fields)
    end
.extract_value(row, name, idx) ⇒ Object
    # File 'lib/spark_connect/arrow.rb', line 80

    def extract_value(row, name, idx)
      case row
      when Row then row[name]
      when Hash
        if row.key?(name) then row[name]
        elsif row.key?(name.to_sym) then row[name.to_sym]
        end
      when Array then row[idx]
      else row
      end
    end
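The lookup rules in this method are easiest to read off a few calls. The sketch below copies the method body from the listing so that it runs on its own; the `Row` class here is a hypothetical stand-in that only assumes name-based `#[]`, not the gem's actual implementation.

```ruby
# Hypothetical stand-in for the gem's Row; only name-based #[] is assumed.
class Row
  def initialize(values, fields:)
    @data = fields.zip(values).to_h
  end

  def [](name)
    @data[name]
  end
end

# Copied from the listing so the sketch is self-contained.
def extract_value(row, name, idx)
  case row
  when Row then row[name]
  when Hash
    if row.key?(name) then row[name]
    elsif row.key?(name.to_sym) then row[name.to_sym]
    end
  when Array then row[idx]
  else row
  end
end

p extract_value({ "id" => 1 }, "id", 0)                   # string key
p extract_value({ id: 2 }, "id", 0)                       # symbol key fallback
p extract_value({ "id" => 1, id: 2 }, "id", 0)            # string key is checked first
p extract_value([3, "x"], "id", 0)                        # arrays are positional
p extract_value(Row.new([4], fields: ["id"]), "id", 0)    # Row is looked up by name
p extract_value({ "other" => 5 }, "id", 0)                # missing key yields nil
p extract_value(42, "id", 0)                              # anything else passes through
```

A hash that lacks the field under both the string and the symbol key produces `nil` rather than raising, so missing values end up as nulls in the serialized batch.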
.from_rows(rows, schema) ⇒ String
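Serialize an array of Ruby hashes into a single Arrow IPC stream given a Spark Types::StructType. Used by SparkSession#create_data_frame to ship local data to the server as a `LocalRelation`. Rows may also be Row objects or positional arrays; values are read in schema field order via .extract_value.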
    # File 'lib/spark_connect/arrow.rb', line 58

    def from_rows(rows, schema)
      arrow_schema = build_arrow_schema(schema)
      raw_rows = rows.map do |row|
        schema.fields.each_with_index.map { |field, idx| extract_value(row, field.name, idx) }
      end
      record_batch = ::Arrow::RecordBatch.new(arrow_schema, raw_rows)
      buffer = ::Arrow::ResizableBuffer.new(1024)
      ::Arrow::BufferOutputStream.open(buffer) do |output|
        ::Arrow::RecordBatchStreamWriter.open(output, arrow_schema) do |writer|
          writer.write_record_batch(record_batch)
        end
      end
      buffer.data.to_s
    end
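The first step of this method, turning heterogeneous rows into positional arrays ordered by the schema, can be sketched without Arrow at all. In the sketch below, `FIELD_NAMES`, `lookup`, and `normalize` are hypothetical names; `FIELD_NAMES` stands in for the schema's field names and `lookup` is a simplified version of the value lookup that handles only hashes and arrays.

```ruby
# Hypothetical field names standing in for schema.fields.map(&:name).
FIELD_NAMES = %w[id name].freeze

# Simplified lookup: hashes by string key, then symbol key; arrays by position.
def lookup(row, name, idx)
  case row
  when Hash  then row.fetch(name) { row[name.to_sym] }
  when Array then row[idx]
  end
end

# One positional array per input row, always in schema order.
def normalize(rows)
  rows.map do |row|
    FIELD_NAMES.each_with_index.map { |name, idx| lookup(row, name, idx) }
  end
end

RAW_ROWS = normalize([
  { "name" => "a", "id" => 1 }, # key order differs from the schema
  { id: 2 },                    # "name" is missing, so it becomes nil
  [3, "c"]                      # already positional
])

p RAW_ROWS
```

Column order in the resulting batch therefore follows the schema, not the key order of any individual hash.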
.to_rows(batches) ⇒ Array<Row>
Decode IPC stream chunks into rows.
    # File 'lib/spark_connect/arrow.rb', line 20

    def to_rows(batches)
      rows = []
      field_names = nil
      batches.each do |data|
        next if data.nil? || data.empty?

        reader = ::Arrow::RecordBatchStreamReader.new(::Arrow::BufferInputStream.new(::Arrow::Buffer.new(data)))
        reader.each do |record_batch|
          field_names ||= record_batch.schema.fields.map(&:name)
          record_batch.raw_records.each do |values|
            rows << Row.new(values, fields: field_names)
          end
        end
      end
      rows
    end
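The control flow of this method (skip empty chunks, take field names once, flatten every batch) can be illustrated with stand-ins instead of real IPC bytes. In the sketch below, `FakeBatch` and `flatten_chunks` are hypothetical: each "chunk" is assumed to be already decoded into an array of `FakeBatch` objects, and rows are represented as plain hashes rather than `Row` instances.

```ruby
# Hypothetical stand-in for a decoded record batch: field names plus positional records.
FakeBatch = Struct.new(:field_names, :raw_records)

# Mirrors the loop structure of to_rows over pre-decoded chunks.
def flatten_chunks(chunks)
  rows = []
  field_names = nil
  chunks.each do |batches|
    next if batches.nil? || batches.empty?

    batches.each do |batch|
      field_names ||= batch.field_names # taken once, from the first batch seen
      batch.raw_records.each { |values| rows << field_names.zip(values).to_h }
    end
  end
  rows
end

CHUNKS = [
  [FakeBatch.new(%w[id name], [[1, "a"], [2, "b"]])],
  nil, # skipped
  [],  # skipped
  [FakeBatch.new(%w[id name], [[3, "c"]])]
].freeze

FLATTENED = flatten_chunks(CHUNKS)
pp FLATTENED
```

Because the field names are memoized from the first batch, the approach assumes that every chunk in a response shares the same schema.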
.to_table(batches) ⇒ Arrow::Table?
Decode IPC stream chunks into a single Arrow Table (for advanced/columnar consumers who want zero-copy access to the underlying data).
    # File 'lib/spark_connect/arrow.rb', line 42

    def to_table(batches)
      tables = batches.reject { |b| b.nil? || b.empty? }.map do |data|
        ::Arrow::RecordBatchStreamReader.new(::Arrow::BufferInputStream.new(::Arrow::Buffer.new(data))).read_all
      end
      return nil if tables.empty?

      tables.reduce { |acc, t| acc.concatenate([t]) }
    end