Class: SparkConnect::DataFrame

Inherits:
Object
  • Object
Defined in:
lib/spark_connect/data_frame.rb,
lib/spark_connect/stat_functions.rb

Overview

Reopens DataFrame to add the `describe` and `summary` actions, which are statistical in nature and share the Stat* relations.

Constant Summary

Proto =
SparkConnect::Proto
JOIN_TYPES =
{
  inner: :JOIN_TYPE_INNER,
  cross: :JOIN_TYPE_CROSS,
  outer: :JOIN_TYPE_FULL_OUTER,
  full: :JOIN_TYPE_FULL_OUTER,
  fullouter: :JOIN_TYPE_FULL_OUTER,
  full_outer: :JOIN_TYPE_FULL_OUTER,
  left: :JOIN_TYPE_LEFT_OUTER,
  leftouter: :JOIN_TYPE_LEFT_OUTER,
  left_outer: :JOIN_TYPE_LEFT_OUTER,
  right: :JOIN_TYPE_RIGHT_OUTER,
  rightouter: :JOIN_TYPE_RIGHT_OUTER,
  right_outer: :JOIN_TYPE_RIGHT_OUTER,
  semi: :JOIN_TYPE_LEFT_SEMI,
  leftsemi: :JOIN_TYPE_LEFT_SEMI,
  left_semi: :JOIN_TYPE_LEFT_SEMI,
  anti: :JOIN_TYPE_LEFT_ANTI,
  leftanti: :JOIN_TYPE_LEFT_ANTI,
  left_anti: :JOIN_TYPE_LEFT_ANTI,
}.freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(session, relation) ⇒ DataFrame

Returns a new instance of DataFrame.

Parameters:

  • session (SparkSession)
  • relation (Spark::Connect::Relation)

# File 'lib/spark_connect/data_frame.rb', line 53

def initialize(session, relation)
  @session = session
  @relation = relation
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method.

#method_missing(name, *args) ⇒ Object

Allows `df.column_name` for valid identifier column names.

# File 'lib/spark_connect/data_frame.rb', line 604

def method_missing(name, *args)
  if args.empty? && columns.include?(name.to_s)
    Functions.col(name.to_s)
  else
    super
  end
end
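
The dynamic lookup above can be tried in isolation. What follows is a minimal sketch, not the library's code: `TinyFrame` is a hypothetical stand-in that holds a fixed list of column names and returns a plain string where the real class returns a Column. It also pairs `method_missing` with `respond_to_missing?` so that `respond_to?` stays consistent.

```ruby
# Hypothetical stand-in for DataFrame: column names are fixed at construction
# and a "column" is represented by a plain string.
class TinyFrame
  attr_reader :columns

  def initialize(columns)
    @columns = columns.map(&:to_s)
  end

  # Resolve zero-argument calls whose name matches a column; defer otherwise.
  def method_missing(name, *args)
    if args.empty? && columns.include?(name.to_s)
      "col(#{name})"
    else
      super
    end
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(name, include_private = false)
    columns.include?(name.to_s) || super
  end
end

frame = TinyFrame.new(%w[age city])
puts frame.age                 # col(age)
puts frame.respond_to?(:city)  # true
puts frame.respond_to?(:zip)   # false
```

Because `method_missing` only fires for methods that are not defined, a column whose name matches an existing method (for example `count` or `schema` on this class) is not reachable this way; indexing with `#[]` avoids the clash.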

Instance Attribute Details

#relation ⇒ Spark::Connect::Relation (readonly)

Returns the logical plan this DataFrame builds.

Returns:

  • (Spark::Connect::Relation)

    the logical plan this DataFrame builds.

# File 'lib/spark_connect/data_frame.rb', line 49

def relation
  @relation
end

#session ⇒ SparkSession (readonly)

Returns:

  • (SparkSession)

# File 'lib/spark_connect/data_frame.rb', line 47

def session
  @session
end

Instance Method Details

#[](key) ⇒ Column

Index into a column by name (`df["name"]`) or position (`df[0]`).

Parameters:

  • key (String, Symbol, Integer)

Returns:

  • (Column)

# File 'lib/spark_connect/data_frame.rb', line 596

def [](key)
  case key
  when Integer then Functions.col(columns[key])
  else Functions.col(key.to_s)
  end
end
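
The key dispatch can be sketched without a Spark session. `column_name_for` below is a hypothetical helper, not part of the library; it returns the column name that would be handed to `Functions.col` for a given key.

```ruby
# Hypothetical helper mirroring the dispatch in #[]: an Integer indexes into
# the column list, anything else is treated as a column name.
def column_name_for(columns, key)
  case key
  when Integer then columns[key]
  else key.to_s
  end
end

cols = %w[id name score]
puts column_name_for(cols, 1)       # name
puts column_name_for(cols, :score)  # score
puts column_name_for(cols, -1)      # score
p column_name_for(cols, 10)         # nil
```

An out-of-range position yields `nil` because `Array#[]` does; going by the listing above, that `nil` would be passed on to `Functions.col`, so callers may want to check bounds themselves.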

#agg(*exprs) ⇒ DataFrame

Aggregate over the whole DataFrame (a group-by with no grouping columns).

Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 248

def agg(*exprs)
  group_by.agg(*exprs)
end

#alias(name) ⇒ DataFrame Also known as: as

Alias this DataFrame (a subquery alias usable in join conditions).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 395

def alias(name)
  build(subquery_alias: Proto::SubqueryAlias.new(input: @relation, alias: name.to_s))
end

#build(**rel) ⇒ Object



# File 'lib/spark_connect/data_frame.rb', line 774

def build(**rel)
  DataFrame.new(@session, PlanBuilder.relation(@session, **rel))
end

#checkpoint(eager: true) ⇒ DataFrame

Checkpoint this DataFrame (eagerly by default): materialise it server-side and return a new DataFrame backed by the cached result, which truncates the logical plan.

Parameters:

  • eager (Boolean) (defaults to: true)

    materialise immediately.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 534

def checkpoint(eager: true)
  checkpoint_command(local: false, eager: eager)
end

#coalesce(num_partitions) ⇒ DataFrame

Reduce to `num_partitions` partitions without a full shuffle.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 347

def coalesce(num_partitions)
  build(repartition: Proto::Repartition.new(input: @relation, num_partitions: num_partitions, shuffle: false))
end

#col_regex(regex) ⇒ Column Also known as: colRegex

Select columns by a regular expression matched against their names.

Parameters:

  • regex (String)

Returns:

  • (Column)

# File 'lib/spark_connect/data_frame.rb', line 495

def col_regex(regex)
  Column.new(Proto::Expression.new(unresolved_regex: Proto::Expression::UnresolvedRegex.new(col_name: regex.to_s)))
end

#collect ⇒ Array<Row> Also known as: to_a

Execute the plan and return all rows.

Returns:

  • (Array<Row>)

# File 'lib/spark_connect/data_frame.rb', line 624

def collect
  result = @session.client.execute_plan(@relation)
  ArrowConverter.to_rows(result.arrow_batches)
end

#column_objects ⇒ Array<Column>

Returns one Column per output column.

Returns:

  • (Array<Column>)

    one Column per output column.

# File 'lib/spark_connect/data_frame.rb', line 581

def column_objects
  columns.map { |c| Functions.col(c) }
end

#columns ⇒ Array<String>

Returns column names.

Returns:

  • (Array<String>)

    column names.



# File 'lib/spark_connect/data_frame.rb', line 571

def columns
  schema.names
end

#count ⇒ Integer

Returns the number of rows.

Returns:

  • (Integer)

    the number of rows.



# File 'lib/spark_connect/data_frame.rb', line 665

def count
  df = build(aggregate: Proto::Aggregate.new(
    input: @relation,
    group_type: :GROUP_TYPE_GROUPBY,
    grouping_expressions: [],
    aggregate_expressions: [Column.invoke("count", Column.lit(1)).to_expr]
  ))
  row = df.collect.first
  row ? row[0] : 0
end

#create_global_temp_view(name) ⇒ void Also known as: createGlobalTempView

This method returns an undefined value.

Register this DataFrame as a global (cross-session) temporary view.



# File 'lib/spark_connect/data_frame.rb', line 479

def create_global_temp_view(name)
  register_view(name, global: true, replace: false)
end

#create_or_replace_global_temp_view(name) ⇒ void Also known as: createOrReplaceGlobalTempView

This method returns an undefined value.

Register (or replace) this DataFrame as a global temporary view.



# File 'lib/spark_connect/data_frame.rb', line 486

def create_or_replace_global_temp_view(name)
  register_view(name, global: true, replace: true)
end

#create_or_replace_temp_view(name) ⇒ void Also known as: createOrReplaceTempView

This method returns an undefined value.

Register (or replace) this DataFrame as a session-scoped temporary view.



# File 'lib/spark_connect/data_frame.rb', line 472

def create_or_replace_temp_view(name)
  register_view(name, global: false, replace: true)
end

#create_temp_view(name) ⇒ void Also known as: createTempView

This method returns an undefined value.

Register this DataFrame as a session-scoped temporary view, failing if a view of the same name already exists.



# File 'lib/spark_connect/data_frame.rb', line 465

def create_temp_view(name)
  register_view(name, global: false, replace: false)
end

#cross_join(other) ⇒ DataFrame Also known as: crossJoin

Cartesian product with another DataFrame.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 276

def cross_join(other)
  build(join: Proto::Join.new(left: @relation, right: other.relation, join_type: :JOIN_TYPE_CROSS))
end

#cube(*cols) ⇒ GroupedData

Multi-dimensional cube.

Returns:

  • (GroupedData)

# File 'lib/spark_connect/data_frame.rb', line 240

def cube(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_CUBE)
end

#describe(*cols) ⇒ DataFrame

Basic descriptive statistics (count, mean, stddev, min, max) per column.

Parameters:

  • cols (Array<String>)

    columns to describe (all when empty).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/stat_functions.rb', line 96

def describe(*cols)
  build(describe: Proto::StatDescribe.new(input: @relation, cols: cols.flatten.map(&:to_s)))
end

#distinct ⇒ DataFrame

Distinct rows.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 167

def distinct
  build(deduplicate: Proto::Deduplicate.new(input: @relation, all_columns_as_keys: true))
end

#drop(*cols) ⇒ DataFrame

Drop one or more columns (by name or Column).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 138

def drop(*cols)
  names = []
  columns = []
  cols.flatten.each do |c|
    case c
    when Column then columns << c.to_expr
    else names << c.to_s
    end
  end
  build(drop: Proto::Drop.new(input: @relation, columns: columns, column_names: names))
end

#drop_duplicates(subset = nil) ⇒ DataFrame Also known as: dropDuplicates, drop_duplicates_within_watermark

Drop duplicate rows, optionally restricted to a subset of columns.

Parameters:

  • subset (Array<String>, nil) (defaults to: nil)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 175

def drop_duplicates(subset = nil)
  dedup =
    if subset.nil? || subset.empty?
      Proto::Deduplicate.new(input: @relation, all_columns_as_keys: true)
    else
      Proto::Deduplicate.new(input: @relation, column_names: Array(subset).map(&:to_s))
    end
  build(deduplicate: dedup)
end
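
The subset handling above follows one rule: `nil` or an empty subset means every column is a key. A minimal sketch of that rule, using a hypothetical `dedup_options` helper that returns a plain Hash in place of the `Proto::Deduplicate` message:

```ruby
# Hypothetical helper: returns the options that describe the deduplication,
# following the nil/empty check used by #drop_duplicates.
def dedup_options(subset = nil)
  if subset.nil? || subset.empty?
    { all_columns_as_keys: true }
  else
    { column_names: Array(subset).map(&:to_s) }
  end
end

p dedup_options[:all_columns_as_keys]           # true
p dedup_options([])[:all_columns_as_keys]       # true
p dedup_options(["id", :day])[:column_names]    # ["id", "day"]
p dedup_options("id")[:column_names]            # ["id"]
```

A single String is accepted as well, because `Array("id")` wraps it into a one-element list.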

#dtypes ⇒ Array<Array(String, String)>

Returns (name, simpleString-type) pairs.

Returns:

  • (Array<Array(String, String)>)

    (name, simpleString-type) pairs.



# File 'lib/spark_connect/data_frame.rb', line 576

def dtypes
  schema.fields.map { |f| [f.name, f.data_type.simple_string] }
end

#each {|row| ... } ⇒ Enumerator, void

Iterate over all rows (materialises the result). Returns an Enumerator when no block is given.

Yield Parameters:

Returns:

  • (Enumerator, void)


# File 'lib/spark_connect/data_frame.rb', line 635

def each(&block)
  return collect.each unless block

  collect.each(&block)
end
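
The block/no-block behaviour can be illustrated with an in-memory stand-in. `RowSource` and `fetch_rows` below are hypothetical names used only for this sketch; `fetch_rows` plays the role of `collect`.

```ruby
# Hypothetical stand-in: rows live in memory instead of coming from Spark.
class RowSource
  def initialize(rows)
    @rows = rows
  end

  # Plays the role of #collect: materialises all rows as an Array.
  def fetch_rows
    @rows.dup
  end

  # With a block, yield each row; without one, return an Enumerator.
  def each(&block)
    return fetch_rows.each unless block

    fetch_rows.each(&block)
  end
end

source = RowSource.new([%w[a 1], %w[b 2]])
source.each { |row| puts row.join("=") }            # a=1 then b=2
enum = source.each
puts enum.class                                     # Enumerator
p enum.with_index.map { |row, i| [i, row.first] }   # [[0, "a"], [1, "b"]]
```

In this sketch, as in the listing above, the rows are fetched as soon as `each` is called, so the returned Enumerator iterates over an already materialised Array rather than streaming.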

#empty? ⇒ Boolean Also known as: is_empty

Returns whether the DataFrame has no rows.

Returns:

  • (Boolean)

    whether the DataFrame has no rows.



# File 'lib/spark_connect/data_frame.rb', line 677

def empty?
  limit(1).collect.empty?
end

#except_all(other) ⇒ DataFrame Also known as: exceptAll

Rows in this DataFrame not in `other`, keeping duplicates (the code passes `is_all: true`, unlike #subtract).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 316

def except_all(other)
  set_op(other, :SET_OP_TYPE_EXCEPT, is_all: true)
end

#explain(mode = :simple) ⇒ void

This method returns an undefined value.

Print the query plan.



# File 'lib/spark_connect/data_frame.rb', line 730

def explain(mode = :simple)
  $stdout.puts(explain_string(mode))
end

#explain_string(mode = :simple) ⇒ String

Return the query plan as a string.

Parameters:

  • mode (Symbol) (defaults to: :simple)

    `:simple`, `:extended`, `:codegen`, `:cost`, `:formatted`.

Returns:

  • (String)


# File 'lib/spark_connect/data_frame.rb', line 723

def explain_string(mode = :simple)
  em = :"EXPLAIN_MODE_#{mode.to_s.upcase}"
  analyze(explain: Proto::AnalyzePlanRequest::Explain.new(plan: plan, explain_mode: em)).explain.explain_string
end
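
The mode symbol is turned into an enum name by string interpolation. The sketch below reproduces that mapping in a hypothetical `explain_mode_enum` helper and adds an up-front check against the five documented modes; the check is an addition for illustration and does not appear in the listing above.

```ruby
# The five modes listed in the parameter documentation.
EXPLAIN_MODES = %i[simple extended codegen cost formatted].freeze

# Hypothetical helper: maps a mode to its EXPLAIN_MODE_* enum symbol and
# rejects unknown modes before any request would be built.
def explain_mode_enum(mode = :simple)
  key = mode.to_s.downcase.to_sym
  raise ArgumentError, "Unsupported explain mode: #{mode}" unless EXPLAIN_MODES.include?(key)

  :"EXPLAIN_MODE_#{key.to_s.upcase}"
end

puts explain_mode_enum              # EXPLAIN_MODE_SIMPLE
puts explain_mode_enum(:extended)   # EXPLAIN_MODE_EXTENDED
puts explain_mode_enum("Cost")      # EXPLAIN_MODE_COST
```

Validating locally gives a Ruby-side error message for a typo such as `:extnded` instead of relying on whatever the protobuf layer or server reports.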

#filter(condition) ⇒ DataFrame Also known as: where

Filter rows by a condition.

Parameters:

  • condition (Column, String)

    a boolean column or SQL expression string.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 87

def filter(condition)
  cond = condition.is_a?(String) ? Functions.expr(condition) : condition
  build(filter: Proto::Filter.new(input: @relation, condition: cond.to_expr))
end

#first ⇒ Row?

Returns the first row.

Returns:

  • (Row, nil)

    the first row.



# File 'lib/spark_connect/data_frame.rb', line 660

def first
  take(1).first
end

#group_by(*cols) ⇒ GroupedData Also known as: groupBy, groupby

Group by the given columns.

Parameters:

  • cols (Array<Column, String>)

Returns:

  • (GroupedData)

# File 'lib/spark_connect/data_frame.rb', line 226

def group_by(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_GROUPBY)
end

#head(n = nil) ⇒ Array<Row>, Row

Returns `n` rows (array) or the single first row when called with no argument.

Returns:

  • (Array<Row>, Row)

    `n` rows (array) or the single first row when called with no argument.

# File 'lib/spark_connect/data_frame.rb', line 653

def head(n = nil)
  return first if n.nil?

  take(n)
end

#hint(name, *params) ⇒ DataFrame

Attach a planner hint (e.g. `"broadcast"`).

Parameters:

  • name (String)
  • params (Array)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 405

def hint(name, *params)
  h = Proto::Hint.new(input: @relation, name: name.to_s,
                      parameters: params.map { |p| Column.to_col(p).to_expr })
  build(hint: h)
end

#input_files ⇒ Array<String>

Returns the input files backing this DataFrame.

Returns:

  • (Array<String>)

    the input files backing this DataFrame.



# File 'lib/spark_connect/data_frame.rb', line 735

def input_files
  analyze(input_files: Proto::AnalyzePlanRequest::InputFiles.new(plan: plan)).input_files.files.to_a
end

#intersect(other) ⇒ DataFrame

Set intersection (distinct).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 303

def intersect(other)
  set_op(other, :SET_OP_TYPE_INTERSECT, is_all: false)
end

#intersect_all(other) ⇒ DataFrame Also known as: intersectAll

Set intersection keeping duplicates.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 309

def intersect_all(other)
  set_op(other, :SET_OP_TYPE_INTERSECT, is_all: true)
end

#join(other, on: nil, how: :inner) ⇒ DataFrame

Join with another DataFrame.

Parameters:

  • other (DataFrame)
  • on (String, Array<String>, Column, nil) (defaults to: nil)

    join key column name(s) or a boolean join condition.

  • how (Symbol, String) (defaults to: :inner)

    join type (see JOIN_TYPES).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 261

def join(other, on: nil, how: :inner)
  jt = JOIN_TYPES[how.to_s.downcase.to_sym] or
    raise IllegalArgumentError, "Unsupported join type: #{how}"
  j = Proto::Join.new(left: @relation, right: other.relation, join_type: jt)
  case on
  when nil then nil
  when Column then j.join_condition = on.to_expr
  when Array then j.using_columns += on.map(&:to_s)
  else j.using_columns << on.to_s
  end
  build(join: j)
end
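
The `how` argument is normalised (stringified, downcased, symbolised) before the JOIN_TYPES lookup, so symbols and mixed-case strings resolve to the same entry. A minimal sketch of that lookup follows, assuming a trimmed copy of the table and using `Hash#fetch` with a block in place of the `or raise` idiom; `resolve_join_type` is a hypothetical helper and `ArgumentError` stands in for the library's `IllegalArgumentError`.

```ruby
# Trimmed copy of the documented JOIN_TYPES table, for illustration only.
JOIN_TYPE_SAMPLE = {
  inner: :JOIN_TYPE_INNER,
  left: :JOIN_TYPE_LEFT_OUTER,
  left_outer: :JOIN_TYPE_LEFT_OUTER,
  semi: :JOIN_TYPE_LEFT_SEMI
}.freeze

# Hypothetical helper: normalise the caller's value, then look it up.
def resolve_join_type(how)
  JOIN_TYPE_SAMPLE.fetch(how.to_s.downcase.to_sym) do
    raise ArgumentError, "Unsupported join type: #{how}"
  end
end

puts resolve_join_type(:inner)        # JOIN_TYPE_INNER
puts resolve_join_type("LEFT")        # JOIN_TYPE_LEFT_OUTER
puts resolve_join_type("Left_Outer")  # JOIN_TYPE_LEFT_OUTER
```

Note that only the spellings present in the table are accepted; a value with a space, such as `"left outer"`, is not normalised into `left_outer` and would raise.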

#limit(n) ⇒ DataFrame

Returns the first `n` rows.

Returns:

  • (DataFrame)

    the first `n` rows.

# File 'lib/spark_connect/data_frame.rb', line 211

def limit(n)
  build(limit: Proto::Limit.new(input: @relation, limit: n))
end

#local? ⇒ Boolean

Returns whether the data is small enough to be local.

Returns:

  • (Boolean)

    whether the data is small enough to be local.



# File 'lib/spark_connect/data_frame.rb', line 740

def local?
  analyze(is_local: Proto::AnalyzePlanRequest::IsLocal.new(plan: plan)).is_local.is_local
end

#local_checkpoint(eager: true) ⇒ DataFrame Also known as: localCheckpoint

Like #checkpoint but uses the executors’ local storage (no reliable storage), which is faster but not fault-tolerant.

Parameters:

  • eager (Boolean) (defaults to: true)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 543

def local_checkpoint(eager: true)
  checkpoint_command(local: true, eager: eager)
end

#na ⇒ DataFrameNaFunctions

Returns missing-data helpers (`drop`, `fill`, `replace`).

Returns:

  • (DataFrameNaFunctions)

    missing-data helpers (`drop`, `fill`, `replace`).

# File 'lib/spark_connect/data_frame.rb', line 433

def na
  DataFrameNaFunctions.new(self)
end

#observe(name, *exprs) ⇒ DataFrame

Observe named metrics over this DataFrame.

Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 553

def observe(name, *exprs)
  obs_name = name.is_a?(Observation) ? name.name : name.to_s
  cm = Proto::CollectMetrics.new(
    input: @relation, name: obs_name, metrics: exprs.map { |e| Column.to_col(e).to_expr }
  )
  df = build(collect_metrics: cm)
  name.bind(df) if name.is_a?(Observation)
  df
end

#offset(n) ⇒ DataFrame

Returns all rows except the first `n`.

Returns:

  • (DataFrame)

    all rows except the first `n`.

# File 'lib/spark_connect/data_frame.rb', line 216

def offset(n)
  build(offset: Proto::Offset.new(input: @relation, offset: n))
end

#order_by(*cols) ⇒ DataFrame Also known as: sort, orderBy

Sort by the given columns (globally).

Parameters:

  • cols (Array<Column, String>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 193

def order_by(*cols)
  orders = normalize_columns(cols).map { |c| to_sort_order(c) }
  build(sort: Proto::Sort.new(input: @relation, order: orders, is_global: true))
end

#plan ⇒ Spark::Connect::Plan

# File 'lib/spark_connect/data_frame.rb', line 763

def plan
  PlanBuilder.root_plan(@relation)
end

#print_schema(io = $stdout) ⇒ void

This method returns an undefined value.

Print the schema as an indented tree to `io`.

# File 'lib/spark_connect/data_frame.rb', line 587

def print_schema(io = $stdout)
  io.puts(schema.tree_string)
end

#repartition(num_partitions, *cols) ⇒ DataFrame

Repartition into `num_partitions` partitions, optionally hash-partitioned by columns.

Parameters:

  • num_partitions (Integer)
  • cols (Array<Column, String>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 334

def repartition(num_partitions, *cols)
  if cols.empty?
    build(repartition: Proto::Repartition.new(input: @relation, num_partitions: num_partitions, shuffle: true))
  else
    rbe = Proto::RepartitionByExpression.new(
      input: @relation, partition_exprs: normalize_columns(cols).map(&:to_expr), num_partitions: num_partitions
    )
    build(repartition_by_expression: rbe)
  end
end

#repartition_by_range(*cols) ⇒ DataFrame
#repartition_by_range(num_partitions, *cols) ⇒ DataFrame
Also known as: repartitionByRange

Range-partition by the given columns (rows are range-partitioned on the sort order of the columns).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 357

def repartition_by_range(*args)
  num_partitions = args.first.is_a?(Integer) ? args.shift : nil
  orders = normalize_columns(args).map do |c|
    expr = c.to_expr
    if expr.expr_type == :sort_order
      expr
    else
      Proto::Expression.new(sort_order: Proto::Expression::SortOrder.new(
        child: expr, direction: :SORT_DIRECTION_ASCENDING, null_ordering: :SORT_NULLS_FIRST
      ))
    end
  end
  rbe = Proto::RepartitionByExpression.new(input: @relation, partition_exprs: orders)
  rbe.num_partitions = num_partitions if num_partitions
  build(repartition_by_expression: rbe)
end
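
The two overloads are told apart by whether the first argument is an Integer. A minimal sketch of that argument split, using a hypothetical `split_partition_args` helper:

```ruby
# Hypothetical helper: peel off an optional leading partition count and
# return it together with the remaining column arguments.
def split_partition_args(*args)
  num_partitions = args.first.is_a?(Integer) ? args.shift : nil
  [num_partitions, args]
end

p split_partition_args(8, "year", "month")  # [8, ["year", "month"]]
p split_partition_args("year")              # [nil, ["year"]]
```

When the count is `nil`, the listing above leaves `num_partitions` unset on the message, so the partition count is left to the server.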

#respond_to_missing?(name, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark_connect/data_frame.rb', line 612

def respond_to_missing?(name, include_private = false)
  begin
    columns.include?(name.to_s)
  rescue StandardError
    false
  end || super
end

#rollup(*cols) ⇒ GroupedData

Multi-dimensional rollup.

Returns:

  • (GroupedData)

# File 'lib/spark_connect/data_frame.rb', line 234

def rollup(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_ROLLUP)
end

#same_semantics?(other) ⇒ Boolean

Returns whether `other` has the same logical plan.

Returns:

  • (Boolean)

    whether `other` has the same logical plan.

# File 'lib/spark_connect/data_frame.rb', line 750

def same_semantics?(other)
  analyze(same_semantics: Proto::AnalyzePlanRequest::SameSemantics.new(
    target_plan: plan, other_plan: other.plan
  )).same_semantics.result
end

#sample(fraction, with_replacement: false, seed: nil) ⇒ DataFrame

Random sample of rows.

Parameters:

  • fraction (Float)

    expected fraction (0.0..1.0).

  • with_replacement (Boolean) (defaults to: false)
  • seed (Integer, nil) (defaults to: nil)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 383

def sample(fraction, with_replacement: false, seed: nil)
  s = Proto::Sample.new(
    input: @relation, lower_bound: 0.0, upper_bound: fraction, with_replacement: with_replacement
  )
  s.seed = seed if seed
  build(sample: s)
end

#schema ⇒ Types::StructType

Returns the DataFrame’s schema.

Returns:

  • (Types::StructType)

    the DataFrame’s schema.

# File 'lib/spark_connect/data_frame.rb', line 566

def schema
  @schema ||= Types.from_proto(analyze(schema: Proto::AnalyzePlanRequest::Schema.new(plan: plan)).schema.schema)
end

#select(*cols) ⇒ DataFrame

Select a set of columns or expressions.

Parameters:

  • cols (Array<Column, String, Symbol>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 64

def select(*cols)
  exprs = normalize_columns(cols).map(&:to_expr)
  build(project: Proto::Project.new(input: @relation, expressions: exprs))
end

#select_expr(*exprs) ⇒ DataFrame Also known as: selectExpr

Select using SQL expression strings.

Parameters:

  • exprs (Array<String>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 73

def select_expr(*exprs)
  parsed = exprs.flatten.map do |e|
    Proto::Expression.new(expression_string: Proto::Expression::ExpressionString.new(expression: e))
  end
  build(project: Proto::Project.new(input: @relation, expressions: parsed))
end

#semantic_hash ⇒ Integer

Returns a hash of the logical plan.

Returns:

  • (Integer)

    a hash of the logical plan.



# File 'lib/spark_connect/data_frame.rb', line 757

def semantic_hash
  analyze(semantic_hash: Proto::AnalyzePlanRequest::SemanticHash.new(plan: plan)).semantic_hash.result
end

#show(n = 20, truncate: true, vertical: false) ⇒ void

This method returns an undefined value.

Render the first `n` rows as a formatted table.

Parameters:

  • n (Integer) (defaults to: 20)
  • truncate (Boolean, Integer) (defaults to: true)

    truncate long values to 20 chars (true) or to the given width (Integer).

  • vertical (Boolean) (defaults to: false)


# File 'lib/spark_connect/data_frame.rb', line 689

def show(n = 20, truncate: true, vertical: false)
  $stdout.puts(show_string(n, truncate: truncate, vertical: vertical))
end

#show_string(n = 20, truncate: true, vertical: false) ⇒ String

Returns the formatted table string (what #show prints).

Returns:

  • (String)

    the formatted table string (what #show prints).



# File 'lib/spark_connect/data_frame.rb', line 694

def show_string(n = 20, truncate: true, vertical: false)
  trunc = if truncate == true
            20
          else
            (truncate == false ? 0 : truncate.to_i)
          end
  ss = Proto::ShowString.new(input: @relation, num_rows: n, truncate: trunc, vertical: vertical)
  df = build(show_string: ss)
  df.collect.first&.[](0).to_s
end
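
The `truncate` argument accepts either a Boolean or a width. The normalisation can be sketched on its own; `truncate_width` is a hypothetical helper that returns the integer placed in the `truncate` field, written with `case` instead of the nested conditional.

```ruby
# Hypothetical helper: true means the default width of 20, false disables
# truncation (0), and anything else is coerced to an Integer width.
def truncate_width(truncate)
  case truncate
  when true then 20
  when false then 0
  else truncate.to_i
  end
end

puts truncate_width(true)   # 20
puts truncate_width(false)  # 0
puts truncate_width(50)     # 50
```

Since the fallback uses `to_i`, a numeric String such as `"30"` is also accepted, while a non-numeric String silently becomes 0 (no truncation).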

#sort_within_partitions(*cols) ⇒ DataFrame Also known as: sortWithinPartitions

Sort within each partition (no global shuffle).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 202

def sort_within_partitions(*cols)
  orders = normalize_columns(cols).map { |c| to_sort_order(c) }
  build(sort: Proto::Sort.new(input: @relation, order: orders, is_global: false))
end

#stat ⇒ DataFrameStatFunctions

Returns statistical helpers.

Returns:

  • (DataFrameStatFunctions)

    statistical helpers.

# File 'lib/spark_connect/data_frame.rb', line 438

def stat
  DataFrameStatFunctions.new(self)
end

#streaming? ⇒ Boolean

Returns whether this is a streaming DataFrame.

Returns:

  • (Boolean)

    whether this is a streaming DataFrame.



# File 'lib/spark_connect/data_frame.rb', line 745

def streaming?
  analyze(is_streaming: Proto::AnalyzePlanRequest::IsStreaming.new(plan: plan)).is_streaming.is_streaming
end

#subtract(other) ⇒ DataFrame

Rows in this DataFrame not in `other` (distinct) - Spark’s `EXCEPT`.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 323

def subtract(other)
  set_op(other, :SET_OP_TYPE_EXCEPT, is_all: false)
end

#summary(*statistics) ⇒ DataFrame

Configurable summary statistics.

Parameters:

  • statistics (Array<String>)

    e.g. `"count"`, `"mean"`, `"stddev"`, `"min"`, `"25%"`, `"50%"`, `"75%"`, `"max"`.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/stat_functions.rb', line 105

def summary(*statistics)
  build(summary: Proto::StatSummary.new(input: @relation, statistics: statistics.flatten.map(&:to_s)))
end

#take(n) ⇒ Array<Row>

Returns the first `n` rows.

Returns:

  • (Array<Row>)

    the first `n` rows.

# File 'lib/spark_connect/data_frame.rb', line 648

def take(n)
  limit(n).collect
end

#to(schema) ⇒ DataFrame

Apply a Types::StructType (reconciling/casting columns to it).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 159

def to(schema)
  build(to_schema: Proto::ToSchema.new(input: @relation, schema: schema.to_proto))
end

#to_arrow ⇒ Arrow::Table?

Materialise the result as a columnar Arrow::Table.

Returns:

  • (Arrow::Table, nil)


# File 'lib/spark_connect/data_frame.rb', line 707

def to_arrow
  result = @session.client.execute_plan(@relation)
  ArrowConverter.to_table(result.arrow_batches)
end

#to_df(*names) ⇒ DataFrame Also known as: toDF

Rename all columns positionally.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 152

def to_df(*names)
  build(to_df: Proto::ToDF.new(input: @relation, column_names: names.flatten.map(&:to_s)))
end

#to_h_array ⇒ Array<Hash>

Returns all rows as Hashes.

Returns:

  • (Array<Hash>)

    all rows as Hashes.



# File 'lib/spark_connect/data_frame.rb', line 713

def to_h_array
  collect.map(&:to_h)
end

#to_json(*_args) ⇒ DataFrame Also known as: toJSON

Returns a single-column (`value`) DataFrame of each row encoded as a JSON string.

Returns:

  • (DataFrame)

    a single-column (`value`) DataFrame of each row encoded as a JSON string.

# File 'lib/spark_connect/data_frame.rb', line 502

def to_json(*_args)
  select(Functions.to_json(Functions.struct(Functions.col("*"))).alias("value"))
end

#to_local_iterator ⇒ Enumerator<Row> Also known as: toLocalIterator

Returns an enumerator over all rows.

Returns:

  • (Enumerator<Row>)

    an enumerator over all rows.



# File 'lib/spark_connect/data_frame.rb', line 642

def to_local_iterator
  collect.each
end

#to_s ⇒ Object Also known as: inspect

# File 'lib/spark_connect/data_frame.rb', line 767

def to_s
  "#<SparkConnect::DataFrame>"
end

#transform {|df| ... } ⇒ DataFrame

Apply a function to this DataFrame and return its result, enabling a fluent chain of custom transformations.

Yield Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 525

def transform
  yield(self)
end

#union(other) ⇒ DataFrame Also known as: union_all, unionAll

Union (by position; keeps duplicates - equivalent to Spark’s `unionAll`).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 285

def union(other)
  set_op(other, :SET_OP_TYPE_UNION, is_all: true)
end

#union_by_name(other, allow_missing_columns: false) ⇒ DataFrame Also known as: unionByName

Union by column name.

Parameters:

  • other (DataFrame)
  • allow_missing_columns (Boolean) (defaults to: false)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 296

def union_by_name(other, allow_missing_columns: false)
  set_op(other, :SET_OP_TYPE_UNION, is_all: true, by_name: true, allow_missing_columns: allow_missing_columns)
end

#unpivot(ids, values, variable_column_name, value_column_name) ⇒ DataFrame Also known as: melt

Unpivot (melt) columns from wide to long format.

Parameters:

  • ids (Array<Column, String>)

    identifier columns.

  • values (Array<Column, String>, nil)

    value columns (nil = all others).

  • variable_column_name (String)
  • value_column_name (String)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 418

def unpivot(ids, values, variable_column_name, value_column_name)
  u = Proto::Unpivot.new(
    input: @relation,
    ids: normalize_columns(Array(ids)).map(&:to_expr),
    variable_column_name: variable_column_name,
    value_column_name: value_column_name
  )
  u.values = Proto::Unpivot::Values.new(values: normalize_columns(Array(values)).map(&:to_expr)) unless values.nil?
  build(unpivot: u)
end

#with_column(name, col) ⇒ DataFrame Also known as: withColumn

Add or replace a single column.

Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 100

def with_column(name, col)
  with_columns(name => col)
end

#with_column_renamed(existing, new_name) ⇒ DataFrame Also known as: withColumnRenamed

Rename a single column.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 119

def with_column_renamed(existing, new_name)
  with_columns_renamed(existing => new_name)
end

#with_columns(assigns) ⇒ DataFrame Also known as: withColumns

Add or replace multiple columns.

Parameters:

  • assigns (Hash{String=>Column})

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 109

def with_columns(assigns)
  aliases = assigns.map do |name, col|
    Proto::Expression::Alias.new(expr: Column.to_col(col).to_expr, name: [name.to_s])
  end
  build(with_columns: Proto::WithColumns.new(input: @relation, aliases: aliases))
end

#with_columns_renamed(renames) ⇒ DataFrame Also known as: withColumnsRenamed

Rename multiple columns.

Parameters:

  • renames (Hash{String=>String})

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 128

def with_columns_renamed(renames)
  pairs = renames.map do |old, new_name|
    Proto::WithColumnsRenamed::Rename.new(col_name: old.to_s, new_col_name: new_name.to_s)
  end
  build(with_columns_renamed: Proto::WithColumnsRenamed.new(input: @relation, renames: pairs))
end

#with_watermark(event_time, delay_threshold) ⇒ DataFrame Also known as: withWatermark

Define an event-time watermark for late-data handling on a streaming DataFrame.

Parameters:

  • event_time (String)

    the event-time column name.

  • delay_threshold (String)

    e.g. `"10 minutes"`.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 513

def with_watermark(event_time, delay_threshold)
  build(with_watermark: Proto::WithWatermark.new(
    input: @relation, event_time: event_time.to_s, delay_threshold: delay_threshold.to_s
  ))
end

#write ⇒ DataFrameWriter

Returns interface for saving this DataFrame.

Returns:

  • (DataFrameWriter)

    interface for saving this DataFrame.

# File 'lib/spark_connect/data_frame.rb', line 443

def write
  DataFrameWriter.new(self)
end

#write_stream ⇒ DataStreamWriter Also known as: writeStream

Returns interface for starting a streaming query from this (streaming) DataFrame.

Returns:

  • (DataStreamWriter)

    interface for starting a streaming query from this (streaming) DataFrame.



# File 'lib/spark_connect/data_frame.rb', line 455

def write_stream
  DataStreamWriter.new(self)
end

#write_to(table) ⇒ DataFrameWriterV2 Also known as: writeTo

Returns the v2 (catalog) write interface.

Returns:

  • (DataFrameWriterV2)

    the v2 (catalog) write interface.

# File 'lib/spark_connect/data_frame.rb', line 448

def write_to(table)
  DataFrameWriterV2.new(self, table)
end