Module: SparkConnect::ArrowConverter

Defined in:
lib/spark_connect/arrow.rb

Overview

Decodes the Apache Arrow IPC stream payloads returned by the server into Ruby Rows. Each `ExecutePlanResponse.arrow_batch.data` chunk is a complete, self-contained Arrow IPC stream (schema + record batches); this converter reads each chunk and flattens all record batches into rows.

Named `ArrowConverter` (not `Arrow`) so that references to the `red-arrow` top-level Arrow constant inside the gem are unambiguous.

Class Method Details

.arrow_field_type(data_type) ⇒ Object

Map a Spark type to the corresponding `red-arrow` data type used when building local relations.



# File 'lib/spark_connect/arrow.rb', line 94

def arrow_field_type(data_type)
  case data_type
  when Types::BooleanType then :boolean
  when Types::ByteType then :int8
  when Types::ShortType then :int16
  when Types::IntegerType then :int32
  when Types::LongType then :int64
  when Types::FloatType then :float
  when Types::DoubleType then :double
  when Types::StringType, Types::CharType, Types::VarcharType then :string
  when Types::BinaryType then :binary
  when Types::DateType then :date32
  # TimestampType is an instant: tag it UTC so the server reads the epoch
  # micros as a point in time rather than session-local wall-clock. The NTZ
  # variant stays zone-less (wall-clock) to match its semantics.
  when Types::TimestampType then ::Arrow::TimestampDataType.new(:micro, GLib::TimeZone.new("UTC"))
  when Types::TimestampNTZType then { type: :timestamp, unit: :micro }
  when Types::ArrayType
    { type: :list, field: { name: "element", type: arrow_field_type(data_type.element_type) } }
  when Types::StructType
    { type: :struct, fields: data_type.fields.map { |f| { name: f.name, type: arrow_field_type(f.data_type) } } }
  else :string # rubocop:disable Lint/DuplicateBranch -- string default for unsupported-locally types
  end
end
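The mapping above returns either a symbol or a nested hash description, recursing for arrays and structs. The sketch below reproduces only that recursive shape so it can run without `red-arrow`. The classes `DemoInt`, `DemoString`, `DemoArray`, `DemoField`, and `DemoStruct` are hypothetical stand-ins, not part of the gem, and only four types are covered.

```ruby
# Hypothetical stand-ins for the Spark type classes; not part of the gem.
DemoInt    = Class.new
DemoString = Class.new
DemoArray  = Struct.new(:element_type)
DemoField  = Struct.new(:name, :data_type)
DemoStruct = Struct.new(:fields)

# Simplified copy of the recursive mapping, covering four types only.
def demo_field_type(data_type)
  case data_type
  when DemoInt then :int32
  when DemoString then :string
  when DemoArray
    { type: :list, field: { name: "element", type: demo_field_type(data_type.element_type) } }
  when DemoStruct
    { type: :struct, fields: data_type.fields.map { |f| { name: f.name, type: demo_field_type(f.data_type) } } }
  else :string # same fallback as the documented method
  end
end

# An array of structs with an integer field and a string field.
point  = DemoStruct.new([DemoField.new("x", DemoInt.new), DemoField.new("label", DemoString.new)])
nested = demo_field_type(DemoArray.new(point))
```

Here `nested` is a `:list` description whose element is a `:struct` description with two fields. Any type the `case` does not recognise falls through to `:string`, mirroring the documented default.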

.build_arrow_schema(schema) ⇒ Object



# File 'lib/spark_connect/arrow.rb', line 73

def build_arrow_schema(schema)
  fields = schema.fields.map do |f|
    ::Arrow::Field.new(f.name, arrow_field_type(f.data_type))
  end
  ::Arrow::Schema.new(fields)
end

.extract_value(row, name, idx) ⇒ Object



# File 'lib/spark_connect/arrow.rb', line 80

def extract_value(row, name, idx)
  case row
  when Row then row[name]
  when Hash
    if row.key?(name) then row[name]
    elsif row.key?(name.to_sym) then row[name.to_sym]
    end
  when Array then row[idx]
  else row
  end
end
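The lookup rules in the listing above can be exercised in isolation. The following is a standalone sketch, assuming a hypothetical `DemoRow` class that stands in for the gem's `Row` and supports only `[]` by field name. `demo_extract_value` is a copy made for illustration, not the gem's method.

```ruby
# Hypothetical stand-in for the gem's Row class: only `[]` by field name.
class DemoRow
  def initialize(values, fields:)
    @data = fields.zip(values).to_h
  end

  def [](name)
    @data[name]
  end
end

# Standalone copy of the lookup rules, using DemoRow in place of Row.
def demo_extract_value(row, name, idx)
  case row
  when DemoRow then row[name]
  when Hash
    if row.key?(name) then row[name]
    elsif row.key?(name.to_sym) then row[name.to_sym]
    end
  when Array then row[idx]
  else row
  end
end

by_string = demo_extract_value({ "id" => 1, id: 2 }, "id", 0) # string key is checked first
by_symbol = demo_extract_value({ id: 2 }, "id", 0)            # falls back to the symbol key
missing   = demo_extract_value({ other: 3 }, "id", 0)         # no matching key yields nil
by_index  = demo_extract_value([10, 20], "name", 1)           # arrays are positional
scalar    = demo_extract_value(42, "value", 0)                # anything else passes through
from_row  = demo_extract_value(DemoRow.new([7], fields: ["id"]), "id", 0)
```

Because a non-collection value is returned unchanged, a single-column schema can be fed a flat list of scalars. A hash that has neither the string nor the symbol key produces `nil` rather than raising.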

.from_rows(rows, schema) ⇒ String

Serialize an array of rows into a single Arrow IPC stream, given a Spark Types::StructType. Each row may be a Row, a Hash (string or symbol keys), or a positional Array. Used by SparkSession#create_data_frame to ship local data to the server as a `LocalRelation`.

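Parameters:

  • rows (Array<Row, Hash, Array>)

    Local rows to serialize.

  • schema (Types::StructType)

    Spark schema describing the columns.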

Returns:

  • (String)

    Arrow IPC stream bytes.



# File 'lib/spark_connect/arrow.rb', line 58

def from_rows(rows, schema)
  arrow_schema = build_arrow_schema(schema)
  raw_rows = rows.map do |row|
    schema.fields.each_with_index.map { |field, idx| extract_value(row, field.name, idx) }
  end
  record_batch = ::Arrow::RecordBatch.new(arrow_schema, raw_rows)
  buffer = ::Arrow::ResizableBuffer.new(1024)
  ::Arrow::BufferOutputStream.open(buffer) do |output|
    ::Arrow::RecordBatchStreamWriter.open(output, arrow_schema) do |writer|
      writer.write_record_batch(record_batch)
    end
  end
  buffer.data.to_s
end

.to_rows(batches) ⇒ Array<Row>

Decode IPC stream chunks into rows.

Parameters:

  • batches (Array<String>)

    Arrow IPC stream byte chunks.

Returns:

  • (Array<Row>)



# File 'lib/spark_connect/arrow.rb', line 20

def to_rows(batches)
  rows = []
  field_names = nil
  batches.each do |data|
    next if data.nil? || data.empty?

    reader = ::Arrow::RecordBatchStreamReader.new(::Arrow::BufferInputStream.new(::Arrow::Buffer.new(data)))
    reader.each do |record_batch|
      field_names ||= record_batch.schema.fields.map(&:name)
      record_batch.raw_records.each do |values|
        rows << Row.new(values, fields: field_names)
      end
    end
  end
  rows
end
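The control flow of the listing above (skip unusable chunks, take field names from the first batch seen, append one row per record) can be modelled without `red-arrow`. This is a rough sketch under stated assumptions: `DemoChunk` is a hypothetical stand-in for an already-decoded chunk, rows are plain hashes rather than `Row` objects, and the emptiness check is applied to records instead of raw bytes.

```ruby
# Hypothetical stand-in for a decoded chunk: field names plus positional records.
DemoChunk = Struct.new(:field_names, :records)

# Models the loop in to_rows: skip nil/empty chunks, reuse the field names of
# the first batch seen, and append one row per record.
def demo_to_rows(chunks)
  rows = []
  field_names = nil
  chunks.each do |chunk|
    next if chunk.nil? || chunk.records.empty?

    field_names ||= chunk.field_names
    chunk.records.each { |values| rows << field_names.zip(values).to_h }
  end
  rows
end

chunks = [
  nil,
  DemoChunk.new(%w[id name], [[1, "a"], [2, "b"]]),
  DemoChunk.new(%w[id name], [[3, "c"]])
]
rows = demo_to_rows(chunks)
```

Field names are captured once and reused for every later chunk, so the model, like the listing, relies on all chunks of one response sharing the same schema.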

.to_table(batches) ⇒ Arrow::Table?

Decode IPC stream chunks into a single Arrow Table (for advanced/columnar consumers who want zero-copy access to the underlying data).

Parameters:

  • batches (Array<String>)

Returns:

  • (Arrow::Table, nil)


# File 'lib/spark_connect/arrow.rb', line 42

def to_table(batches)
  tables = batches.reject { |b| b.nil? || b.empty? }.map do |data|
    ::Arrow::RecordBatchStreamReader.new(::Arrow::BufferInputStream.new(::Arrow::Buffer.new(data))).read_all
  end
  return nil if tables.empty?

  tables.reduce { |acc, t| acc.concatenate([t]) }
end