Class: SparkConnect::StreamingQuery

Inherits:
Object
Defined in:
lib/spark_connect/streaming.rb

Overview

A handle to a running streaming query. Returned by DataStreamWriter#start. Mirrors PySpark’s `StreamingQuery`.

Constant Summary

Proto = SparkConnect::Proto
Cmd = Proto::StreamingQueryCommand

Constructor Details

#initialize(session, instance_id, name) ⇒ StreamingQuery

Returns a new instance of StreamingQuery.

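Parameters:

  • session

    the session this handle is bound to.

  • instance_id

    the query’s instance identifier; must respond to id and run_id.

  • name (String, nil)

    the query name; nil or an empty string is stored as nil.
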
# File 'lib/spark_connect/streaming.rb', line 218

def initialize(session, instance_id, name)
  @session = session
  @instance_id = instance_id
  @id = instance_id.id
  @run_id = instance_id.run_id
  @name = name.nil? || name.empty? ? nil : name
end
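The constructor reads only `id` and `run_id` from `instance_id` and normalizes a blank name to `nil`. A minimal sketch of that contract, assuming a hypothetical `FakeInstanceId` struct in place of the protobuf message the library actually passes, and a hypothetical `normalize_query_name` helper that copies the constructor’s expression:

```ruby
# Hypothetical stand-in for the protobuf instance-id message;
# the constructor only reads #id and #run_id from it.
FakeInstanceId = Struct.new(:id, :run_id)

# The same name normalization the constructor applies to `name`.
def normalize_query_name(name)
  name.nil? || name.empty? ? nil : name
end

iid = FakeInstanceId.new("query-uuid", "run-uuid")
iid.id                         # => "query-uuid"
normalize_query_name(nil)      # => nil
normalize_query_name("")       # => nil
normalize_query_name("events") # => "events"
```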

Instance Attribute Details

#id ⇒ String (readonly)

Returns the stable query id (survives restarts from a checkpoint).

Returns:

  • (String)

    the stable query id (survives restarts from a checkpoint).



# File 'lib/spark_connect/streaming.rb', line 209

def id
  @id
end

#name ⇒ String? (readonly)

Returns the query name, if one was set.

Returns:

  • (String, nil)

    the query name, if one was set.



# File 'lib/spark_connect/streaming.rb', line 213

def name
  @name
end

#run_id ⇒ String (readonly)

Returns the run id (unique per start).

Returns:

  • (String)

    the run id (unique per start).



# File 'lib/spark_connect/streaming.rb', line 211

def run_id
  @run_id
end
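Because `id` survives restarts from a checkpoint while `run_id` is unique per start, comparing both tells a restart apart from a different query. A small sketch, assuming a hypothetical `restart_of?` helper and a `QueryHandle` struct standing in for real `StreamingQuery` handles:

```ruby
# Hypothetical helper: true when `after` is a restart of the same query as `before`,
# i.e. the same stable id (same checkpoint) but a different run id.
def restart_of?(before, after)
  before.id == after.id && before.run_id != after.run_id
end

# Stand-in exposing only #id and #run_id, in place of real StreamingQuery handles.
QueryHandle = Struct.new(:id, :run_id)

first_run  = QueryHandle.new("q-1", "run-a")
second_run = QueryHandle.new("q-1", "run-b")
restart_of?(first_run, second_run) # => true
restart_of?(first_run, first_run)  # => false
```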

Instance Method Details

#active? ⇒ Boolean

Returns whether the query is still running.

Returns:

  • (Boolean)

    whether the query is still running.



# File 'lib/spark_connect/streaming.rb', line 239

def active?
  status["isActive"]
end
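`active?` delegates to `status`, so each call is a separate command round trip. When you need to do other work between checks, a polling loop with a deadline is one option; otherwise `await_termination` is likely the simpler choice. A sketch, assuming a hypothetical `wait_until_inactive` helper and a `PollingFakeQuery` stand-in:

```ruby
# Poll #active? until the query stops or `timeout_s` seconds pass.
# Returns true if the query became inactive before the deadline.
def wait_until_inactive(query, timeout_s:, interval_s: 0.5)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_s
  loop do
    return true unless query.active?
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval_s
  end
end

# Fake query that reports itself active for a fixed number of polls.
class PollingFakeQuery
  def initialize(active_polls)
    @remaining = active_polls
  end

  def active?
    @remaining -= 1
    @remaining >= 0
  end
end

wait_until_inactive(PollingFakeQuery.new(2), timeout_s: 1, interval_s: 0.01) # => true
```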

#await_termination(timeout_ms = nil) ⇒ Boolean

Block until the query terminates, or until `timeout_ms` milliseconds elapse.

Parameters:

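  • timeout_ms (Integer, nil) (defaults to: nil)

    maximum time to wait, in milliseconds; when nil, no timeout is sent and the call waits until the query terminates.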

Returns:

  • (Boolean)

    whether the query has terminated.



# File 'lib/spark_connect/streaming.rb', line 257

def await_termination(timeout_ms = nil)
  ac = Cmd::AwaitTerminationCommand.new
  ac.timeout_ms = timeout_ms if timeout_ms
  command(await_termination: ac).await_termination.terminated
end
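Since `await_termination(timeout_ms)` returns `false` when the timeout elapses first, it combines naturally with `stop` to bound how long a query may run. A sketch, assuming a hypothetical `await_or_stop` helper and a `NeverEndingQuery` fake that never finishes on its own:

```ruby
# Hypothetical helper: wait up to timeout_ms for the query to finish by itself,
# then stop it if it is still running. Returns true if it terminated on its own.
def await_or_stop(query, timeout_ms)
  terminated = query.await_termination(timeout_ms)
  query.stop unless terminated
  terminated
end

# Fake query that never terminates and records whether #stop was called.
class NeverEndingQuery
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def await_termination(_timeout_ms = nil)
    false
  end

  def stop
    @stopped = true
    nil
  end
end

q = NeverEndingQuery.new
await_or_stop(q, 5_000) # => false
q.stopped               # => true
```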

#exception ⇒ String?

Returns the query’s exception message, if it has failed.

Returns:

  • (String, nil)

    the query’s exception message, if it has failed.



# File 'lib/spark_connect/streaming.rb', line 277

def exception
  result = command(exception: true).exception
  result.exception_message && result.exception_message.empty? ? nil : result.exception_message
end
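`exception` returns a message string rather than raising, so callers that want a failure to surface as a Ruby error must raise it themselves. A sketch, assuming a hypothetical `StreamingQueryFailed` error class and `raise_if_failed!` helper (neither is part of the library), with a `FailedQuery` struct standing in for the handle:

```ruby
# Hypothetical error class and helper; not part of SparkConnect.
class StreamingQueryFailed < StandardError; end

def raise_if_failed!(query)
  message = query.exception
  raise StreamingQueryFailed, message if message
  query
end

# Stand-in exposing only #exception.
FailedQuery = Struct.new(:exception)

begin
  raise_if_failed!(FailedQuery.new("boom"))
rescue StreamingQueryFailed => e
  e.message # => "boom"
end
```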

#explain(extended: false) ⇒ String

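Returns the query’s execution plan.

Parameters:

  • extended (Boolean) (defaults to: false)

    when true, include the logical plans as well as the physical plan.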

Returns:

  • (String)

    the query’s execution plan.



# File 'lib/spark_connect/streaming.rb', line 283

def explain(extended: false)
  command(explain: Cmd::ExplainCommand.new(extended: extended)).explain.result
end

#last_progress ⇒ Hash?

Returns the most recent progress object, if any.

Returns:

  • (Hash, nil)

    the most recent progress object, if any.



# File 'lib/spark_connect/streaming.rb', line 249

def last_progress
  command(last_progress: true).recent_progress.recent_progress_json.map { |j| JSON.parse(j) }.last
end
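The Hash returned by `last_progress` is parsed progress JSON, so standard Spark progress fields such as `batchId`, `numInputRows` and `processedRowsPerSecond` can be read directly. A sketch, assuming a hypothetical `progress_line` helper and a trimmed, illustrative sample rather than real query output:

```ruby
require "json"

# Hypothetical helper: one-line summary of a parsed progress Hash,
# or nil when no progress has been reported yet.
def progress_line(progress)
  return nil if progress.nil?

  format("batch %d: %d rows, %.1f rows/s",
         progress.fetch("batchId"),
         progress.fetch("numInputRows", 0),
         progress.fetch("processedRowsPerSecond", 0.0))
end

# Illustrative, trimmed progress JSON (not real output).
sample = JSON.parse('{"batchId": 3, "numInputRows": 120, "processedRowsPerSecond": 40.0}')
progress_line(sample) # => "batch 3: 120 rows, 40.0 rows/s"
progress_line(nil)    # => nil
```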

#process_all_available ⇒ void

This method returns an undefined value.

Block until all data currently available in the sources has been processed and committed to the sink, then return. Intended mainly for tests with bounded sources.



# File 'lib/spark_connect/streaming.rb', line 265

def process_all_available
  command(process_all_available: true)
  nil
end
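In tests, the usual order is to drain the available input first and only then inspect the sink. A sketch of that ordering, assuming a hypothetical `drain_and_read` helper, a caller-supplied `read_sink` callable, and a `RecordingQuery` fake that logs calls:

```ruby
# Hypothetical test-style helper: drain the currently available input,
# then read the sink through a caller-supplied callable.
def drain_and_read(query, read_sink)
  query.process_all_available # blocks until available input is committed
  read_sink.call
end

# Fake query that records the order of calls.
class RecordingQuery
  attr_reader :calls

  def initialize
    @calls = []
  end

  def process_all_available
    @calls << :process_all_available
    nil
  end
end

q = RecordingQuery.new
drain_and_read(q, -> { q.calls.dup }) # => [:process_all_available]
```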

#recent_progress ⇒ Array<Hash>

Returns parsed JSON progress objects for recent micro-batches.

Returns:

  • (Array<Hash>)

    parsed JSON progress objects for recent micro-batches.



# File 'lib/spark_connect/streaming.rb', line 244

def recent_progress
  command(recent_progress: true).recent_progress.recent_progress_json.map { |j| JSON.parse(j) }
end
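Each element of `recent_progress` is one parsed progress object, so aggregates over recent micro-batches are plain Enumerable calls. Spark retains only a bounded number of recent updates (see `spark.sql.streaming.numRecentProgressUpdates`), so such totals cover that window, not the query’s lifetime. A sketch, assuming a hypothetical `total_input_rows` helper and illustrative entries:

```ruby
# Hypothetical helper: rows ingested across the micro-batches still in recent_progress.
def total_input_rows(progress_list)
  progress_list.sum { |p| p.fetch("numInputRows", 0) }
end

# Illustrative, trimmed entries shaped like parsed progress JSON.
recent = [
  { "batchId" => 0, "numInputRows" => 10 },
  { "batchId" => 1, "numInputRows" => 25 },
  { "batchId" => 2 },
]
total_input_rows(recent) # => 35
```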

#status ⇒ Hash

Returns the current status as a Hash with the keys `message`, `isDataAvailable`, `isTriggerActive` and `isActive`.

Returns:

  • (Hash)

    the current status, keyed by `message`, `isDataAvailable`, `isTriggerActive` and `isActive`.



# File 'lib/spark_connect/streaming.rb', line 228

def status
  s = command(status: true).status
  {
    "message" => s.status_message,
    "isDataAvailable" => s.is_data_available,
    "isTriggerActive" => s.is_trigger_active,
    "isActive" => s.is_active,
  }
end
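The status keys combine into simple predicates. For example, a query that is running but neither executing a trigger nor seeing new data can be treated as idle. A sketch, assuming a hypothetical `idle?` helper over the Hash built above and an illustrative status value:

```ruby
# Hypothetical helper over the Hash returned by #status.
def idle?(status)
  status["isActive"] && !status["isTriggerActive"] && !status["isDataAvailable"]
end

waiting = {
  "message" => "Waiting for data to arrive",
  "isDataAvailable" => false,
  "isTriggerActive" => false,
  "isActive" => true,
}
idle?(waiting)                                  # => true
idle?(waiting.merge("isTriggerActive" => true)) # => false
```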

#stop ⇒ void

This method returns an undefined value.

Stop the query.



# File 'lib/spark_connect/streaming.rb', line 271

def stop
  command(stop: true)
  nil
end
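To make sure a query is stopped even when the surrounding code fails, `stop` can be placed in an `ensure` clause. A sketch, assuming a hypothetical `with_query` helper and a `StopTrackingQuery` fake; the block’s value is returned and any error still propagates:

```ruby
# Hypothetical helper: yield the query, then stop it even if the block raises.
def with_query(query)
  yield query
ensure
  query.stop
end

# Fake query that only tracks whether #stop was called.
class StopTrackingQuery
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
    nil
  end
end

q = StopTrackingQuery.new
begin
  with_query(q) { raise "sink write failed" }
rescue RuntimeError
  # the error still reaches the caller
end
q.stopped # => true
```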

#to_s ⇒ String Also known as: inspect

Returns a short representation showing the query id and name.

# File 'lib/spark_connect/streaming.rb', line 287

def to_s
  "#<SparkConnect::StreamingQuery id=#{@id} name=#{@name.inspect}>"
end