Class: SparkConnect::DataStreamReader

Inherits:
Object
  • Object
show all
Defined in:
lib/spark_connect/streaming.rb

Overview

Loads a streaming DataFrame from a streaming source. Instances are returned by `SparkSession#read_stream`. Mirrors PySpark's `DataStreamReader`.

Examples:

df = spark.read_stream.format("rate").option("rowsPerSecond", 5).load

Constant Summary collapse

Proto =
SparkConnect::Proto

Instance Method Summary collapse

Constructor Details

#initialize(session) ⇒ DataStreamReader

Returns a new instance of DataStreamReader.

Parameters:

  • session — stored as `@session` on the new reader.



15
16
17
18
19
20
# File 'lib/spark_connect/streaming.rb', line 15

# Create an unconfigured reader bound to +session+.
#
# @param session the session to keep as @session
def initialize(session)
  # Start with no format, no user-supplied schema and an empty option map;
  # the builder methods fill these in before #load / #table are called.
  @session, @format, @schema, @options = session, nil, nil, {}
end

Instance Method Details

#csv(path) ⇒ DataFrame

Convenience shortcut for `format("csv").load(path)`.

Returns:

  • (DataFrame)

    a streaming DataFrame read from `path` using the `"csv"` format.



64
# File 'lib/spark_connect/streaming.rb', line 64

# Shortcut for a CSV stream: format("csv").load(path).
#
# @param path [String] input path handed to #load
# @return [DataFrame] the streaming DataFrame
def csv(path)
  format("csv").load(path)
end

#format(source) ⇒ self

Sets the streaming source format (e.g. `"rate"`, `"kafka"`) and returns `self` for chaining.

Returns:

  • (self)

    the reader itself, for method chaining.



23
24
25
26
# File 'lib/spark_connect/streaming.rb', line 23

# Choose the streaming source format; the argument is stored via #to_s.
#
# @param source [String, Symbol] format name such as "rate" or "kafka"
# @return [self] the reader, so calls can be chained
def format(source)
  tap { @format = source.to_s }
end

#json(path) ⇒ DataFrame

Convenience shortcut for `format("json").load(path)`.



65
# File 'lib/spark_connect/streaming.rb', line 65

# Shortcut for a JSON stream: format("json").load(path).
#
# @param path [String] input path handed to #load
# @return [DataFrame] the streaming DataFrame
def json(path)
  format("json").load(path)
end

#load(path = nil) ⇒ DataFrame

Load a streaming DataFrame from the configured source.

Parameters:

  • path (String, nil) (defaults to: nil)

Returns:

  • (DataFrame) — the streaming DataFrame for the configured source.



50
51
52
53
54
55
# File 'lib/spark_connect/streaming.rb', line 50

# Build a streaming relation over a data source assembled from the
# reader's accumulated format, schema and options.
#
# @param path [String, nil] optional input path; when falsy no path is sent
# @return [DataFrame] the streaming DataFrame produced by stream_relation
def load(path = nil)
  source_paths = []
  source_paths << path.to_s if path

  data_source = Proto::Read::DataSource.new(options: @options, paths: source_paths)
  # Only populate optional fields the user actually configured.
  data_source.format = @format if @format
  data_source.schema = @schema if @schema
  stream_relation(data_source: data_source)
end

#option(key, value) ⇒ self

Sets a single source option (key and value are stored as strings) and returns `self` for chaining.

Returns:

  • (self)

    the reader itself, for method chaining.



35
36
37
38
# File 'lib/spark_connect/streaming.rb', line 35

# Record one source option; both key and value are stringified with #to_s,
# and a repeated key overwrites the earlier value.
#
# @param key [#to_s] option name
# @param value [#to_s] option value
# @return [self] the reader, so calls can be chained
def option(key, value)
  tap { @options.store(key.to_s, value.to_s) }
end

#options(opts) ⇒ self

Sets multiple source options at once (keys and values are stored as strings) and returns `self` for chaining.

Returns:

  • (self)

    the reader itself, for method chaining.



41
42
43
44
# File 'lib/spark_connect/streaming.rb', line 41

# Record several source options at once. Every key/value pair yielded by
# +opts+ is stringified with #to_s; later pairs overwrite earlier ones.
#
# @param opts [Hash, #each] key/value pairs to merge into the options
# @return [self] the reader, so calls can be chained
def options(opts)
  tap do
    opts.each { |name, val| @options.store(name.to_s, val.to_s) }
  end
end

#orc(path) ⇒ DataFrame

Convenience shortcut for `format("orc").load(path)`.



67
# File 'lib/spark_connect/streaming.rb', line 67

# Shortcut for an ORC stream: format("orc").load(path).
#
# @param path [String] input path handed to #load
# @return [DataFrame] the streaming DataFrame
def orc(path)
  format("orc").load(path)
end

#parquet(path) ⇒ DataFrame

Convenience shortcut for `format("parquet").load(path)`.



66
# File 'lib/spark_connect/streaming.rb', line 66

# Shortcut for a Parquet stream: format("parquet").load(path).
#
# @param path [String] input path handed to #load
# @return [DataFrame] the streaming DataFrame
def parquet(path)
  format("parquet").load(path)
end

#schema(schema) ⇒ self

Sets the input schema, given as a `Types::StructType` or a DDL string, and returns `self` for chaining.

Returns:

  • (self) — the reader itself, for method chaining.



29
30
31
32
# File 'lib/spark_connect/streaming.rb', line 29

# Set the input schema. A Types::StructType is rendered with #simple_string;
# anything else (typically a DDL string) is stored via #to_s.
#
# @param schema [Types::StructType, String] schema object or DDL text
# @return [self] the reader, so calls can be chained
def schema(schema)
  @schema =
    if schema.is_a?(Types::StructType)
      schema.simple_string
    else
      schema.to_s
    end
  self
end

#table(name) ⇒ DataFrame

Load a streaming DataFrame from a registered table.

Returns:

  • (DataFrame) — the streaming DataFrame backed by the named table.



59
60
61
# File 'lib/spark_connect/streaming.rb', line 59

# Build a streaming relation that reads the table identified by +name+,
# forwarding the reader's accumulated options.
#
# @param name [#to_s] table identifier, sent unparsed
# @return [DataFrame] the streaming DataFrame produced by stream_relation
def table(name)
  named = Proto::Read::NamedTable.new(unparsed_identifier: name.to_s, options: @options)
  stream_relation(named_table: named)
end

#text(path) ⇒ DataFrame

Convenience shortcut for `format("text").load(path)`.



68
# File 'lib/spark_connect/streaming.rb', line 68

# Shortcut for a plain-text stream: format("text").load(path).
#
# @param path [String] input path handed to #load
# @return [DataFrame] the streaming DataFrame
def text(path)
  format("text").load(path)
end