Class: SparkConnect::DataFrame
- Inherits: Object
- Object
  - SparkConnect::DataFrame
- Defined in: lib/spark_connect/data_frame.rb, lib/spark_connect/stat_functions.rb
Overview
Reopen DataFrame to add the `describe`/`summary` actions, which are naturally statistical and share the Stat* relations.
Constant Summary
- Proto = SparkConnect::Proto
- JOIN_TYPES =
{
  inner: :JOIN_TYPE_INNER,
  cross: :JOIN_TYPE_CROSS,
  outer: :JOIN_TYPE_FULL_OUTER,
  full: :JOIN_TYPE_FULL_OUTER,
  fullouter: :JOIN_TYPE_FULL_OUTER,
  full_outer: :JOIN_TYPE_FULL_OUTER,
  left: :JOIN_TYPE_LEFT_OUTER,
  leftouter: :JOIN_TYPE_LEFT_OUTER,
  left_outer: :JOIN_TYPE_LEFT_OUTER,
  right: :JOIN_TYPE_RIGHT_OUTER,
  rightouter: :JOIN_TYPE_RIGHT_OUTER,
  right_outer: :JOIN_TYPE_RIGHT_OUTER,
  semi: :JOIN_TYPE_LEFT_SEMI,
  leftsemi: :JOIN_TYPE_LEFT_SEMI,
  left_semi: :JOIN_TYPE_LEFT_SEMI,
  anti: :JOIN_TYPE_LEFT_ANTI,
  leftanti: :JOIN_TYPE_LEFT_ANTI,
  left_anti: :JOIN_TYPE_LEFT_ANTI,
}.freeze
Instance Attribute Summary
-
#relation ⇒ Spark::Connect::Relation
readonly
The logical plan this DataFrame builds.
- #session ⇒ SparkSession readonly
Instance Method Summary
- #[](key) ⇒ Column
-
#agg(*exprs) ⇒ DataFrame
Aggregate over the whole DataFrame (a group-by with no grouping columns).
-
#alias(name) ⇒ DataFrame
(also: #as)
Alias this DataFrame (a subquery alias usable in join conditions).
- #build(**rel) ⇒ Object
-
#checkpoint(eager: true) ⇒ DataFrame
Eagerly checkpoint this DataFrame: materialise it server-side and return a new DataFrame backed by the cached result (truncates the logical plan).
-
#coalesce(num_partitions) ⇒ DataFrame
Reduce to `num_partitions` without a full shuffle.
-
#col_regex(regex) ⇒ Column
(also: #colRegex)
Select columns by a regular expression matched against their names.
-
#collect ⇒ Array<Row>
(also: #to_a)
Execute the plan and return all rows.
-
#column_objects ⇒ Array<Column>
One Column per output column.
-
#columns ⇒ Array<String>
Column names.
-
#count ⇒ Integer
The number of rows.
-
#create_global_temp_view(name) ⇒ void
(also: #createGlobalTempView)
Register this DataFrame as a global (cross-session) temporary view.
-
#create_or_replace_global_temp_view(name) ⇒ void
(also: #createOrReplaceGlobalTempView)
Register (or replace) this DataFrame as a global temporary view.
-
#create_or_replace_temp_view(name) ⇒ void
(also: #createOrReplaceTempView)
Register (or replace) this DataFrame as a session-scoped temporary view.
-
#create_temp_view(name) ⇒ void
(also: #createTempView)
Register this DataFrame as a session-scoped temporary view, failing if a view of the same name already exists.
-
#cross_join(other) ⇒ DataFrame
(also: #crossJoin)
Cartesian product with another DataFrame.
-
#cube(*cols) ⇒ GroupedData
Multi-dimensional cube.
-
#describe(*cols) ⇒ DataFrame
Basic descriptive statistics (count, mean, stddev, min, max) per column.
-
#distinct ⇒ DataFrame
Distinct rows.
-
#drop(*cols) ⇒ DataFrame
Drop one or more columns (by name or Column).
-
#drop_duplicates(subset = nil) ⇒ DataFrame
(also: #dropDuplicates, #drop_duplicates_within_watermark)
Drop duplicate rows, optionally restricted to a subset of columns.
-
#dtypes ⇒ Array<Array(String, String)>
(name, simpleString-type) pairs.
-
#each {|row| ... } ⇒ Enumerator, void
Iterate over all rows (materialises the result).
-
#empty? ⇒ Boolean
(also: #is_empty)
Whether the DataFrame has no rows.
-
#except_all(other) ⇒ DataFrame
(also: #exceptAll)
Rows in this DataFrame not in `other`, keeping duplicates (Spark's `EXCEPT ALL`).
-
#explain(mode = :simple) ⇒ void
Print the query plan.
-
#explain_string(mode = :simple) ⇒ String
Return the query plan as a string.
-
#filter(condition) ⇒ DataFrame
(also: #where)
Filter rows by a condition.
-
#first ⇒ Row?
The first row.
-
#group_by(*cols) ⇒ GroupedData
(also: #groupBy, #groupby)
Group by the given columns.
-
#head(n = nil) ⇒ Array<Row>, Row
`n` rows (array) or the single first row when called with no argument.
-
#hint(name, *params) ⇒ DataFrame
Attach a planner hint (e.g. `"broadcast"`).
-
#initialize(session, relation) ⇒ DataFrame
constructor
A new instance of DataFrame.
-
#input_files ⇒ Array<String>
The input files backing this DataFrame.
-
#intersect(other) ⇒ DataFrame
Set intersection (distinct).
-
#intersect_all(other) ⇒ DataFrame
(also: #intersectAll)
Set intersection keeping duplicates.
-
#join(other, on: nil, how: :inner) ⇒ DataFrame
Join with another DataFrame.
-
#limit(n) ⇒ DataFrame
The first `n` rows.
-
#local? ⇒ Boolean
Whether the data is small enough to be local.
-
#local_checkpoint(eager: true) ⇒ DataFrame
(also: #localCheckpoint)
Like #checkpoint but uses the executors’ local storage (no reliable storage), which is faster but not fault-tolerant.
-
#method_missing(name, *args) ⇒ Object
Allows `df.column_name` for valid identifier column names.
-
#na ⇒ DataFrameNaFunctions
Missing-data helpers (`drop`, `fill`, `replace`).
-
#observe(name, *exprs) ⇒ DataFrame
Observe named metrics over this DataFrame.
-
#offset(n) ⇒ DataFrame
All rows except the first `n`.
-
#order_by(*cols) ⇒ DataFrame
(also: #sort, #orderBy)
Sort by the given columns (globally).
- #plan ⇒ Spark::Connect::Plan
-
#print_schema(io = $stdout) ⇒ void
(also: #printSchema)
Print the schema as an indented tree to `io`.
-
#repartition(num_partitions, *cols) ⇒ DataFrame
Repartition into `num_partitions`, optionally hash-partitioned by columns.
-
#repartition_by_range(*args) ⇒ DataFrame
(also: #repartitionByRange)
Range-partition by the given columns (rows are range-partitioned on the sort order of the columns).
- #respond_to_missing?(name, include_private = false) ⇒ Boolean
-
#rollup(*cols) ⇒ GroupedData
Multi-dimensional rollup.
-
#same_semantics?(other) ⇒ Boolean
Whether `other` has the same logical plan.
-
#sample(fraction, with_replacement: false, seed: nil) ⇒ DataFrame
Random sample of rows.
-
#schema ⇒ Types::StructType
The DataFrame’s schema.
-
#select(*cols) ⇒ DataFrame
Select a set of columns or expressions.
-
#select_expr(*exprs) ⇒ DataFrame
(also: #selectExpr)
Select using SQL expression strings.
-
#semantic_hash ⇒ Integer
A hash of the logical plan.
-
#show(n = 20, truncate: true, vertical: false) ⇒ void
Render the first `n` rows as a formatted table.
-
#show_string(n = 20, truncate: true, vertical: false) ⇒ String
The formatted table string (what #show prints).
-
#sort_within_partitions(*cols) ⇒ DataFrame
(also: #sortWithinPartitions)
Sort within each partition (no global shuffle).
-
#stat ⇒ DataFrameStatFunctions
Statistical helpers.
-
#streaming? ⇒ Boolean
Whether this is a streaming DataFrame.
-
#subtract(other) ⇒ DataFrame
Rows in this DataFrame not in `other` (distinct); Spark's `EXCEPT`.
-
#summary(*statistics) ⇒ DataFrame
Configurable summary statistics.
-
#take(n) ⇒ Array<Row>
The first `n` rows.
-
#to(schema) ⇒ DataFrame
Apply a Types::StructType (reconciling/casting columns to it).
-
#to_arrow ⇒ Arrow::Table?
Materialise the result as an Arrow::Table (columnar).
-
#to_df(*names) ⇒ DataFrame
(also: #toDF)
Rename all columns positionally.
-
#to_h_array ⇒ Array<Hash>
All rows as Hashes.
-
#to_json(*_args) ⇒ DataFrame
(also: #toJSON)
A single-column (`value`) DataFrame of each row encoded as a JSON string.
-
#to_local_iterator ⇒ Enumerator<Row>
(also: #toLocalIterator)
An enumerator over all rows.
- #to_s ⇒ Object (also: #inspect)
-
#transform {|df| ... } ⇒ DataFrame
Apply a function to this DataFrame and return its result, enabling a fluent chain of custom transformations.
-
#union(other) ⇒ DataFrame
(also: #union_all, #unionAll)
Union (by position; keeps duplicates, equivalent to Spark's `unionAll`).
-
#union_by_name(other, allow_missing_columns: false) ⇒ DataFrame
(also: #unionByName)
Union by column name.
-
#unpivot(ids, values, variable_column_name, value_column_name) ⇒ DataFrame
(also: #melt)
Unpivot (melt) columns from wide to long format.
-
#with_column(name, col) ⇒ DataFrame
(also: #withColumn)
Add or replace a single column.
-
#with_column_renamed(existing, new_name) ⇒ DataFrame
(also: #withColumnRenamed)
Rename a single column.
-
#with_columns(assigns) ⇒ DataFrame
(also: #withColumns)
Add or replace multiple columns.
-
#with_columns_renamed(renames) ⇒ DataFrame
(also: #withColumnsRenamed)
Rename multiple columns.
-
#with_watermark(event_time, delay_threshold) ⇒ DataFrame
(also: #withWatermark)
Define an event-time watermark for late-data handling on a streaming DataFrame.
-
#write ⇒ DataFrameWriter
Interface for saving this DataFrame.
-
#write_stream ⇒ DataStreamWriter
(also: #writeStream)
Interface for starting a streaming query from this (streaming) DataFrame.
-
#write_to(table) ⇒ DataFrameWriterV2
(also: #writeTo)
The v2 (catalog) write interface.
Constructor Details
#initialize(session, relation) ⇒ DataFrame
Returns a new instance of DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 53
def initialize(session, relation)
  @session = session
  @relation = relation
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method.
Instance Attribute Details
#relation ⇒ Spark::Connect::Relation (readonly)
Returns the logical plan this DataFrame builds.
# File 'lib/spark_connect/data_frame.rb', line 49
def relation
  @relation
end
#session ⇒ SparkSession (readonly)
# File 'lib/spark_connect/data_frame.rb', line 47
def session
  @session
end
Instance Method Details
#[](key) ⇒ Column
# File 'lib/spark_connect/data_frame.rb', line 596
def [](key)
  case key
  when Integer then Functions.col(columns[key])
  else Functions.col(key.to_s)
  end
end
#agg(*exprs) ⇒ DataFrame
Aggregate over the whole DataFrame (a group-by with no grouping columns).
# File 'lib/spark_connect/data_frame.rb', line 248
def agg(*exprs)
  group_by.agg(*exprs)
end
#alias(name) ⇒ DataFrame Also known as: as
Alias this DataFrame (a subquery alias usable in join conditions).
# File 'lib/spark_connect/data_frame.rb', line 395
def alias(name)
  build(subquery_alias: Proto::SubqueryAlias.new(input: @relation, alias: name.to_s))
end
#build(**rel) ⇒ Object
# File 'lib/spark_connect/data_frame.rb', line 774
def build(**rel)
  DataFrame.new(@session, PlanBuilder.relation(@session, **rel))
end
#checkpoint(eager: true) ⇒ DataFrame
Eagerly checkpoint this DataFrame: materialise it server-side and return a new DataFrame backed by the cached result (truncates the logical plan).
# File 'lib/spark_connect/data_frame.rb', line 534
def checkpoint(eager: true)
  checkpoint_command(local: false, eager: eager)
end
#coalesce(num_partitions) ⇒ DataFrame
Reduce to `num_partitions` without a full shuffle.
# File 'lib/spark_connect/data_frame.rb', line 347
def coalesce(num_partitions)
  build(repartition: Proto::Repartition.new(input: @relation, num_partitions: num_partitions, shuffle: false))
end
#col_regex(regex) ⇒ Column Also known as: colRegex
Select columns by a regular expression matched against their names.
# File 'lib/spark_connect/data_frame.rb', line 495
def col_regex(regex)
  Column.new(Proto::Expression.new(unresolved_regex: Proto::Expression::UnresolvedRegex.new(col_name: regex.to_s)))
end
#collect ⇒ Array<Row> Also known as: to_a
Execute the plan and return all rows.
# File 'lib/spark_connect/data_frame.rb', line 624
def collect
  result = @session.client.execute_plan(@relation)
  ArrowConverter.to_rows(result.arrow_batches)
end
#column_objects ⇒ Array<Column>
Returns one Column per output column.
# File 'lib/spark_connect/data_frame.rb', line 581
def column_objects
  columns.map { |c| Functions.col(c) }
end
#columns ⇒ Array<String>
Returns column names.
# File 'lib/spark_connect/data_frame.rb', line 571
def columns
  schema.names
end
#count ⇒ Integer
Returns the number of rows.
# File 'lib/spark_connect/data_frame.rb', line 665
def count
  df = build(aggregate: Proto::Aggregate.new(
    input: @relation,
    group_type: :GROUP_TYPE_GROUPBY,
    grouping_expressions: [],
    aggregate_expressions: [Column.invoke("count", Column.lit(1)).to_expr]
  ))
  row = df.collect.first
  row ? row[0] : 0
end
#create_global_temp_view(name) ⇒ void Also known as: createGlobalTempView
This method returns an undefined value.
Register this DataFrame as a global (cross-session) temporary view.
# File 'lib/spark_connect/data_frame.rb', line 479
def create_global_temp_view(name)
  register_view(name, global: true, replace: false)
end
#create_or_replace_global_temp_view(name) ⇒ void Also known as: createOrReplaceGlobalTempView
This method returns an undefined value.
Register (or replace) this DataFrame as a global temporary view.
# File 'lib/spark_connect/data_frame.rb', line 486
def create_or_replace_global_temp_view(name)
  register_view(name, global: true, replace: true)
end
#create_or_replace_temp_view(name) ⇒ void Also known as: createOrReplaceTempView
This method returns an undefined value.
Register (or replace) this DataFrame as a session-scoped temporary view.
# File 'lib/spark_connect/data_frame.rb', line 472
def create_or_replace_temp_view(name)
  register_view(name, global: false, replace: true)
end
#create_temp_view(name) ⇒ void Also known as: createTempView
This method returns an undefined value.
Register this DataFrame as a session-scoped temporary view, failing if a view of the same name already exists.
# File 'lib/spark_connect/data_frame.rb', line 465
def create_temp_view(name)
  register_view(name, global: false, replace: false)
end
#cross_join(other) ⇒ DataFrame Also known as: crossJoin
Cartesian product with another DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 276
def cross_join(other)
  build(join: Proto::Join.new(left: @relation, right: other.relation, join_type: :JOIN_TYPE_CROSS))
end
#cube(*cols) ⇒ GroupedData
Multi-dimensional cube.
# File 'lib/spark_connect/data_frame.rb', line 240
def cube(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_CUBE)
end
#describe(*cols) ⇒ DataFrame
Basic descriptive statistics (count, mean, stddev, min, max) per column.
# File 'lib/spark_connect/stat_functions.rb', line 96
def describe(*cols)
  build(describe: Proto::StatDescribe.new(input: @relation, cols: cols.flatten.map(&:to_s)))
end
#distinct ⇒ DataFrame
Distinct rows.
# File 'lib/spark_connect/data_frame.rb', line 167
def distinct
  build(deduplicate: Proto::Deduplicate.new(input: @relation, all_columns_as_keys: true))
end
#drop(*cols) ⇒ DataFrame
Drop one or more columns (by name or Column).
# File 'lib/spark_connect/data_frame.rb', line 138
def drop(*cols)
  names = []
  columns = []
  cols.flatten.each do |c|
    case c
    when Column then columns << c.to_expr
    else names << c.to_s
    end
  end
  build(drop: Proto::Drop.new(input: @relation, columns: columns, column_names: names))
end
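The way #drop sorts its arguments into expressions and plain names can be tried in isolation. The sketch below is not the library code: `FakeColumn` and `split_drop_args` are hypothetical stand-ins, and the string returned by `to_expr` is only a placeholder for a real protobuf expression.

```ruby
# Hypothetical stand-in for Column; only #to_expr matters for this sketch.
FakeColumn = Struct.new(:name) do
  def to_expr
    "expr(#{name})"
  end
end

# Mirrors the partitioning step in #drop: Column-like arguments become
# expressions, everything else is stringified into a column name.
# Nested arrays are flattened first, as in the original.
def split_drop_args(*cols)
  names = []
  exprs = []
  cols.flatten.each do |c|
    case c
    when FakeColumn then exprs << c.to_expr
    else names << c.to_s
    end
  end
  { columns: exprs, column_names: names }
end

p split_drop_args(:age, ["city", FakeColumn.new("zip")])
```

Symbols and strings end up in `column_names`, while Column-like objects go to `columns`, so both styles can be mixed in a single call.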
#drop_duplicates(subset = nil) ⇒ DataFrame Also known as: dropDuplicates, drop_duplicates_within_watermark
Drop duplicate rows, optionally restricted to a subset of columns.
# File 'lib/spark_connect/data_frame.rb', line 175
def drop_duplicates(subset = nil)
  dedup = if subset.nil? || subset.empty?
            Proto::Deduplicate.new(input: @relation, all_columns_as_keys: true)
          else
            Proto::Deduplicate.new(input: @relation, column_names: Array(subset).map(&:to_s))
          end
  build(deduplicate: dedup)
end
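As a rough illustration of how the `subset` argument is interpreted, the following standalone sketch reproduces only the branching logic. `dedup_keys` is a hypothetical helper, and the `:all_columns` marker stands in for `all_columns_as_keys: true`.

```ruby
# Hypothetical helper mirroring the subset handling in #drop_duplicates.
# nil or an empty collection means "use every column as the key";
# otherwise the subset is wrapped in an Array and stringified.
def dedup_keys(subset = nil)
  return :all_columns if subset.nil? || subset.empty?

  Array(subset).map(&:to_s)
end

p dedup_keys
p dedup_keys(:id)
p dedup_keys(%i[id day])
```

Because of `Array(subset)`, a single name and a list of names are both accepted.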
#dtypes ⇒ Array<Array(String, String)>
Returns (name, simpleString-type) pairs.
# File 'lib/spark_connect/data_frame.rb', line 576
def dtypes
  schema.fields.map { |f| [f.name, f.data_type.simple_string] }
end
#each {|row| ... } ⇒ Enumerator, void
Iterate over all rows (materialises the result). Returns an Enumerator when no block is given.
# File 'lib/spark_connect/data_frame.rb', line 635
def each(&block)
  return collect.each unless block

  collect.each(&block)
end
#empty? ⇒ Boolean Also known as: is_empty
Returns whether the DataFrame has no rows.
# File 'lib/spark_connect/data_frame.rb', line 677
def empty?
  limit(1).collect.empty?
end
#except_all(other) ⇒ DataFrame Also known as: exceptAll
Rows in this DataFrame not in `other`, keeping duplicates (Spark's `EXCEPT ALL`).
# File 'lib/spark_connect/data_frame.rb', line 316
def except_all(other)
  set_op(other, :SET_OP_TYPE_EXCEPT, is_all: true)
end
#explain(mode = :simple) ⇒ void
This method returns an undefined value.
Print the query plan.
# File 'lib/spark_connect/data_frame.rb', line 730
def explain(mode = :simple)
  $stdout.puts(explain_string(mode))
end
#explain_string(mode = :simple) ⇒ String
Return the query plan as a string.
# File 'lib/spark_connect/data_frame.rb', line 723
def explain_string(mode = :simple)
  em = :"EXPLAIN_MODE_#{mode.to_s.upcase}"
  analyze(explain: Proto::AnalyzePlanRequest::Explain.new(plan: plan, explain_mode: em)).explain.explain_string
end
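The `mode` argument is turned into an enum symbol by upcasing and prefixing it. The sketch below shows that mapping on its own. `explain_mode_symbol` and `known_explain_mode?` are hypothetical helpers, and the list of mode names is an assumption based on the Spark Connect explain modes, not something this page states.

```ruby
# Hypothetical helper mirroring the symbol construction in #explain_string.
def explain_mode_symbol(mode = :simple)
  :"EXPLAIN_MODE_#{mode.to_s.upcase}"
end

# Assumed set of modes; the original method does no such validation and
# leaves unknown values for the protobuf layer or server to reject.
def known_explain_mode?(mode)
  %i[simple extended codegen cost formatted].include?(mode.to_s.downcase.to_sym)
end

p explain_mode_symbol
p explain_mode_symbol("extended")
p known_explain_mode?(:typo)
```

Strings and symbols in any letter case map to the same enum symbol, since the value is stringified and upcased first.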
#filter(condition) ⇒ DataFrame Also known as: where
Filter rows by a condition.
# File 'lib/spark_connect/data_frame.rb', line 87
def filter(condition)
  cond = condition.is_a?(String) ? Functions.expr(condition) : condition
  build(filter: Proto::Filter.new(input: @relation, condition: cond.to_expr))
end
#first ⇒ Row?
Returns the first row.
# File 'lib/spark_connect/data_frame.rb', line 660
def first
  take(1).first
end
#group_by(*cols) ⇒ GroupedData Also known as: groupBy, groupby
Group by the given columns.
# File 'lib/spark_connect/data_frame.rb', line 226
def group_by(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_GROUPBY)
end
#head(n = nil) ⇒ Array<Row>, Row
Returns `n` rows (array) or the single first row when called with no argument.
# File 'lib/spark_connect/data_frame.rb', line 653
def head(n = nil)
  return first if n.nil?

  take(n)
end
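The two return shapes of #head are easy to mix up, so here is a minimal sketch of the same delegation over a plain array. `RowSource` is a hypothetical stand-in whose `take` slices an in-memory array instead of running `limit(n).collect`.

```ruby
# Hypothetical in-memory stand-in for a DataFrame's row-fetching methods.
class RowSource
  def initialize(rows)
    @rows = rows
  end

  # Stand-in for limit(n).collect.
  def take(n)
    @rows.first(n)
  end

  def first
    take(1).first
  end

  # No argument: a single row (or nil). With n: always an array.
  def head(n = nil)
    return first if n.nil?

    take(n)
  end
end

src = RowSource.new(%w[a b c])
p src.head
p src.head(2)
```

Note that `head(1)` still returns a one-element array, while `head` with no argument returns the bare row, or `nil` when there are no rows.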
#hint(name, *params) ⇒ DataFrame
Attach a planner hint (e.g. `"broadcast"`).
# File 'lib/spark_connect/data_frame.rb', line 405
def hint(name, *params)
  h = Proto::Hint.new(input: @relation, name: name.to_s,
                      parameters: params.map { |p| Column.to_col(p).to_expr })
  build(hint: h)
end
#input_files ⇒ Array<String>
Returns the input files backing this DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 735
def input_files
  analyze(input_files: Proto::AnalyzePlanRequest::InputFiles.new(plan: plan)).input_files.files.to_a
end
#intersect(other) ⇒ DataFrame
Set intersection (distinct).
# File 'lib/spark_connect/data_frame.rb', line 303
def intersect(other)
  set_op(other, :SET_OP_TYPE_INTERSECT, is_all: false)
end
#intersect_all(other) ⇒ DataFrame Also known as: intersectAll
Set intersection keeping duplicates.
# File 'lib/spark_connect/data_frame.rb', line 309
def intersect_all(other)
  set_op(other, :SET_OP_TYPE_INTERSECT, is_all: true)
end
#join(other, on: nil, how: :inner) ⇒ DataFrame
Join with another DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 261
def join(other, on: nil, how: :inner)
  jt = JOIN_TYPES[how.to_s.downcase.to_sym] or
    raise IllegalArgumentError, "Unsupported join type: #{how}"
  j = Proto::Join.new(left: @relation, right: other.relation, join_type: jt)
  case on
  when nil then nil
  when Column then j.join_condition = on.to_expr
  when Array then j.using_columns += on.map(&:to_s)
  else j.using_columns << on.to_s
  end
  build(join: j)
end
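The argument handling in #join has two independent parts: normalising `how` through JOIN_TYPES and choosing between a join condition and using-columns for `on:`. The sketch below covers both without a Spark session. `JOIN_TYPE_SUBSET`, `resolve_join_type`, and `classify_on` are hypothetical names, the table is only a subset of the real constant, `ArgumentError` stands in for the library's `IllegalArgumentError`, and the Column branch is left out because no Column class exists here.

```ruby
# A subset of the JOIN_TYPES table from the Constant Summary.
JOIN_TYPE_SUBSET = {
  inner: :JOIN_TYPE_INNER,
  outer: :JOIN_TYPE_FULL_OUTER,
  left: :JOIN_TYPE_LEFT_OUTER,
  left_outer: :JOIN_TYPE_LEFT_OUTER,
  semi: :JOIN_TYPE_LEFT_SEMI,
  anti: :JOIN_TYPE_LEFT_ANTI
}.freeze

# Hypothetical helper: case-insensitive lookup accepting strings or symbols.
def resolve_join_type(how)
  JOIN_TYPE_SUBSET[how.to_s.downcase.to_sym] or
    raise ArgumentError, "Unsupported join type: #{how}"
end

# Hypothetical helper: which using-columns would be sent for a non-Column `on:`.
def classify_on(on)
  case on
  when nil then []
  when Array then on.map(&:to_s)
  else [on.to_s]
  end
end

p resolve_join_type("LEFT")
p classify_on(%i[id day])
```

Since the lookup downcases its input, `how: "Left_Outer"` and `how: :left_outer` resolve to the same join type.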
#limit(n) ⇒ DataFrame
Returns the first `n` rows.
# File 'lib/spark_connect/data_frame.rb', line 211
def limit(n)
  build(limit: Proto::Limit.new(input: @relation, limit: n))
end
#local? ⇒ Boolean
Returns whether the data is small enough to be local.
# File 'lib/spark_connect/data_frame.rb', line 740
def local?
  analyze(is_local: Proto::AnalyzePlanRequest::IsLocal.new(plan: plan)).is_local.is_local
end
#local_checkpoint(eager: true) ⇒ DataFrame Also known as: localCheckpoint
Like #checkpoint but uses the executors’ local storage (no reliable storage), which is faster but not fault-tolerant.
# File 'lib/spark_connect/data_frame.rb', line 543
def local_checkpoint(eager: true)
  checkpoint_command(local: true, eager: eager)
end
#na ⇒ DataFrameNaFunctions
Returns missing-data helpers (`drop`, `fill`, `replace`).
# File 'lib/spark_connect/data_frame.rb', line 433
def na
  DataFrameNaFunctions.new(self)
end
#observe(name, *exprs) ⇒ DataFrame
Observe named metrics over this DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 553
def observe(name, *exprs)
  obs_name = name.is_a?(Observation) ? name.name : name.to_s
  cm = Proto::CollectMetrics.new(
    input: @relation, name: obs_name, metrics: exprs.map { |e| Column.to_col(e).to_expr }
  )
  df = build(collect_metrics: cm)
  name.bind(df) if name.is_a?(Observation)
  df
end
#offset(n) ⇒ DataFrame
Returns all rows except the first `n`.
# File 'lib/spark_connect/data_frame.rb', line 216
def offset(n)
  build(offset: Proto::Offset.new(input: @relation, offset: n))
end
#order_by(*cols) ⇒ DataFrame Also known as: sort, orderBy
Sort by the given columns (globally).
# File 'lib/spark_connect/data_frame.rb', line 193
def order_by(*cols)
  orders = normalize_columns(cols).map { |c| to_sort_order(c) }
  build(sort: Proto::Sort.new(input: @relation, order: orders, is_global: true))
end
#plan ⇒ Spark::Connect::Plan
# File 'lib/spark_connect/data_frame.rb', line 763
def plan
  PlanBuilder.root_plan(@relation)
end
#print_schema(io = $stdout) ⇒ void Also known as: printSchema
This method returns an undefined value.
Print the schema as an indented tree to `io`.
# File 'lib/spark_connect/data_frame.rb', line 587
def print_schema(io = $stdout)
  io.puts(schema.tree_string)
end
#repartition(num_partitions, *cols) ⇒ DataFrame
Repartition into `num_partitions`, optionally hash-partitioned by columns.
# File 'lib/spark_connect/data_frame.rb', line 334
def repartition(num_partitions, *cols)
  if cols.empty?
    build(repartition: Proto::Repartition.new(input: @relation, num_partitions: num_partitions, shuffle: true))
  else
    rbe = Proto::RepartitionByExpression.new(
      input: @relation, partition_exprs: normalize_columns(cols).map(&:to_expr), num_partitions: num_partitions
    )
    build(repartition_by_expression: rbe)
  end
end
#repartition_by_range(*cols) ⇒ DataFrame
#repartition_by_range(num_partitions, *cols) ⇒ DataFrame Also known as: repartitionByRange
Range-partition by the given columns (rows are range-partitioned on the sort order of the columns).
# File 'lib/spark_connect/data_frame.rb', line 357
def repartition_by_range(*args)
  num_partitions = args.first.is_a?(Integer) ? args.shift : nil
  orders = normalize_columns(args).map do |c|
    expr = c.to_expr
    if expr.expr_type == :sort_order
      expr
    else
      Proto::Expression.new(sort_order: Proto::Expression::SortOrder.new(
        child: expr, direction: :SORT_DIRECTION_ASCENDING, null_ordering: :SORT_NULLS_FIRST
      ))
    end
  end
  rbe = Proto::RepartitionByExpression.new(input: @relation, partition_exprs: orders)
  rbe.num_partitions = num_partitions if num_partitions
  build(repartition_by_expression: rbe)
end
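The two overloads of #repartition_by_range share one `*args` list; an Integer in first position is treated as the partition count. A minimal sketch of just that parsing step follows. `split_partition_args` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper mirroring the first line of #repartition_by_range:
# peel off a leading Integer as num_partitions, leave the rest as columns.
def split_partition_args(*args)
  num_partitions = args.first.is_a?(Integer) ? args.shift : nil
  [num_partitions, args]
end

p split_partition_args(8, "region", "day")
p split_partition_args("region")
```

Only the first argument is inspected, so an Integer appearing later in the list would be passed on as if it were a column.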
#respond_to_missing?(name, include_private = false) ⇒ Boolean
# File 'lib/spark_connect/data_frame.rb', line 612
def respond_to_missing?(name, include_private = false)
  begin
    columns.include?(name.to_s)
  rescue StandardError
    false
  end || super
end
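The `df.column_name` shortcut relies on the usual pairing of `method_missing` with `respond_to_missing?`. Since the source of `method_missing` is not shown on this page, the sketch below is only a guess at the pattern, using a hypothetical `TinyFrame` that returns column names as plain strings rather than Column objects. It also includes the Integer-or-name dispatch from #[].

```ruby
# Hypothetical stand-in; not the real SparkConnect::DataFrame.
class TinyFrame
  attr_reader :columns

  def initialize(columns)
    @columns = columns
  end

  # Integer keys index into the column list; anything else is used as a name.
  def [](key)
    case key
    when Integer then columns[key]
    else key.to_s
    end
  end

  # Resolve zero-argument calls that match a column name; defer otherwise.
  def method_missing(name, *args)
    return self[name] if args.empty? && columns.include?(name.to_s)

    super
  end

  # Keeps respond_to? consistent with method_missing.
  def respond_to_missing?(name, include_private = false)
    columns.include?(name.to_s) || super
  end
end

frame = TinyFrame.new(%w[id city])
p frame.city
p frame.respond_to?(:city)
```

The real `respond_to_missing?` additionally rescues `StandardError`, presumably because resolving `columns` needs a schema lookup that can fail; this in-memory version has no such failure mode.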
#rollup(*cols) ⇒ GroupedData
Multi-dimensional rollup.
# File 'lib/spark_connect/data_frame.rb', line 234
def rollup(*cols)
  GroupedData.new(self, normalize_columns(cols), :GROUP_TYPE_ROLLUP)
end
#same_semantics?(other) ⇒ Boolean
Returns whether `other` has the same logical plan.
# File 'lib/spark_connect/data_frame.rb', line 750
def same_semantics?(other)
  analyze(same_semantics: Proto::AnalyzePlanRequest::SameSemantics.new(
    target_plan: plan, other_plan: other.plan
  )).same_semantics.result
end
#sample(fraction, with_replacement: false, seed: nil) ⇒ DataFrame
Random sample of rows.
# File 'lib/spark_connect/data_frame.rb', line 383
def sample(fraction, with_replacement: false, seed: nil)
  s = Proto::Sample.new(
    input: @relation, lower_bound: 0.0, upper_bound: fraction, with_replacement: with_replacement
  )
  s.seed = seed if seed
  build(sample: s)
end
#schema ⇒ Types::StructType
Returns the DataFrame’s schema.
# File 'lib/spark_connect/data_frame.rb', line 566
def schema
  @schema ||= Types.from_proto(analyze(schema: Proto::AnalyzePlanRequest::Schema.new(plan: plan)).schema.schema)
end
#select(*cols) ⇒ DataFrame
Select a set of columns or expressions.
# File 'lib/spark_connect/data_frame.rb', line 64
def select(*cols)
  exprs = normalize_columns(cols).map(&:to_expr)
  build(project: Proto::Project.new(input: @relation, expressions: exprs))
end
#select_expr(*exprs) ⇒ DataFrame Also known as: selectExpr
Select using SQL expression strings.
# File 'lib/spark_connect/data_frame.rb', line 73
def select_expr(*exprs)
  parsed = exprs.flatten.map do |e|
    Proto::Expression.new(expression_string: Proto::Expression::ExpressionString.new(expression: e))
  end
  build(project: Proto::Project.new(input: @relation, expressions: parsed))
end
#semantic_hash ⇒ Integer
Returns a hash of the logical plan.
# File 'lib/spark_connect/data_frame.rb', line 757
def semantic_hash
  analyze(semantic_hash: Proto::AnalyzePlanRequest::SemanticHash.new(plan: plan)).semantic_hash.result
end
#show(n = 20, truncate: true, vertical: false) ⇒ void
This method returns an undefined value.
Render the first `n` rows as a formatted table.
# File 'lib/spark_connect/data_frame.rb', line 689
def show(n = 20, truncate: true, vertical: false)
  $stdout.puts(show_string(n, truncate: truncate, vertical: vertical))
end
#show_string(n = 20, truncate: true, vertical: false) ⇒ String
Returns the formatted table string (what #show prints).
# File 'lib/spark_connect/data_frame.rb', line 694
def show_string(n = 20, truncate: true, vertical: false)
  trunc = if truncate == true
            20
          else
            (truncate == false ? 0 : truncate.to_i)
          end
  ss = Proto::ShowString.new(input: @relation, num_rows: n, truncate: trunc, vertical: vertical)
  df = build(show_string: ss)
  df.collect.first&.[](0).to_s
end
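The `truncate:` option accepts `true`, `false`, or a number, and is reduced to a single integer before being sent. The sketch below isolates that conversion. `normalize_truncate` is a hypothetical helper, and reading 0 as "do not truncate" is an assumption about how the server interprets the value.

```ruby
# Hypothetical helper mirroring the truncate handling in #show_string.
def normalize_truncate(truncate)
  case truncate
  when true then 20      # default cell width
  when false then 0      # assumed to mean "no truncation" on the server
  else truncate.to_i     # explicit width; numeric strings are converted too
  end
end

p normalize_truncate(true)
p normalize_truncate(false)
p normalize_truncate(35)
```

Anything that is neither `true` nor `false` goes through `to_i`, so a non-numeric string such as `"wide"` would silently become 0.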
#sort_within_partitions(*cols) ⇒ DataFrame Also known as: sortWithinPartitions
Sort within each partition (no global shuffle).
# File 'lib/spark_connect/data_frame.rb', line 202
def sort_within_partitions(*cols)
  orders = normalize_columns(cols).map { |c| to_sort_order(c) }
  build(sort: Proto::Sort.new(input: @relation, order: orders, is_global: false))
end
#stat ⇒ DataFrameStatFunctions
Returns statistical helpers.
# File 'lib/spark_connect/data_frame.rb', line 438
def stat
  DataFrameStatFunctions.new(self)
end
#streaming? ⇒ Boolean
Returns whether this is a streaming DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 745
def streaming?
  analyze(is_streaming: Proto::AnalyzePlanRequest::IsStreaming.new(plan: plan)).is_streaming.is_streaming
end
#subtract(other) ⇒ DataFrame
Rows in this DataFrame not in `other` (distinct); Spark's `EXCEPT`.
# File 'lib/spark_connect/data_frame.rb', line 323
def subtract(other)
  set_op(other, :SET_OP_TYPE_EXCEPT, is_all: false)
end
#summary(*statistics) ⇒ DataFrame
Configurable summary statistics.
# File 'lib/spark_connect/stat_functions.rb', line 105
def summary(*statistics)
  build(summary: Proto::StatSummary.new(input: @relation, statistics: statistics.flatten.map(&:to_s)))
end
#take(n) ⇒ Array<Row>
Returns the first `n` rows.
# File 'lib/spark_connect/data_frame.rb', line 648
def take(n)
  limit(n).collect
end
#to(schema) ⇒ DataFrame
Apply a Types::StructType (reconciling/casting columns to it).
# File 'lib/spark_connect/data_frame.rb', line 159
def to(schema)
  build(to_schema: Proto::ToSchema.new(input: @relation, schema: schema.to_proto))
end
#to_arrow ⇒ Arrow::Table?
Materialise the result as an Arrow::Table (columnar).
# File 'lib/spark_connect/data_frame.rb', line 707
def to_arrow
  result = @session.client.execute_plan(@relation)
  ArrowConverter.to_table(result.arrow_batches)
end
#to_df(*names) ⇒ DataFrame Also known as: toDF
Rename all columns positionally.
# File 'lib/spark_connect/data_frame.rb', line 152
def to_df(*names)
  build(to_df: Proto::ToDF.new(input: @relation, column_names: names.flatten.map(&:to_s)))
end
#to_h_array ⇒ Array<Hash>
Returns all rows as Hashes.
# File 'lib/spark_connect/data_frame.rb', line 713
def to_h_array
  collect.map(&:to_h)
end
#to_json(*_args) ⇒ DataFrame Also known as: toJSON
Returns a single-column (`value`) DataFrame of each row encoded as a JSON string.
# File 'lib/spark_connect/data_frame.rb', line 502
def to_json(*_args)
  select(Functions.to_json(Functions.struct(Functions.col("*"))).alias("value"))
end
#to_local_iterator ⇒ Enumerator<Row> Also known as: toLocalIterator
Returns an enumerator over all rows.
# File 'lib/spark_connect/data_frame.rb', line 642
def to_local_iterator
  collect.each
end
#to_s ⇒ Object Also known as: inspect
# File 'lib/spark_connect/data_frame.rb', line 767
def to_s
  "#<SparkConnect::DataFrame>"
end
#transform {|df| ... } ⇒ DataFrame
Apply a function to this DataFrame and return its result, enabling a fluent chain of custom transformations.
# File 'lib/spark_connect/data_frame.rb', line 525
def transform
  yield(self)
end
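Because #transform simply yields the receiver and returns the block's result, reusable steps can be kept as lambdas and chained. The sketch below shows the idea on a hypothetical immutable `Recipe` class that records step descriptions instead of building Spark relations.

```ruby
# Hypothetical stand-in that records steps instead of building a plan.
class Recipe
  attr_reader :steps

  def initialize(steps = [])
    @steps = steps.dup.freeze
  end

  # Returns a new Recipe, leaving the receiver untouched.
  def add(step)
    Recipe.new(steps + [step])
  end

  # Same shape as DataFrame#transform: yield self, return the block's value.
  def transform
    yield(self)
  end
end

only_adults = ->(r) { r.add("filter age >= 18") }
by_city = ->(r) { r.add("group by city") }

result = Recipe.new.transform(&only_adults).transform(&by_city)
p result.steps
```

Passing each lambda with `&` keeps the chain reading top to bottom in application order, which is the main reason to prefer this over nested calls such as `by_city.(only_adults.(recipe))`.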
#union(other) ⇒ DataFrame Also known as: union_all, unionAll
Union (by position; keeps duplicates, equivalent to Spark's `unionAll`).
# File 'lib/spark_connect/data_frame.rb', line 285
def union(other)
  set_op(other, :SET_OP_TYPE_UNION, is_all: true)
end
#union_by_name(other, allow_missing_columns: false) ⇒ DataFrame Also known as: unionByName
Union by column name.
# File 'lib/spark_connect/data_frame.rb', line 296
def union_by_name(other, allow_missing_columns: false)
  set_op(other, :SET_OP_TYPE_UNION, is_all: true, by_name: true, allow_missing_columns: allow_missing_columns)
end
#unpivot(ids, values, variable_column_name, value_column_name) ⇒ DataFrame Also known as: melt
Unpivot (melt) columns from wide to long format.
# File 'lib/spark_connect/data_frame.rb', line 418
def unpivot(ids, values, variable_column_name, value_column_name)
  u = Proto::Unpivot.new(
    input: @relation,
    ids: normalize_columns(Array(ids)).map(&:to_expr),
    variable_column_name: variable_column_name,
    value_column_name: value_column_name
  )
  u.values = Proto::Unpivot::Values.new(values: normalize_columns(Array(values)).map(&:to_expr)) unless values.nil?
  build(unpivot: u)
end
#with_column(name, col) ⇒ DataFrame Also known as: withColumn
Add or replace a single column.
# File 'lib/spark_connect/data_frame.rb', line 100
def with_column(name, col)
  with_columns(name => col)
end
#with_column_renamed(existing, new_name) ⇒ DataFrame Also known as: withColumnRenamed
Rename a single column.
# File 'lib/spark_connect/data_frame.rb', line 119
def with_column_renamed(existing, new_name)
  with_columns_renamed(existing => new_name)
end
#with_columns(assigns) ⇒ DataFrame Also known as: withColumns
Add or replace multiple columns.
# File 'lib/spark_connect/data_frame.rb', line 109
def with_columns(assigns)
  aliases = assigns.map do |name, col|
    Proto::Expression::Alias.new(expr: Column.to_col(col).to_expr, name: [name.to_s])
  end
  build(with_columns: Proto::WithColumns.new(input: @relation, aliases: aliases))
end
#with_columns_renamed(renames) ⇒ DataFrame Also known as: withColumnsRenamed
Rename multiple columns.
# File 'lib/spark_connect/data_frame.rb', line 128
def with_columns_renamed(renames)
  pairs = renames.map do |old, new_name|
    Proto::WithColumnsRenamed::Rename.new(col_name: old.to_s, new_col_name: new_name.to_s)
  end
  build(with_columns_renamed: Proto::WithColumnsRenamed.new(input: @relation, renames: pairs))
end
#with_watermark(event_time, delay_threshold) ⇒ DataFrame Also known as: withWatermark
Define an event-time watermark for late-data handling on a streaming DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 513
def with_watermark(event_time, delay_threshold)
  build(with_watermark: Proto::WithWatermark.new(
    input: @relation, event_time: event_time.to_s, delay_threshold: delay_threshold.to_s
  ))
end
#write ⇒ DataFrameWriter
Returns interface for saving this DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 443
def write
  DataFrameWriter.new(self)
end
#write_stream ⇒ DataStreamWriter Also known as: writeStream
Returns interface for starting a streaming query from this (streaming) DataFrame.
# File 'lib/spark_connect/data_frame.rb', line 455
def write_stream
  DataStreamWriter.new(self)
end
#write_to(table) ⇒ DataFrameWriterV2 Also known as: writeTo
Returns the v2 (catalog) write interface.
# File 'lib/spark_connect/data_frame.rb', line 448
def write_to(table)
  DataFrameWriterV2.new(self, table)
end