Class: SparkConnect::DataFrame

Inherits:
Object
  • Object
Defined in:
lib/spark_connect/data_frame.rb,
lib/spark_connect/stat_functions.rb

Overview

Reopen DataFrame to add the `describe`/`summary` actions, which are naturally statistical and share the Stat* relations.

Constant Summary

Proto =
SparkConnect::Proto
JOIN_TYPES =
{
  inner: :JOIN_TYPE_INNER,
  cross: :JOIN_TYPE_CROSS,
  outer: :JOIN_TYPE_FULL_OUTER,
  full: :JOIN_TYPE_FULL_OUTER,
  fullouter: :JOIN_TYPE_FULL_OUTER,
  full_outer: :JOIN_TYPE_FULL_OUTER,
  left: :JOIN_TYPE_LEFT_OUTER,
  leftouter: :JOIN_TYPE_LEFT_OUTER,
  left_outer: :JOIN_TYPE_LEFT_OUTER,
  right: :JOIN_TYPE_RIGHT_OUTER,
  rightouter: :JOIN_TYPE_RIGHT_OUTER,
  right_outer: :JOIN_TYPE_RIGHT_OUTER,
  semi: :JOIN_TYPE_LEFT_SEMI,
  leftsemi: :JOIN_TYPE_LEFT_SEMI,
  left_semi: :JOIN_TYPE_LEFT_SEMI,
  anti: :JOIN_TYPE_LEFT_ANTI,
  leftanti: :JOIN_TYPE_LEFT_ANTI,
  left_anti: :JOIN_TYPE_LEFT_ANTI,
}.freeze

Constructor Details

#initialize(session, relation) ⇒ DataFrame

Returns a new instance of DataFrame.

Parameters:

  • session (SparkSession)
  • relation (Spark::Connect::Relation)

# File 'lib/spark_connect/data_frame.rb', line 53

def initialize(session, relation)
  @session = session
  @relation = relation
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method.

#method_missing(name, *args) ⇒ Object

Allows `df.column_name` for valid identifier column names.

# File 'lib/spark_connect/data_frame.rb', line 621

def method_missing(name, *args)
  if args.empty? && columns.include?(name.to_s)
    Functions.col(name.to_s)
  else
    super
  end
end
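
The dynamic lookup above can be sketched in isolation. This is a minimal illustration, not the library's code: `TinyFrame` is a hypothetical stand-in with a fixed column list, and the string it returns stands in for `Functions.col`.

```ruby
# Hypothetical stand-in for a DataFrame; not the SparkConnect class itself.
class TinyFrame
  attr_reader :columns

  def initialize(columns)
    @columns = columns
  end

  # Resolve `frame.some_column` only when the name matches a known column
  # and no arguments were passed; otherwise defer to the default behaviour.
  def method_missing(name, *args)
    if args.empty? && columns.include?(name.to_s)
      "col(#{name})" # stand-in for Functions.col(name.to_s)
    else
      super
    end
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(name, include_private = false)
    columns.include?(name.to_s) || super
  end
end

frame = TinyFrame.new(%w[id name])
frame.name             # => "col(name)"
frame.respond_to?(:id) # => true
```

Because Ruby only calls method_missing when no real method exists, a column whose name matches a defined method (for example `count`) is not reachable this way and needs the `#[]` form instead.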

Instance Attribute Details

#relation ⇒ Spark::Connect::Relation (readonly)

Returns the logical plan this DataFrame builds.

Returns:

  • (Spark::Connect::Relation)

# File 'lib/spark_connect/data_frame.rb', line 49

def relation
  @relation
end

#session ⇒ SparkSession (readonly)

Returns:

  • (SparkSession)

# File 'lib/spark_connect/data_frame.rb', line 47

def session
  @session
end

Instance Method Details

#[](key) ⇒ Column

Index into a column by name (`df["name"]`) or position (`df[0]`).

Parameters:

  • key (String, Symbol, Integer)

Returns:

  • (Column)

# File 'lib/spark_connect/data_frame.rb', line 613

def [](key)
  case key
  when Integer then Functions.col(columns[key])
  else Functions.col(key.to_s)
  end
end

#agg(*exprs) ⇒ DataFrame

Aggregate over the whole DataFrame (a group-by with no grouping columns).

Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 265

def agg(*exprs)
  group_by.agg(*exprs)
end

#alias(name) ⇒ DataFrame Also known as: as

Alias this DataFrame (a subquery alias usable in join conditions).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 412

def alias(name)
  build(subquery_alias: Proto::SubqueryAlias.new(input: @relation, alias: name.to_s))
end

#build(**rel) ⇒ Object



# File 'lib/spark_connect/data_frame.rb', line 791

def build(**rel)
  DataFrame.new(@session, PlanBuilder.relation(@session, **rel))
end

#checkpoint(eager: true) ⇒ DataFrame

Eagerly checkpoint this DataFrame: materialise it server-side and return a new DataFrame backed by the cached result (truncates the logical plan).

Parameters:

  • eager (Boolean) (defaults to: true)

    materialise immediately.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 551

def checkpoint(eager: true)
  checkpoint_command(local: false, eager: eager)
end

#coalesce(num_partitions) ⇒ DataFrame

Reduce to `num_partitions` partitions without a full shuffle.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 364

def coalesce(num_partitions)
  build(repartition: Proto::Repartition.new(input: @relation, num_partitions: num_partitions, shuffle: false))
end

#col_regex(regex) ⇒ Column Also known as: colRegex

Select columns by a regular expression matched against their names.

Parameters:

  • regex (String)

Returns:

  • (Column)

# File 'lib/spark_connect/data_frame.rb', line 512

def col_regex(regex)
  Column.new(Proto::Expression.new(unresolved_regex: Proto::Expression::UnresolvedRegex.new(col_name: regex.to_s)))
end

#collect ⇒ Array<Row> Also known as: to_a

Execute the plan and return all rows.

Returns:

  • (Array<Row>)

# File 'lib/spark_connect/data_frame.rb', line 641

def collect
  result = @session.client.execute_plan(@relation)
  ArrowConverter.to_rows(result.arrow_batches)
end

#column_objects ⇒ Array<Column>

Returns one Column per output column.

Returns:

  • (Array<Column>)

# File 'lib/spark_connect/data_frame.rb', line 598

def column_objects
  columns.map { |c| Functions.col(c) }
end

#columns ⇒ Array<String>

Returns column names.

Returns:

  • (Array<String>)

    column names.



# File 'lib/spark_connect/data_frame.rb', line 588

def columns
  schema.names
end

#count ⇒ Integer

Returns the number of rows.

Returns:

  • (Integer)

    the number of rows.



# File 'lib/spark_connect/data_frame.rb', line 682

def count
  df = build(aggregate: Proto::Aggregate.new(
    input: @relation,
    group_type: :GROUP_TYPE_GROUPBY,
    grouping_expressions: [],
    aggregate_expressions: [Column.invoke("count", Column.lit(1)).to_expr]
  ))
  row = df.collect.first
  row ? row[0] : 0
end

#create_global_temp_view(name) ⇒ void Also known as: createGlobalTempView

This method returns an undefined value.

Register this DataFrame as a global (cross-session) temporary view.



# File 'lib/spark_connect/data_frame.rb', line 496

def create_global_temp_view(name)
  register_view(name, global: true, replace: false)
end

#create_or_replace_global_temp_view(name) ⇒ void Also known as: createOrReplaceGlobalTempView

This method returns an undefined value.

Register (or replace) this DataFrame as a global temporary view.



# File 'lib/spark_connect/data_frame.rb', line 503

def create_or_replace_global_temp_view(name)
  register_view(name, global: true, replace: true)
end

#create_or_replace_temp_view(name) ⇒ void Also known as: createOrReplaceTempView

This method returns an undefined value.

Register (or replace) this DataFrame as a session-scoped temporary view.



# File 'lib/spark_connect/data_frame.rb', line 489

def create_or_replace_temp_view(name)
  register_view(name, global: false, replace: true)
end

#create_temp_view(name) ⇒ void Also known as: createTempView

This method returns an undefined value.

Register this DataFrame as a session-scoped temporary view, failing if a view of the same name already exists.



# File 'lib/spark_connect/data_frame.rb', line 482

def create_temp_view(name)
  register_view(name, global: false, replace: false)
end

#cross_join(other) ⇒ DataFrame Also known as: crossJoin

Cartesian product with another DataFrame.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 293

def cross_join(other)
  build(join: Proto::Join.new(left: @relation, right: other.relation, join_type: :JOIN_TYPE_CROSS))
end

#cube(*cols) ⇒ GroupedData

Multi-dimensional cube.

Returns:

  • (GroupedData)

# File 'lib/spark_connect/data_frame.rb', line 257

def cube(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_CUBE)
end

#describe(*cols) ⇒ DataFrame

Basic descriptive statistics (count, mean, stddev, min, max) per column.

Parameters:

  • cols (Array<String>)

    columns to describe (all when empty).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/stat_functions.rb', line 96

def describe(*cols)
  build(describe: Proto::StatDescribe.new(input: @relation, cols: cols.flatten.map(&:to_s)))
end

#distinct ⇒ DataFrame

Distinct rows.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 167

def distinct
  build(deduplicate: Proto::Deduplicate.new(input: @relation, all_columns_as_keys: true))
end

#drop(*cols) ⇒ DataFrame

Drop one or more columns (by name or Column).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 138

def drop(*cols)
  names = []
  columns = []
  cols.flatten.each do |c|
    case c
    when Column then columns << c.to_expr
    else names << c.to_s
    end
  end
  build(drop: Proto::Drop.new(input: @relation, columns: columns, column_names: names))
end
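
The split between Column arguments and plain names in #drop can be sketched on its own. This is an illustration under stated assumptions: `FakeColumn` and `split_drop_args` are hypothetical names, and `FakeColumn` stands in for the real Column class.

```ruby
# Hypothetical stand-in for Column; only used to tell the two kinds apart.
FakeColumn = Struct.new(:name)

# Separate column objects from names, flattening nested arrays first and
# converting every non-column argument to a String.
def split_drop_args(*cols)
  column_args, name_args = cols.flatten.partition { |c| c.is_a?(FakeColumn) }
  [column_args, name_args.map(&:to_s)]
end

column_args, names = split_drop_args(:age, [FakeColumn.new("city"), "zip"])
names # => ["age", "zip"]
```

Symbols, strings, and nested arrays are all accepted because of the `flatten` and `to_s` steps.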

#drop_duplicates(subset = nil) ⇒ DataFrame Also known as: dropDuplicates

Drop duplicate rows, optionally restricted to a subset of columns.

Parameters:

  • subset (Array<String>, nil) (defaults to: nil)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 175

def drop_duplicates(subset = nil)
  dedup =
    if subset.nil? || subset.empty?
      Proto::Deduplicate.new(input: @relation, all_columns_as_keys: true)
    else
      Proto::Deduplicate.new(input: @relation, column_names: Array(subset).map(&:to_s))
    end
  build(deduplicate: dedup)
end
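
How `subset` is interpreted can be shown without a Spark session. A minimal sketch: `dedup_keys` is a hypothetical helper, and the `:all_columns` symbol stands in for the `all_columns_as_keys: true` branch.

```ruby
# Hypothetical helper mirroring the branch in #drop_duplicates:
# nil or an empty subset means "use every column as the key".
def dedup_keys(subset = nil)
  return :all_columns if subset.nil? || subset.empty?

  # Array() lets a single name be passed without wrapping it in an array.
  Array(subset).map(&:to_s)
end

dedup_keys              # => :all_columns
dedup_keys(%i[id name]) # => ["id", "name"]
dedup_keys("id")        # => ["id"]
```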

#drop_duplicates_within_watermark(subset = nil) ⇒ DataFrame Also known as: dropDuplicatesWithinWatermark

Drop duplicate rows within the event-time watermark, optionally restricted to a subset of columns. Unlike #drop_duplicates, this is watermark-aware and is intended for streaming DataFrames (mirrors PySpark’s `dropDuplicatesWithinWatermark`).

Parameters:

  • subset (Array<String>, nil) (defaults to: nil)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 193

def drop_duplicates_within_watermark(subset = nil)
  dedup =
    if subset.nil? || subset.empty?
      Proto::Deduplicate.new(input: @relation, all_columns_as_keys: true, within_watermark: true)
    else
      Proto::Deduplicate.new(input: @relation, column_names: Array(subset).map(&:to_s), within_watermark: true)
    end
  build(deduplicate: dedup)
end

#dtypes ⇒ Array<Array(String, String)>

Returns (name, simpleString-type) pairs.

Returns:

  • (Array<Array(String, String)>)

    (name, simpleString-type) pairs.



# File 'lib/spark_connect/data_frame.rb', line 593

def dtypes
  schema.fields.map { |f| [f.name, f.data_type.simple_string] }
end

#each {|row| ... } ⇒ Enumerator, void

Iterate over all rows (materialises the result). Returns an Enumerator when no block is given.

Yield Parameters:

Returns:

  • (Enumerator, void)


# File 'lib/spark_connect/data_frame.rb', line 652

def each(&block)
  return collect.each unless block

  collect.each(&block)
end
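
The block/no-block behaviour of #each can be sketched with a stand-in. `TinyRows` is hypothetical and its `collect` returns a fixed array instead of executing a plan.

```ruby
# Hypothetical stand-in: #collect returns in-memory rows.
class TinyRows
  def initialize(rows)
    @rows = rows
  end

  def collect
    @rows
  end

  # Same shape as DataFrame#each: an Enumerator without a block,
  # direct iteration with one.
  def each(&block)
    return collect.each unless block

    collect.each(&block)
  end
end

rows = TinyRows.new([1, 2, 3])
rows.each.map { |r| r * 2 } # => [2, 4, 6]
```

Either way the full result is collected first, so the Enumerator is a convenience and not a streaming cursor.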

#empty? ⇒ Boolean Also known as: is_empty

Returns whether the DataFrame has no rows.

Returns:

  • (Boolean)

    whether the DataFrame has no rows.



# File 'lib/spark_connect/data_frame.rb', line 694

def empty?
  limit(1).collect.empty?
end

#except_all(other) ⇒ DataFrame Also known as: exceptAll

Rows in this DataFrame not in `other`, keeping duplicates - Spark’s `EXCEPT ALL`.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 333

def except_all(other)
  set_op(other, :SET_OP_TYPE_EXCEPT, is_all: true)
end

#explain(mode = :simple) ⇒ void

This method returns an undefined value.

Print the query plan.



# File 'lib/spark_connect/data_frame.rb', line 747

def explain(mode = :simple)
  $stdout.puts(explain_string(mode))
end

#explain_string(mode = :simple) ⇒ String

Return the query plan as a string.

Parameters:

  • mode (Symbol) (defaults to: :simple)

    `:simple`, `:extended`, `:codegen`, `:cost`, `:formatted`.

Returns:

  • (String)


# File 'lib/spark_connect/data_frame.rb', line 740

def explain_string(mode = :simple)
  em = :"EXPLAIN_MODE_#{mode.to_s.upcase}"
  analyze(explain: Proto::AnalyzePlanRequest::Explain.new(plan: plan, explain_mode: em)).explain.explain_string
end
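
The mode-to-enum mapping used above is plain string interpolation, sketched here in isolation. `explain_mode_enum` is a hypothetical helper name.

```ruby
# Hypothetical helper: build the enum symbol from a mode name.
def explain_mode_enum(mode = :simple)
  :"EXPLAIN_MODE_#{mode.to_s.upcase}"
end

explain_mode_enum             # => :EXPLAIN_MODE_SIMPLE
explain_mode_enum("extended") # => :EXPLAIN_MODE_EXTENDED
```

The listing does not validate `mode` itself, so an unrecognised value yields a symbol that is presumably rejected later, when the request message is built.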

#filter(condition) ⇒ DataFrame Also known as: where

Filter rows by a condition.

Parameters:

  • condition (Column, String)

    a boolean column or SQL expression string.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 87

def filter(condition)
  cond = condition.is_a?(String) ? Functions.expr(condition) : condition
  build(filter: Proto::Filter.new(input: @relation, condition: cond.to_expr))
end

#first ⇒ Row?

Returns the first row.

Returns:

  • (Row, nil)

    the first row.



# File 'lib/spark_connect/data_frame.rb', line 677

def first
  take(1).first
end

#group_by(*cols) ⇒ GroupedData Also known as: groupBy, groupby

Group by the given columns.

Parameters:

  • cols (Array<Column, String>)

Returns:

  • (GroupedData)

# File 'lib/spark_connect/data_frame.rb', line 243

def group_by(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_GROUPBY)
end

#head(n = nil) ⇒ Array<Row>, Row

Returns `n` rows (array) or the single first row when called with no argument.

Returns:

  • (Array<Row>, Row)

    `n` rows (array) or the single first row when called with no argument.

# File 'lib/spark_connect/data_frame.rb', line 670

def head(n = nil)
  return first if n.nil?

  take(n)
end
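
The difference between `head` and `head(1)` is easy to miss, so here is a sketch over a plain array. `head_of` is a hypothetical helper that takes the rows explicitly instead of running a query.

```ruby
# Hypothetical helper with the same argument handling as #head.
def head_of(rows, n = nil)
  return rows.first if n.nil? # no argument: a single row (or nil)

  rows.first(n)               # with n: always an array
end

head_of(%w[a b c])    # => "a"
head_of(%w[a b c], 1) # => ["a"]
head_of(%w[a b c], 2) # => ["a", "b"]
```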

#hint(name, *params) ⇒ DataFrame

Attach a planner hint (e.g. `"broadcast"`).

Parameters:

  • name (String)
  • params (Array)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 422

def hint(name, *params)
  h = Proto::Hint.new(input: @relation, name: name.to_s,
                      parameters: params.map { |p| Column.to_col(p).to_expr })
  build(hint: h)
end

#input_files ⇒ Array<String>

Returns the input files backing this DataFrame.

Returns:

  • (Array<String>)

    the input files backing this DataFrame.



# File 'lib/spark_connect/data_frame.rb', line 752

def input_files
  analyze(input_files: Proto::AnalyzePlanRequest::InputFiles.new(plan: plan)).input_files.files.to_a
end

#intersect(other) ⇒ DataFrame

Set intersection (distinct).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 320

def intersect(other)
  set_op(other, :SET_OP_TYPE_INTERSECT, is_all: false)
end

#intersect_all(other) ⇒ DataFrame Also known as: intersectAll

Set intersection keeping duplicates.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 326

def intersect_all(other)
  set_op(other, :SET_OP_TYPE_INTERSECT, is_all: true)
end

#join(other, on: nil, how: :inner) ⇒ DataFrame

Join with another DataFrame.

Parameters:

  • other (DataFrame)
  • on (String, Array<String>, Column, nil) (defaults to: nil)

    join key column name(s) or a boolean join condition.

  • how (Symbol, String) (defaults to: :inner)

    join type (see JOIN_TYPES).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 278

def join(other, on: nil, how: :inner)
  jt = JOIN_TYPES[how.to_s.downcase.to_sym] or
    raise IllegalArgumentError, "Unsupported join type: #{how}"
  j = Proto::Join.new(left: @relation, right: other.relation, join_type: jt)
  case on
  when nil then nil
  when Column then j.join_condition = on.to_expr
  when Array then j.using_columns += on.map(&:to_s)
  else j.using_columns << on.to_s
  end
  build(join: j)
end
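
The `how` lookup in #join is case-insensitive and accepts strings or symbols. The sketch below copies a few entries from JOIN_TYPES into a hypothetical `JOIN_TYPES_SAMPLE` and raises the standard `ArgumentError` in place of the library's `IllegalArgumentError`; `resolve_join_type` is also a hypothetical name.

```ruby
# A few entries copied from the JOIN_TYPES table for a standalone sketch.
JOIN_TYPES_SAMPLE = {
  inner: :JOIN_TYPE_INNER,
  left: :JOIN_TYPE_LEFT_OUTER,
  leftouter: :JOIN_TYPE_LEFT_OUTER,
  left_outer: :JOIN_TYPE_LEFT_OUTER,
  anti: :JOIN_TYPE_LEFT_ANTI
}.freeze

# Normalise the argument the same way #join does, then look it up.
# ArgumentError is a stand-in for the library's own error class.
def resolve_join_type(how)
  JOIN_TYPES_SAMPLE.fetch(how.to_s.downcase.to_sym) do
    raise ArgumentError, "Unsupported join type: #{how}"
  end
end

resolve_join_type("LeftOuter") # => :JOIN_TYPE_LEFT_OUTER
resolve_join_type(:anti)       # => :JOIN_TYPE_LEFT_ANTI
```

Several spellings map to the same enum value, so `:left`, `"leftouter"`, and `"LEFT_OUTER"` are interchangeable.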

#limit(n) ⇒ DataFrame

Returns the first `n` rows.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 228

def limit(n)
  build(limit: Proto::Limit.new(input: @relation, limit: n))
end

#local? ⇒ Boolean

Returns whether the data is small enough to be local.

Returns:

  • (Boolean)

    whether the data is small enough to be local.



# File 'lib/spark_connect/data_frame.rb', line 757

def local?
  analyze(is_local: Proto::AnalyzePlanRequest::IsLocal.new(plan: plan)).is_local.is_local
end

#local_checkpoint(eager: true) ⇒ DataFrame Also known as: localCheckpoint

Like #checkpoint but uses the executors’ local storage (no reliable storage), which is faster but not fault-tolerant.

Parameters:

  • eager (Boolean) (defaults to: true)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 560

def local_checkpoint(eager: true)
  checkpoint_command(local: true, eager: eager)
end

#na ⇒ DataFrameNaFunctions

Returns missing-data helpers (`drop`, `fill`, `replace`).

Returns:

  • (DataFrameNaFunctions)

# File 'lib/spark_connect/data_frame.rb', line 450

def na
  DataFrameNaFunctions.new(self)
end

#observe(name, *exprs) ⇒ DataFrame

Observe named metrics over this DataFrame.

Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 570

def observe(name, *exprs)
  obs_name = name.is_a?(Observation) ? name.name : name.to_s
  cm = Proto::CollectMetrics.new(
    input: @relation, name: obs_name, metrics: exprs.map { |e| Column.to_col(e).to_expr }
  )
  df = build(collect_metrics: cm)
  name.bind(df) if name.is_a?(Observation)
  df
end

#offset(n) ⇒ DataFrame

Returns all rows except the first `n`.

Returns:

  • (DataFrame)

    all rows except the first `n`.

# File 'lib/spark_connect/data_frame.rb', line 233

def offset(n)
  build(offset: Proto::Offset.new(input: @relation, offset: n))
end

#order_by(*cols) ⇒ DataFrame Also known as: sort, orderBy

Sort by the given columns (globally).

Parameters:

  • cols (Array<Column, String>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 210

def order_by(*cols)
  orders = normalize_columns(cols).map { |c| to_sort_order(c) }
  build(sort: Proto::Sort.new(input: @relation, order: orders, is_global: true))
end

#plan ⇒ Spark::Connect::Plan

# File 'lib/spark_connect/data_frame.rb', line 780

def plan
  PlanBuilder.root_plan(@relation)
end

#print_schema(io = $stdout) ⇒ void

This method returns an undefined value.

Print the schema as an indented tree to `io`.

# File 'lib/spark_connect/data_frame.rb', line 604

def print_schema(io = $stdout)
  io.puts(schema.tree_string)
end

#repartition(num_partitions, *cols) ⇒ DataFrame

Repartition into `num_partitions` partitions, optionally hash-partitioned by columns.

Parameters:

  • num_partitions (Integer)
  • cols (Array<Column, String>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 351

def repartition(num_partitions, *cols)
  if cols.empty?
    build(repartition: Proto::Repartition.new(input: @relation, num_partitions: num_partitions, shuffle: true))
  else
    rbe = Proto::RepartitionByExpression.new(
      input: @relation, partition_exprs: normalize_columns(cols).map(&:to_expr), num_partitions: num_partitions
    )
    build(repartition_by_expression: rbe)
  end
end

#repartition_by_range(*cols) ⇒ DataFrame
#repartition_by_range(num_partitions, *cols) ⇒ DataFrame
Also known as: repartitionByRange

Range-partition by the given columns (rows are range-partitioned on the sort order of the columns).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 374

def repartition_by_range(*args)
  num_partitions = args.first.is_a?(Integer) ? args.shift : nil
  orders = normalize_columns(args).map do |c|
    expr = c.to_expr
    if expr.expr_type == :sort_order
      expr
    else
      Proto::Expression.new(sort_order: Proto::Expression::SortOrder.new(
        child: expr, direction: :SORT_DIRECTION_ASCENDING, null_ordering: :SORT_NULLS_FIRST
      ))
    end
  end
  rbe = Proto::RepartitionByExpression.new(input: @relation, partition_exprs: orders)
  rbe.num_partitions = num_partitions if num_partitions
  build(repartition_by_expression: rbe)
end
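
The two overloads are distinguished only by whether the first argument is an Integer. A minimal sketch of that parsing step, with `split_partition_args` as a hypothetical helper name:

```ruby
# Hypothetical helper: peel an optional leading partition count off the
# argument list and return it together with the remaining columns.
def split_partition_args(*args)
  num_partitions = args.first.is_a?(Integer) ? args.shift : nil
  [num_partitions, args]
end

split_partition_args(8, "year", "month") # => [8, ["year", "month"]]
split_partition_args("year")             # => [nil, ["year"]]
```

When no count is given, `num_partitions` stays unset on the request and the choice is left to the server.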

#respond_to_missing?(name, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/spark_connect/data_frame.rb', line 629

def respond_to_missing?(name, include_private = false)
  begin
    columns.include?(name.to_s)
  rescue StandardError
    false
  end || super
end

#rollup(*cols) ⇒ GroupedData

Multi-dimensional rollup.

Returns:

  • (GroupedData)

# File 'lib/spark_connect/data_frame.rb', line 251

def rollup(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_ROLLUP)
end

#same_semantics?(other) ⇒ Boolean

Returns whether `other` has the same logical plan.

Returns:

  • (Boolean)

    whether `other` has the same logical plan.

# File 'lib/spark_connect/data_frame.rb', line 767

def same_semantics?(other)
  analyze(same_semantics: Proto::AnalyzePlanRequest::SameSemantics.new(
    target_plan: plan, other_plan: other.plan
  )).same_semantics.result
end

#sample(fraction, with_replacement: false, seed: nil) ⇒ DataFrame

Random sample of rows.

Parameters:

  • fraction (Float)

    expected fraction (0.0..1.0).

  • with_replacement (Boolean) (defaults to: false)
  • seed (Integer, nil) (defaults to: nil)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 400

def sample(fraction, with_replacement: false, seed: nil)
  s = Proto::Sample.new(
    input: @relation, lower_bound: 0.0, upper_bound: fraction, with_replacement: with_replacement
  )
  s.seed = seed if seed
  build(sample: s)
end

#schema ⇒ Types::StructType

Returns the DataFrame’s schema.

Returns:

  • (Types::StructType)

# File 'lib/spark_connect/data_frame.rb', line 583

def schema
  @schema ||= Types.from_proto(analyze(schema: Proto::AnalyzePlanRequest::Schema.new(plan: plan)).schema.schema)
end

#select(*cols) ⇒ DataFrame

Select a set of columns or expressions.

Parameters:

  • cols (Array<Column, String, Symbol>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 64

def select(*cols)
  exprs = normalize_columns(cols).map(&:to_expr)
  build(project: Proto::Project.new(input: @relation, expressions: exprs))
end

#select_expr(*exprs) ⇒ DataFrame Also known as: selectExpr

Select using SQL expression strings.

Parameters:

  • exprs (Array<String>)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 73

def select_expr(*exprs)
  parsed = exprs.flatten.map do |e|
    Proto::Expression.new(expression_string: Proto::Expression::ExpressionString.new(expression: e))
  end
  build(project: Proto::Project.new(input: @relation, expressions: parsed))
end

#semantic_hash ⇒ Integer

Returns a hash of the logical plan.

Returns:

  • (Integer)

    a hash of the logical plan.



# File 'lib/spark_connect/data_frame.rb', line 774

def semantic_hash
  analyze(semantic_hash: Proto::AnalyzePlanRequest::SemanticHash.new(plan: plan)).semantic_hash.result
end

#show(n = 20, truncate: true, vertical: false) ⇒ void

This method returns an undefined value.

Render the first `n` rows as a formatted table.

Parameters:

  • n (Integer) (defaults to: 20)
  • truncate (Boolean, Integer) (defaults to: true)

    truncate long values to 20 chars (true) or to the given width (Integer).

  • vertical (Boolean) (defaults to: false)


# File 'lib/spark_connect/data_frame.rb', line 706

def show(n = 20, truncate: true, vertical: false)
  $stdout.puts(show_string(n, truncate: truncate, vertical: vertical))
end

#show_string(n = 20, truncate: true, vertical: false) ⇒ String

Returns the formatted table string (what #show prints).

Returns:

  • (String)

    the formatted table string (what #show prints).



# File 'lib/spark_connect/data_frame.rb', line 711

def show_string(n = 20, truncate: true, vertical: false)
  trunc = if truncate == true
            20
          else
            (truncate == false ? 0 : truncate.to_i)
          end
  ss = Proto::ShowString.new(input: @relation, num_rows: n, truncate: trunc, vertical: vertical)
  df = build(show_string: ss)
  df.collect.first&.[](0).to_s
end
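
The `truncate` argument accepts either a Boolean or an Integer, and the listing folds both into a single width. A standalone sketch of that conversion, with `truncate_width` as a hypothetical helper name:

```ruby
# Hypothetical helper mirroring the `trunc` computation in #show_string.
def truncate_width(truncate)
  case truncate
  when true then 20  # default width
  when false then 0  # sent as 0, which presumably disables truncation
  else truncate.to_i # explicit width
  end
end

truncate_width(true)  # => 20
truncate_width(false) # => 0
truncate_width(50)    # => 50
```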

#sort_within_partitions(*cols) ⇒ DataFrame Also known as: sortWithinPartitions

Sort within each partition (no global shuffle).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 219

def sort_within_partitions(*cols)
  orders = normalize_columns(cols).map { |c| to_sort_order(c) }
  build(sort: Proto::Sort.new(input: @relation, order: orders, is_global: false))
end

#stat ⇒ DataFrameStatFunctions

Returns statistical helpers.

Returns:

  • (DataFrameStatFunctions)

# File 'lib/spark_connect/data_frame.rb', line 455

def stat
  DataFrameStatFunctions.new(self)
end

#streaming? ⇒ Boolean

Returns whether this is a streaming DataFrame.

Returns:

  • (Boolean)

    whether this is a streaming DataFrame.



# File 'lib/spark_connect/data_frame.rb', line 762

def streaming?
  analyze(is_streaming: Proto::AnalyzePlanRequest::IsStreaming.new(plan: plan)).is_streaming.is_streaming
end

#subtract(other) ⇒ DataFrame

Rows in this DataFrame not in `other` (distinct) - Spark’s `EXCEPT`.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 340

def subtract(other)
  set_op(other, :SET_OP_TYPE_EXCEPT, is_all: false)
end

#summary(*statistics) ⇒ DataFrame

Configurable summary statistics.

Parameters:

  • statistics (Array<String>)

    e.g. `"count"`, `"mean"`, `"stddev"`, `"min"`, `"25%"`, `"50%"`, `"75%"`, `"max"`.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/stat_functions.rb', line 105

def summary(*statistics)
  build(summary: Proto::StatSummary.new(input: @relation, statistics: statistics.flatten.map(&:to_s)))
end

#take(n) ⇒ Array<Row>

Returns the first `n` rows.

Returns:

  • (Array<Row>)

    the first `n` rows.

# File 'lib/spark_connect/data_frame.rb', line 665

def take(n)
  limit(n).collect
end

#to(schema) ⇒ DataFrame

Apply a Types::StructType (reconciling/casting columns to it).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 159

def to(schema)
  build(to_schema: Proto::ToSchema.new(input: @relation, schema: schema.to_proto))
end

#to_arrow ⇒ Arrow::Table?

Materialise the result as an Arrow::Table (columnar).

Returns:

  • (Arrow::Table, nil)

# File 'lib/spark_connect/data_frame.rb', line 724

def to_arrow
  result = @session.client.execute_plan(@relation)
  ArrowConverter.to_table(result.arrow_batches)
end

#to_df(*names) ⇒ DataFrame Also known as: toDF

Rename all columns positionally.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 152

def to_df(*names)
  build(to_df: Proto::ToDF.new(input: @relation, column_names: names.flatten.map(&:to_s)))
end

#to_h_array ⇒ Array<Hash>

Returns all rows as Hashes.

Returns:

  • (Array<Hash>)

    all rows as Hashes.



# File 'lib/spark_connect/data_frame.rb', line 730

def to_h_array
  collect.map(&:to_h)
end

#to_json(*_args) ⇒ DataFrame Also known as: toJSON

Returns a single-column (`value`) DataFrame of each row encoded as a JSON string.

Returns:

  • (DataFrame)

    a single-column (`value`) DataFrame of each row encoded as a JSON string.

# File 'lib/spark_connect/data_frame.rb', line 519

def to_json(*_args)
  select(Functions.to_json(Functions.struct(Functions.col("*"))).alias("value"))
end

#to_local_iterator ⇒ Enumerator<Row> Also known as: toLocalIterator

Returns an enumerator over all rows.

Returns:

  • (Enumerator<Row>)

    an enumerator over all rows.



# File 'lib/spark_connect/data_frame.rb', line 659

def to_local_iterator
  collect.each
end

#to_s ⇒ Object Also known as: inspect

# File 'lib/spark_connect/data_frame.rb', line 784

def to_s
  "#<SparkConnect::DataFrame>"
end

#transform {|df| ... } ⇒ DataFrame

Apply a function to this DataFrame and return its result, enabling a fluent chain of custom transformations.

Yield Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 542

def transform
  yield(self)
end

#union(other) ⇒ DataFrame Also known as: union_all, unionAll

Union (by position; keeps duplicates - equivalent to Spark’s `unionAll`).

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 302

def union(other)
  set_op(other, :SET_OP_TYPE_UNION, is_all: true)
end

#union_by_name(other, allow_missing_columns: false) ⇒ DataFrame Also known as: unionByName

Union by column name.

Parameters:

  • other (DataFrame)
  • allow_missing_columns (Boolean) (defaults to: false)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 313

def union_by_name(other, allow_missing_columns: false)
  set_op(other, :SET_OP_TYPE_UNION, is_all: true, by_name: true, allow_missing_columns: allow_missing_columns)
end

#unpivot(ids, values, variable_column_name, value_column_name) ⇒ DataFrame Also known as: melt

Unpivot (melt) columns from wide to long format.

Parameters:

  • ids (Array<Column, String>)

    identifier columns.

  • values (Array<Column, String>, nil)

    value columns (nil = all others).

  • variable_column_name (String)
  • value_column_name (String)

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 435

def unpivot(ids, values, variable_column_name, value_column_name)
  u = Proto::Unpivot.new(
    input: @relation,
    ids: normalize_columns(Array(ids)).map(&:to_expr),
    variable_column_name: variable_column_name,
    value_column_name: value_column_name
  )
  u.values = Proto::Unpivot::Values.new(values: normalize_columns(Array(values)).map(&:to_expr)) unless values.nil?
  build(unpivot: u)
end

#with_column(name, col) ⇒ DataFrame Also known as: withColumn

Add or replace a single column.

Parameters:

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 100

def with_column(name, col)
  with_columns(name => col)
end

#with_column_renamed(existing, new_name) ⇒ DataFrame Also known as: withColumnRenamed

Rename a single column.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 119

def with_column_renamed(existing, new_name)
  with_columns_renamed(existing => new_name)
end

#with_columns(assigns) ⇒ DataFrame Also known as: withColumns

Add or replace multiple columns.

Parameters:

  • assigns (Hash{String=>Column})

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 109

def with_columns(assigns)
  aliases = assigns.map do |name, col|
    Proto::Expression::Alias.new(expr: Column.to_col(col).to_expr, name: [name.to_s])
  end
  build(with_columns: Proto::WithColumns.new(input: @relation, aliases: aliases))
end

#with_columns_renamed(renames) ⇒ DataFrame Also known as: withColumnsRenamed

Rename multiple columns.

Parameters:

  • renames (Hash{String=>String})

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 128

def with_columns_renamed(renames)
  pairs = renames.map do |old, new_name|
    Proto::WithColumnsRenamed::Rename.new(col_name: old.to_s, new_col_name: new_name.to_s)
  end
  build(with_columns_renamed: Proto::WithColumnsRenamed.new(input: @relation, renames: pairs))
end

#with_watermark(event_time, delay_threshold) ⇒ DataFrame Also known as: withWatermark

Define an event-time watermark for late-data handling on a streaming DataFrame.

Parameters:

  • event_time (String)

    the event-time column name.

  • delay_threshold (String)

    e.g. `"10 minutes"`.

Returns:

  • (DataFrame)

# File 'lib/spark_connect/data_frame.rb', line 530

def with_watermark(event_time, delay_threshold)
  build(with_watermark: Proto::WithWatermark.new(
    input: @relation, event_time: event_time.to_s, delay_threshold: delay_threshold.to_s
  ))
end

#write ⇒ DataFrameWriter

Returns interface for saving this DataFrame.

Returns:

  • (DataFrameWriter)

# File 'lib/spark_connect/data_frame.rb', line 460

def write
  DataFrameWriter.new(self)
end

#write_stream ⇒ DataStreamWriter Also known as: writeStream

Returns interface for starting a streaming query from this (streaming) DataFrame.

Returns:

  • (DataStreamWriter)

    interface for starting a streaming query from this (streaming) DataFrame.



# File 'lib/spark_connect/data_frame.rb', line 472

def write_stream
  DataStreamWriter.new(self)
end

#write_to(table) ⇒ DataFrameWriterV2 Also known as: writeTo

Returns the v2 (catalog) write interface.

Returns:

  • (DataFrameWriterV2)

# File 'lib/spark_connect/data_frame.rb', line 465

def write_to(table)
  DataFrameWriterV2.new(self, table)
end