Class: SparkConnect::DataStreamWriter

Inherits:
Object
Defined in:
lib/spark_connect/streaming.rb

Overview

Configures how a streaming DataFrame is written to a streaming sink, then starts the query. Returned by SparkConnect::DataFrame#write_stream. Mirrors PySpark’s `DataStreamWriter`.

`foreach` and `foreach_batch` are intentionally unsupported: both take user-defined functions, and the Spark Connect protobuf definitions for those are not yet finalized.

Examples:

query = df.write_stream
          .format("console")
          .output_mode("append")
          .trigger(processing_time: "1 second")
          .start
query.stop
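Each call in the chain above only records a setting on the writer and returns self; nothing is sent until start (or to_table) calls run. The sketch below illustrates that builder pattern with a hypothetical RecordingStreamWriter whose setters copy the documented ones, but whose start returns the collected settings instead of sending a WriteStreamOperationStart command. The class name, the simplified trigger and the returned Hash are assumptions made for illustration.

```ruby
# Hypothetical stand-in for SparkConnect::DataStreamWriter. The setters mirror
# the documented ones; start returns a snapshot of the settings instead of
# contacting a Spark Connect server (an assumption for illustration only).
class RecordingStreamWriter
  def initialize
    @format = nil
    @options = {}
    @output_mode = nil
    @trigger = nil
    @path = nil
  end

  def format(source)
    @format = source.to_s
    self
  end

  def option(key, value)
    @options[key.to_s] = value.to_s
    self
  end

  def output_mode(mode)
    @output_mode = mode.to_s
    self
  end

  # Simplified: only the processing-time trigger is modelled in this sketch.
  def trigger(processing_time: nil)
    @trigger = processing_time ? [:processing_time_interval, processing_time.to_s] : nil
    self
  end

  # The real start calls run; here we only return what was accumulated.
  def start(path = nil)
    @path = path if path
    { format: @format, options: @options.dup, output_mode: @output_mode,
      trigger: @trigger, path: @path }
  end
end

settings = RecordingStreamWriter.new
  .format(:console)
  .option("truncate", false)
  .output_mode("append")
  .trigger(processing_time: "1 second")
  .start

settings[:format]   # => "console"
settings[:trigger]  # => [:processing_time_interval, "1 second"]
```

Because every setter returns self, the order of the chained calls does not matter; only the terminal start or to_table call acts on the accumulated state.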

Constant Summary

Proto = SparkConnect::Proto
WSO = Proto::WriteStreamOperationStart

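Instance Method Summary

  • #format(source) ⇒ self
  • #option(key, value) ⇒ self
  • #options(opts) ⇒ self
  • #output_mode(mode) ⇒ self
  • #partition_by(*cols) ⇒ self
  • #query_name(name) ⇒ self
  • #start(path = nil) ⇒ StreamingQuery
  • #to_table(name) ⇒ StreamingQuery (also known as toTable)
  • #trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil) ⇒ self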

Constructor Details

#initialize(df) ⇒ DataStreamWriter

Returns a new instance of DataStreamWriter.

Parameters:

  • df (SparkConnect::DataFrame)

    the streaming DataFrame to write.

# File 'lib/spark_connect/streaming.rb', line 97

def initialize(df)
  @df = df
  @format = nil
  @options = {}
  @partitioning = []
  @output_mode = nil
  @query_name = nil
  @trigger = nil
  @path = nil
  @table = nil
end

Instance Method Details

#format(source) ⇒ self

Set the sink format, e.g. `"console"`, `"memory"` or `"kafka"`. The argument is converted with to_s, so symbols work too.

Returns:

  • (self)

    the writer, for chaining.



# File 'lib/spark_connect/streaming.rb', line 110

def format(source)
  @format = source.to_s
  self
end

#option(key, value) ⇒ self

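Set a single sink option. Both key and value are converted to strings, and a later call with the same key overwrites the earlier value.

Returns:

  • (self)

    the writer, for chaining.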



# File 'lib/spark_connect/streaming.rb', line 122

def option(key, value)
  @options[key.to_s] = value.to_s
  self
end

#options(opts) ⇒ self

Set several sink options at once from a Hash. Keys and values are converted to strings, exactly as in #option.

Returns:

  • (self)

    the writer, for chaining.



# File 'lib/spark_connect/streaming.rb', line 128

def options(opts)
  opts.each { |k, v| @options[k.to_s] = v.to_s }
  self
end
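Because option and options both stringify keys and values and write into the same Hash, symbols, booleans and numbers are all accepted, and whichever call comes last wins for a repeated key. The sketch below copies the two documented methods into a hypothetical OptionsSketch class (the class name and its sink_options reader are assumptions) so this can be run without a server; `checkpointLocation` and `truncate` serve only as example keys.

```ruby
# Hypothetical class holding only the option state; option and options are
# copied from the documented DataStreamWriter methods.
class OptionsSketch
  attr_reader :sink_options

  def initialize
    @sink_options = {}
  end

  def option(key, value)
    @sink_options[key.to_s] = value.to_s
    self
  end

  def options(opts)
    opts.each { |k, v| @sink_options[k.to_s] = v.to_s }
    self
  end
end

writer = OptionsSketch.new
writer.option(:checkpointLocation, "/tmp/ckpt-a")
      .options({ "checkpointLocation" => "/tmp/ckpt-b", truncate: false })

writer.sink_options["checkpointLocation"]  # => "/tmp/ckpt-b" (later call wins)
writer.sink_options["truncate"]            # => "false" (boolean stringified)
```

Note that the symbol key :checkpointLocation and the string key "checkpointLocation" collapse to the same entry, since both become "checkpointLocation".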

#output_mode(mode) ⇒ self

Set the output mode (`"append"`, `"complete"` or `"update"`).

Returns:

  • (self)

    the writer, for chaining.



# File 'lib/spark_connect/streaming.rb', line 116

def output_mode(mode)
  @output_mode = mode.to_s
  self
end
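output_mode as documented performs no validation of its own: it calls to_s and stores whatever it receives. A caller who wants a typo caught on the client side could wrap the value first. The sketch below is a hypothetical helper; VALID_OUTPUT_MODES and checked_output_mode are illustrative names, not part of SparkConnect.

```ruby
# Hypothetical helper, not part of SparkConnect: restricts the value to the
# three output modes listed above before it is handed to output_mode.
VALID_OUTPUT_MODES = %w[append complete update].freeze

def checked_output_mode(mode)
  value = mode.to_s
  unless VALID_OUTPUT_MODES.include?(value)
    raise ArgumentError,
          "unknown output mode #{value.inspect}; expected one of #{VALID_OUTPUT_MODES.join(', ')}"
  end
  value
end

checked_output_mode(:update)  # => "update"

begin
  checked_output_mode("apend")  # deliberate typo
rescue ArgumentError => e
  warn e.message
end

# With a real writer (requires a Spark Connect session):
#   df.write_stream.output_mode(checked_output_mode(mode))
```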

#partition_by(*cols) ⇒ self

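Partition the output by the given columns. Arguments may be column names or arrays of names (they are flattened and converted to strings), and each call replaces any previously set partitioning.

Returns:

  • (self)

    the writer, for chaining.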



# File 'lib/spark_connect/streaming.rb', line 134

def partition_by(*cols)
  @partitioning = cols.flatten.map(&:to_s)
  self
end
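The flattening and replace-not-append behaviour of partition_by can be checked locally with a hypothetical PartitionSketch class (an assumed name) that holds only the partitioning state and copies the documented method.

```ruby
# Hypothetical stand-in holding only the partitioning state; partition_by is
# copied from the documented DataStreamWriter method.
class PartitionSketch
  attr_reader :partitioning

  def initialize
    @partitioning = []
  end

  def partition_by(*cols)
    @partitioning = cols.flatten.map(&:to_s)
    self
  end
end

w = PartitionSketch.new
w.partition_by(:year, [:month, "day"])  # nested array flattened, symbols stringified
w.partitioning  # => ["year", "month", "day"]

w.partition_by("country")               # a second call replaces, it does not append
w.partitioning  # => ["country"]
```

To partition by several columns, pass them all in one call rather than calling partition_by repeatedly.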

#query_name(name) ⇒ self

Set the name of the streaming query (required by the memory sink).

Returns:

  • (self)

    the writer, for chaining.



# File 'lib/spark_connect/streaming.rb', line 140

def query_name(name)
  @query_name = name.to_s
  self
end

#start(path = nil) ⇒ StreamingQuery

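Start the streaming query, optionally writing to a file or data path.

Parameters:

  • path (String, nil) (defaults to: nil)

    output location for path-based sinks; leave it nil for sinks that need no path, such as console or memory.

Returns:

  • (StreamingQuery)

    a handle to the running query.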


# File 'lib/spark_connect/streaming.rb', line 166

def start(path = nil)
  @path = path if path
  run
end

#to_table(name) ⇒ StreamingQuery Also known as: toTable

Start the streaming query, writing into the given table.

Parameters:

  • name (String)

Returns:

  • (StreamingQuery)

    a handle to the running query.


# File 'lib/spark_connect/streaming.rb', line 175

def to_table(name)
  @table = name.to_s
  run
end

#trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil) ⇒ self

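Configure the query trigger. Pass exactly one of the keyword arguments below. This is not enforced: if several are given, the first truthy one in the order processing_time, once, available_now, continuous takes effect, and a call with none of them resets the trigger to nil.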

Parameters:

  • processing_time (String, nil) (defaults to: nil)

    e.g. `"10 seconds"` (micro-batch interval).

  • once (Boolean, nil) (defaults to: nil)

    process all available data once and stop.

  • available_now (Boolean, nil) (defaults to: nil)

    process all available data in (possibly) multiple batches, then stop.

  • continuous (String, nil) (defaults to: nil)

    continuous-processing checkpoint interval.

Returns:

  • (self)


# File 'lib/spark_connect/streaming.rb', line 152

def trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil)
  @trigger =
    if processing_time then [:processing_time_interval, processing_time.to_s]
    elsif once then [:once, true]
    elsif available_now then [:available_now, true]
    elsif continuous then [:continuous_checkpoint_interval, continuous.to_s]
    end
  self
end
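The if/elsif chain in trigger means conflicting arguments are resolved silently rather than rejected. The sketch below copies the documented method into a hypothetical TriggerSketch class (an assumed name, with an assumed trigger_spec reader) to show the precedence, and adds a hypothetical exactly_one_trigger! guard that a caller could use to enforce the "exactly one" rule; neither name is part of SparkConnect.

```ruby
# Hypothetical TriggerSketch class: its trigger method is copied from the
# documented DataStreamWriter#trigger so the precedence can be run locally.
class TriggerSketch
  attr_reader :trigger_spec

  def trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil)
    @trigger_spec =
      if processing_time then [:processing_time_interval, processing_time.to_s]
      elsif once then [:once, true]
      elsif available_now then [:available_now, true]
      elsif continuous then [:continuous_checkpoint_interval, continuous.to_s]
      end
    self
  end
end

TriggerSketch.new.trigger(available_now: true).trigger_spec
# => [:available_now, true]

# Two arguments: no error, processing_time wins because it is tested first.
TriggerSketch.new.trigger(once: true, processing_time: "5 seconds").trigger_spec
# => [:processing_time_interval, "5 seconds"]

# Hypothetical guard enforcing the "exactly one" rule before calling trigger.
def exactly_one_trigger!(**kwargs)
  chosen = kwargs.select { |_, v| v } # truthy values, as in the if/elsif chain
  unless chosen.size == 1
    raise ArgumentError, "expected exactly one trigger, got #{chosen.keys.inspect}"
  end
  kwargs
end

TriggerSketch.new.trigger(**exactly_one_trigger!(once: true)).trigger_spec
# => [:once, true]
```

Since once: false and available_now: false are falsy, they behave exactly like omitting the argument.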