Class: SparkConnect::Column
- Inherits: Object
- Defined in: lib/spark_connect/column.rb
Overview
A column expression: a lazily-evaluated reference to a column or a computation over columns. Columns are immutable; operators and methods return new Columns.
A Column wraps a protobuf `Expression`. Build them with Functions.col, Functions.lit, by indexing a DataFrame (`df`), or by combining other columns with operators.
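The operator-based composition described above can be sketched in plain Ruby. This is a minimal illustration only: `SketchColumn` is a hypothetical stand-in that stores nested arrays rather than protobuf messages, and its `col` helper is an assumption made for the example, not part of the library.

```ruby
# Hypothetical stand-in for SparkConnect::Column. It keeps a nested array
# where the real class keeps a protobuf Expression.
class SketchColumn
  attr_reader :expr

  def initialize(expr)
    @expr = expr
    freeze # instances are never modified after construction
  end

  # Hypothetical helper: reference a column by name.
  def self.col(name)
    new([:attribute, name.to_s])
  end

  # Pass columns through unchanged; wrap plain Ruby values as literals.
  def self.to_col(value)
    value.is_a?(SketchColumn) ? value : new([:literal, value])
  end

  def +(other)
    bin_op("+", other)
  end

  def >=(other)
    bin_op(">=", other)
  end

  def <=(other)
    bin_op("<=", other)
  end

  def &(other)
    bin_op("and", other)
  end

  # Same composition as the documented #between.
  def between(lower, upper)
    (self >= lower) & (self <= upper)
  end

  private

  # Every operator returns a brand-new column wrapping both operands.
  def bin_op(name, other)
    SketchColumn.new([:function, name, expr, SketchColumn.to_col(other).expr])
  end
end

age = SketchColumn.col(:age)
older = age + 1                 # a new column; `age` is untouched
in_range = age.between(18, 65)  # built from >=, <= and &
```

Nothing is evaluated when these objects are built; each one only describes a computation, which is what "lazily-evaluated" means here.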
Constant Summary
- Proto = SparkConnect::Proto
Instance Attribute Summary
-
#expr ⇒ Spark::Connect::Expression
readonly
The wrapped protobuf expression.
Class Method Summary
-
.from_expr(expr) ⇒ Column
Wrap an existing protobuf expression.
-
.from_name(name) ⇒ Column
An unresolved attribute reference by (possibly dotted) name.
-
.infer_type(value) ⇒ Types::DataType
Infer the Spark Types::DataType for a Ruby value (used when building array/map literals).
-
.invoke(name, *args, is_distinct: false) ⇒ Column
Build an `UnresolvedFunction` call column.
-
.lit(value) ⇒ Column
Build a literal column from a Ruby value.
-
.to_col(value) ⇒ Column
Coerce a value into a Column (literals are wrapped).
-
.to_literal(value) ⇒ Spark::Connect::Expression::Literal
Encode a Ruby value as a protobuf `Expression.Literal`.
Instance Method Summary
- #! ⇒ Object (also: #not)
- #!=(other) ⇒ Object
- #%(other) ⇒ Object
-
#&(other) ⇒ Object
Logical AND with `other`.
- #*(other) ⇒ Object
-
#**(other) ⇒ Column
Raise this column to the power of `other`.
-
#+(other) ⇒ Object
Add `other` to this column.
- #+@ ⇒ Object
- #-(other) ⇒ Object
- #-@ ⇒ Object
- #/(other) ⇒ Object
- #<(other) ⇒ Object
- #<=(other) ⇒ Object
-
#==(other) ⇒ Object
Equality comparison with `other`.
- #>(other) ⇒ Object
- #>=(other) ⇒ Object
-
#[](key) ⇒ Column
Extract an array element by index, a map value by key, or a struct field.
-
#alias(*names, metadata: nil) ⇒ Column
(also: #name, #as)
Assign one or more output names.
-
#asc ⇒ Object
Sort ascending (nulls first).
- #asc_nulls_first ⇒ Object
- #asc_nulls_last ⇒ Object
-
#between(lower, upper) ⇒ Column
True if `lower <= self <= upper`.
-
#bitwise_and(other) ⇒ Object
Bitwise AND with `other`.
- #bitwise_or(other) ⇒ Object
- #bitwise_xor(other) ⇒ Object
-
#cast(data_type) ⇒ Column
(also: #as_type, #astype)
Cast to another type, given either a Types::DataType or a DDL type string (e.g. `"int"`, `"decimal(10,2)"`).
- #contains(other) ⇒ Object
- #desc ⇒ Object
- #desc_nulls_first ⇒ Object
- #desc_nulls_last ⇒ Object
- #endswith(other) ⇒ Object
-
#eq_null_safe(other) ⇒ Column
Null-safe equality (`<=>` in Spark SQL): `null <=> null` is true.
-
#get_field(name) ⇒ Column
Extract a struct field by name.
- #get_item(key) ⇒ Object
- #ilike(pattern) ⇒ Object
-
#initialize(expr) ⇒ Column
constructor
A new instance of Column.
- #is_nan ⇒ Object
- #is_not_null ⇒ Object (also: #isNotNull)
-
#is_null ⇒ Object
(also: #isNull)
True if the value is null.
-
#isin(*values) ⇒ Column
(also: #in_list)
True if the column’s value is in `values`.
-
#like(pattern) ⇒ Object
SQL `LIKE` pattern match.
-
#otherwise(value) ⇒ Column
Provide the default (ELSE) value for a CASE expression.
-
#over(window) ⇒ Column
Define a windowed aggregation / analytic computation over this column.
- #rlike(pattern) ⇒ Object
- #startswith(other) ⇒ Object
-
#substr(start, len) ⇒ Column
Substring of length `len` starting at 1-based position `start`.
- #to_expr ⇒ Spark::Connect::Expression
- #to_s ⇒ Object (also: #inspect)
-
#when(condition, value) ⇒ Column
Add a branch to a CASE expression started by Functions#when.
- #|(other) ⇒ Object
Constructor Details
#initialize(expr) ⇒ Column
Returns a new instance of Column.
# File 'lib/spark_connect/column.rb', line 26
def initialize(expr)
  @expr = expr
end
Instance Attribute Details
#expr ⇒ Spark::Connect::Expression (readonly)
Returns the wrapped protobuf expression.
# File 'lib/spark_connect/column.rb', line 23
def expr
  @expr
end
Class Method Details
.from_expr(expr) ⇒ Column
Wrap an existing protobuf expression.
# File 'lib/spark_connect/column.rb', line 38
def from_expr(expr)
  new(expr)
end
.from_name(name) ⇒ Column
An unresolved attribute reference by (possibly dotted) name. The special name `"*"` expands to all columns.
# File 'lib/spark_connect/column.rb', line 47
def from_name(name)
  if name == "*"
    new(Proto::Expression.new(unresolved_star: Proto::Expression::UnresolvedStar.new))
  else
    new(Proto::Expression.new(
      unresolved_attribute: Proto::Expression::UnresolvedAttribute.new(unparsed_identifier: name.to_s)
    ))
  end
end
.infer_type(value) ⇒ Types::DataType
Infer the Spark Types::DataType for a Ruby value (used when building array/map literals). Mirrors PySpark’s literal type inference.
# File 'lib/spark_connect/column.rb', line 152
def infer_type(value)
  case value
  when nil then Types.null
  when true, false then Types.boolean
  when Integer
    value.between?(-2_147_483_648, 2_147_483_647) ? Types.integer : Types.long
  when Float, Rational then Types.double
  when BigDecimal then Types.decimal(38, 18)
  when String then value.encoding == Encoding::ASCII_8BIT ? Types.binary : Types.string
  when Symbol then Types.string
  when Time, DateTime then Types.timestamp
  when Date then Types.date
  when Array then Types.array(value.empty? ? Types.null : infer_type(value.find { |v| !v.nil? }))
  when Hash
    Types.map(value.empty? ? Types.string : infer_type(value.keys.first),
              value.empty? ? Types.string : infer_type(value.values.first))
  else
    raise IllegalArgumentError, "Cannot infer Spark type for #{value.class}"
  end
end
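The inference rules above can be tried without a Spark session. The sketch below is a simplified, hypothetical re-statement: `sketch_infer_type` returns plain symbols instead of Types::DataType objects, raises `ArgumentError` rather than the library's `IllegalArgumentError`, and omits the temporal and Hash branches.

```ruby
require "bigdecimal"

# Hypothetical helper mirroring the branching of infer_type with symbols.
def sketch_infer_type(value)
  case value
  when nil then :null
  when true, false then :boolean
  when Integer
    # Values that fit in a signed 32-bit int map to integer, the rest to long.
    value.between?(-2_147_483_648, 2_147_483_647) ? :integer : :long
  when Float, Rational then :double
  when BigDecimal then :decimal
  when String
    # Binary-encoded strings are treated as bytes, everything else as text.
    value.encoding == Encoding::ASCII_8BIT ? :binary : :string
  when Symbol then :string
  when Array
    # The element type comes from the first non-nil element.
    first = value.find { |v| !v.nil? }
    [:array, value.empty? ? :null : sketch_infer_type(first)]
  else
    raise ArgumentError, "Cannot infer type for #{value.class}"
  end
end

sketch_infer_type(2**31 - 1)   # => :integer
sketch_infer_type(2**31)       # => :long
sketch_infer_type("abc".b)     # => :binary
sketch_infer_type([nil, 1.5])  # => [:array, :double]
```

Because only the first non-nil element (or the first key and value of a Hash) is inspected, a mixed-type collection is typed by whatever happens to come first.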
.invoke(name, *args, is_distinct: false) ⇒ Column
Build an `UnresolvedFunction` call column.
# File 'lib/spark_connect/column.rb', line 74
def invoke(name, *args, is_distinct: false)
  new(Proto::Expression.new(
    unresolved_function: Proto::Expression::UnresolvedFunction.new(
      function_name: name.to_s,
      arguments: args.map { |a| to_col(a).to_expr },
      is_distinct: is_distinct
    )
  ))
end
.lit(value) ⇒ Column
Build a literal column from a Ruby value.
# File 'lib/spark_connect/column.rb', line 62
def lit(value)
  return value if value.is_a?(Column)

  new(Proto::Expression.new(literal: to_literal(value)))
end
.to_col(value) ⇒ Column
Coerce a value into a SparkConnect::Column (literals are wrapped).
# File 'lib/spark_connect/column.rb', line 86
def to_col(value)
  value.is_a?(Column) ? value : lit(value)
end
.to_literal(value) ⇒ Spark::Connect::Expression::Literal
Encode a Ruby value as a protobuf `Expression.Literal`.
# File 'lib/spark_connect/column.rb', line 94
def to_literal(value)
  l = Proto::Expression::Literal
  case value
  when nil
    l.new(null: Types.null.to_proto)
  when true, false
    l.new(boolean: value)
  when Integer
    if value.between?(-2_147_483_648, 2_147_483_647)
      l.new(integer: value)
    else
      l.new(long: value)
    end
  when Float
    l.new(double: value)
  when BigDecimal
    l.new(decimal: l::Decimal.new(value: value.to_s("F")))
  when Rational
    l.new(double: value.to_f)
  when String
    if value.encoding == Encoding::ASCII_8BIT
      l.new(binary: value)
    else
      l.new(string: value)
    end
  when Symbol
    l.new(string: value.to_s)
  when Time
    l.new(timestamp: (value.to_r * 1_000_000).to_i)
  when DateTime
    l.new(timestamp: (value.to_time.to_r * 1_000_000).to_i)
  when Date
    l.new(date: (value - Date.new(1970, 1, 1)).to_i)
  when Array
    elem_type = infer_array_element_type(value)
    l.new(array: l::Array.new(
      element_type: elem_type.to_proto,
      elements: value.map { |v| to_literal(v) }
    ))
  when Hash
    key_type = value.empty? ? Types.string : infer_type(value.keys.first)
    val_type = value.empty? ? Types.string : infer_type(value.values.first)
    l.new(map: l::Map.new(
      key_type: key_type.to_proto,
      value_type: val_type.to_proto,
      keys: value.keys.map { |k| to_literal(k) },
      values: value.values.map { |v| to_literal(v) }
    ))
  else
    raise IllegalArgumentError, "Unsupported literal value of type #{value.class}: #{value.inspect}"
  end
end
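The temporal branches above reduce to two small conversions: timestamps become microseconds since the Unix epoch and dates become days since 1970-01-01. A minimal sketch of just that arithmetic follows; the helper names `timestamp_micros` and `days_since_epoch` are hypothetical and exist only for this example.

```ruby
require "date"

# Microseconds since the Unix epoch. Time#to_r is an exact Rational, so no
# floating-point rounding occurs before truncating to an Integer.
def timestamp_micros(time)
  (time.to_r * 1_000_000).to_i
end

# Whole days between the Unix epoch and the given date.
def days_since_epoch(date)
  (date - Date.new(1970, 1, 1)).to_i
end

timestamp_micros(Time.at(Rational(3, 2)))  # => 1_500_000
days_since_epoch(Date.new(1970, 1, 11))    # => 10
```

The `Time` and `DateTime` branches have to come before `Date` in the `case`, since `DateTime` is a subclass of `Date` and would otherwise be encoded as a date.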
Instance Method Details
#! ⇒ Object Also known as: not
# File 'lib/spark_connect/column.rb', line 210
def !
  Column.invoke("not", self)
end
#!=(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 196
def !=(other) = bin_op("!=", other)
#%(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 186
def %(other) = bin_op("%", other)
#&(other) ⇒ Object
Logical AND with `other`.
# File 'lib/spark_connect/column.rb', line 207
def &(other) = bin_op("and", other)
#*(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 184
def *(other) = bin_op("*", other)
#**(other) ⇒ Column
Raise this column to the power of `other`.
# File 'lib/spark_connect/column.rb', line 192
def **(other) = bin_op("power", other)
#+(other) ⇒ Object
Add `other` to this column.
# File 'lib/spark_connect/column.rb', line 182
def +(other) = bin_op("+", other)
#+@ ⇒ Object
# File 'lib/spark_connect/column.rb', line 188
def +@ = self
#-(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 183
def -(other) = bin_op("-", other)
#-@ ⇒ Object
# File 'lib/spark_connect/column.rb', line 187
def -@ = Column.invoke("negative", self)
#/(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 185
def /(other) = bin_op("/", other)
#<(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 197
def <(other) = bin_op("<", other)
#<=(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 198
def <=(other) = bin_op("<=", other)
#==(other) ⇒ Object
Equality comparison with `other`.
# File 'lib/spark_connect/column.rb', line 195
def ==(other) = bin_op("==", other)
#>(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 199
def >(other) = bin_op(">", other)
#>=(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 200
def >=(other) = bin_op(">=", other)
#[](key) ⇒ Column
Extract an array element by index, a map value by key, or a struct field.
# File 'lib/spark_connect/column.rb', line 258
def [](key)
  get_item(key)
end
#alias(*names, metadata: nil) ⇒ Column Also known as: name, as
Assign one or more output names. With multiple names the expression must produce a struct/multiple columns (e.g. `inline`).
# File 'lib/spark_connect/column.rb', line 283
def alias(*names, metadata: nil)
  a = Proto::Expression::Alias.new(expr: @expr, name: names.map(&:to_s))
  a.metadata = JSON.generate(metadata) if metadata
  Column.new(Proto::Expression.new(alias: a))
end
#asc ⇒ Object
Sort ascending (nulls first).
# File 'lib/spark_connect/column.rb', line 310
def asc = sort_order(:SORT_DIRECTION_ASCENDING, :SORT_NULLS_FIRST)
#asc_nulls_first ⇒ Object
# File 'lib/spark_connect/column.rb', line 312
def asc_nulls_first = sort_order(:SORT_DIRECTION_ASCENDING, :SORT_NULLS_FIRST)
#asc_nulls_last ⇒ Object
# File 'lib/spark_connect/column.rb', line 313
def asc_nulls_last = sort_order(:SORT_DIRECTION_ASCENDING, :SORT_NULLS_LAST)
#between(lower, upper) ⇒ Column
True if `lower <= self <= upper`.
# File 'lib/spark_connect/column.rb', line 237
def between(lower, upper)
  (self >= lower) & (self <= upper)
end
#bitwise_and(other) ⇒ Object
Bitwise AND with `other`.
# File 'lib/spark_connect/column.rb', line 216
def bitwise_and(other) = bin_op("&", other)
#bitwise_or(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 217
def bitwise_or(other) = bin_op("|", other)
#bitwise_xor(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 218
def bitwise_xor(other) = bin_op("^", other)
#cast(data_type) ⇒ Column Also known as: as_type, astype
Cast to another type, given either a Types::DataType or a DDL type string (e.g. `"int"`, `"decimal(10,2)"`).
# File 'lib/spark_connect/column.rb', line 297
def cast(data_type)
  c = Proto::Expression::Cast.new(expr: @expr)
  if data_type.is_a?(String)
    c.type_str = data_type
  else
    c.type = data_type.to_proto
  end
  Column.new(Proto::Expression.new(cast: c))
end
#contains(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 245
def contains(other) = bin_op("contains", other)
#desc ⇒ Object
# File 'lib/spark_connect/column.rb', line 311
def desc = sort_order(:SORT_DIRECTION_DESCENDING, :SORT_NULLS_LAST)
#desc_nulls_first ⇒ Object
# File 'lib/spark_connect/column.rb', line 314
def desc_nulls_first = sort_order(:SORT_DIRECTION_DESCENDING, :SORT_NULLS_FIRST)
#desc_nulls_last ⇒ Object
# File 'lib/spark_connect/column.rb', line 315
def desc_nulls_last = sort_order(:SORT_DIRECTION_DESCENDING, :SORT_NULLS_LAST)
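Per the listings, plain `asc` places nulls first and plain `desc` places nulls last. The effect of the two settings can be pictured on an ordinary Ruby array. This is only an in-memory analogy; `sketch_sort` is a hypothetical helper, and the real methods build a sort-order expression that Spark evaluates.

```ruby
# Hypothetical helper: order values by direction, then place nils as requested.
def sketch_sort(values, direction:, nulls:)
  nils, present = values.partition(&:nil?)
  sorted = present.sort
  sorted.reverse! if direction == :desc
  nulls == :first ? nils + sorted : sorted + nils
end

data = [3, nil, 1]
sketch_sort(data, direction: :asc, nulls: :first)   # like asc:  [nil, 1, 3]
sketch_sort(data, direction: :desc, nulls: :last)   # like desc: [3, 1, nil]
sketch_sort(data, direction: :asc, nulls: :last)    # like asc_nulls_last: [1, 3, nil]
```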
#endswith(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 247
def endswith(other) = bin_op("endswith", other)
#eq_null_safe(other) ⇒ Column
Null-safe equality (`<=>` in Spark SQL): `null <=> null` is true.
# File 'lib/spark_connect/column.rb', line 204
def eq_null_safe(other) = bin_op("<=>", other)
#get_field(name) ⇒ Column
Extract a struct field by name.
# File 'lib/spark_connect/column.rb', line 272
def get_field(name)
  get_item(name.to_s)
end
#get_item(key) ⇒ Object
# File 'lib/spark_connect/column.rb', line 262
def get_item(key)
  Column.new(Proto::Expression.new(
    unresolved_extract_value: Proto::Expression::UnresolvedExtractValue.new(
      child: @expr, extraction: Column.lit(key).to_expr
    )
  ))
end
#ilike(pattern) ⇒ Object
# File 'lib/spark_connect/column.rb', line 244
def ilike(pattern) = bin_op("ilike", pattern)
#is_nan ⇒ Object
# File 'lib/spark_connect/column.rb', line 223
def is_nan = Column.invoke("isNaN", self)
#is_not_null ⇒ Object Also known as: isNotNull
# File 'lib/spark_connect/column.rb', line 222
def is_not_null = Column.invoke("isNotNull", self)
#is_null ⇒ Object Also known as: isNull
True if the value is null.
# File 'lib/spark_connect/column.rb', line 221
def is_null = Column.invoke("isNull", self)
#isin(*values) ⇒ Column Also known as: in_list
True if the column’s value is in `values`.
# File 'lib/spark_connect/column.rb', line 229
def isin(*values)
  values = values.first if values.size == 1 && values.first.is_a?(Array)
  Column.invoke("in", self, *Array(values))
end
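The first line of `isin` lets callers pass either a list of arguments or a single Array. The sketch below isolates that normalization; `normalize_isin_args` is a hypothetical name used only for this example.

```ruby
# Hypothetical helper reproducing the argument handling of #isin.
def normalize_isin_args(*values)
  # A lone Array argument is unpacked; anything else is used as given.
  values = values.first if values.size == 1 && values.first.is_a?(Array)
  Array(values)
end

normalize_isin_args(1, 2, 3)      # => [1, 2, 3]
normalize_isin_args([1, 2, 3])    # => [1, 2, 3]
normalize_isin_args([1, 2], [3])  # => [[1, 2], [3]]
```

Only a single Array argument is unpacked, so passing several Arrays produces array-valued candidates rather than one flattened list.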
#like(pattern) ⇒ Object
SQL `LIKE` pattern match.
# File 'lib/spark_connect/column.rb', line 242
def like(pattern) = bin_op("like", pattern)
#otherwise(value) ⇒ Column
Provide the default (ELSE) value for a CASE expression.
# File 'lib/spark_connect/column.rb', line 334
def otherwise(value)
  unless @expr.expr_type == :unresolved_function && @expr.unresolved_function.function_name == "when"
    raise IllegalArgumentError, "otherwise() can only be applied on a Column previously generated by when()"
  end
  args = @expr.unresolved_function.arguments.to_a + [Column.to_col(value).to_expr]
  Column.new(Proto::Expression.new(
    unresolved_function: Proto::Expression::UnresolvedFunction.new(function_name: "when", arguments: args)
  ))
end
#over(window) ⇒ Column
Define a windowed aggregation / analytic computation over this column.
# File 'lib/spark_connect/column.rb', line 350
def over(window)
  w = Proto::Expression::Window.new(
    window_function: @expr,
    partition_spec: window.partition_spec,
    order_spec: window.order_spec
  )
  w.frame_spec = window.frame_spec if window.frame_spec
  Column.new(Proto::Expression.new(window: w))
end
#rlike(pattern) ⇒ Object
# File 'lib/spark_connect/column.rb', line 243
def rlike(pattern) = bin_op("rlike", pattern)
#startswith(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 246
def startswith(other) = bin_op("startswith", other)
#substr(start, len) ⇒ Column
Substring of length `len` starting at 1-based position `start`.
# File 'lib/spark_connect/column.rb', line 251
def substr(start, len)
  Column.invoke("substr", self, start, len)
end
#to_expr ⇒ Spark::Connect::Expression
# File 'lib/spark_connect/column.rb', line 31
def to_expr
  @expr
end
#to_s ⇒ Object Also known as: inspect
# File 'lib/spark_connect/column.rb', line 360
def to_s
  "Column<#{@expr.expr_type}>"
end
#when(condition, value) ⇒ Column
Add a branch to a CASE expression started by Functions#when.
# File 'lib/spark_connect/column.rb', line 321
def when(condition, value)
  unless @expr.expr_type == :unresolved_function && @expr.unresolved_function.function_name == "when"
    raise IllegalArgumentError, "when() can only be applied on a Column previously generated by when()"
  end
  args = @expr.unresolved_function.arguments.to_a + [Column.to_col(condition).to_expr, Column.to_col(value).to_expr]
  Column.new(Proto::Expression.new(
    unresolved_function: Proto::Expression::UnresolvedFunction.new(function_name: "when", arguments: args)
  ))
end
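Both `when` and `otherwise` append to the flat argument list of a single `"when"` function call: each `when` adds a condition and value pair, and `otherwise` adds one trailing value. The sketch below models that list with plain strings. `SketchCase` and its `start` and `to_sql` helpers are hypothetical, and reading an odd trailing argument as the ELSE value is an assumption about how the server interprets the call.

```ruby
# Hypothetical model of the argument list built by when/otherwise.
class SketchCase
  attr_reader :args

  def initialize(args)
    @args = args.freeze
  end

  # Stands in for Functions#when, which opens the CASE expression.
  def self.start(condition, value)
    new([condition, value])
  end

  # Each branch appends a (condition, value) pair and returns a new object.
  def when(condition, value)
    SketchCase.new(args + [condition, value])
  end

  # The default appends a single trailing value.
  def otherwise(value)
    SketchCase.new(args + [value])
  end

  # Render the list as SQL text, purely for inspection.
  def to_sql
    branches = args.each_slice(2).select { |pair| pair.size == 2 }
    sql = branches.map { |c, v| "WHEN #{c} THEN #{v}" }.join(" ")
    sql += " ELSE #{args.last}" if args.size.odd?
    "CASE #{sql} END"
  end
end

sign = SketchCase.start("x > 0", "'pos'").when("x < 0", "'neg'").otherwise("'zero'")
puts sign.to_sql
# prints: CASE WHEN x > 0 THEN 'pos' WHEN x < 0 THEN 'neg' ELSE 'zero' END
```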
#|(other) ⇒ Object
# File 'lib/spark_connect/column.rb', line 208
def |(other) = bin_op("or", other)