Class: SparkConnect::StreamingQueryManager

Inherits:
Object
  • Object
Defined in:
lib/spark_connect/streaming.rb

Overview

Manages the streaming queries of a session. Returned by SparkConnect::SparkSession#streams. Mirrors PySpark’s `StreamingQueryManager`.

Constant Summary

Proto = SparkConnect::Proto
MCmd = Proto::StreamingQueryManagerCommand

Instance Method Summary

Constructor Details

#initialize(session) ⇒ StreamingQueryManager

Returns a new instance of StreamingQueryManager.

Parameters:

  • session (SparkSession)

# File 'lib/spark_connect/streaming.rb', line 307

def initialize(session)
  @session = session
end

Instance Method Details

#active ⇒ Array<StreamingQuery>

Returns the currently active queries.

Returns:

  • (Array<StreamingQuery>)

# File 'lib/spark_connect/streaming.rb', line 312

def active
  command(active: true).active.active_queries.map { |q| StreamingQuery.new(@session, q.id, q.name) }
end
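
As a rough illustration of consuming the array returned by #active, the sketch below builds one label per query. ActiveStubQuery, ActiveStubManager and describe_active are hypothetical stand-ins so the example runs without a Spark Connect server. It also assumes that StreamingQuery exposes `id` and `name` readers, which this page does not confirm.

```ruby
# Hypothetical stand-ins: not part of SparkConnect.
ActiveStubQuery = Struct.new(:id, :name)

class ActiveStubManager
  def initialize(queries)
    @queries = queries
  end

  # Same shape as the documented #active: an Array of query objects.
  def active
    @queries.dup
  end
end

# Build one label per active query; queries without a name are shown as "(unnamed)".
def describe_active(manager)
  manager.active.map do |q|
    label = (q.name.nil? || q.name.empty?) ? "(unnamed)" : q.name
    "#{label} [#{q.id}]"
  end
end

stub = ActiveStubManager.new([
  ActiveStubQuery.new("a1", "clicks"),
  ActiveStubQuery.new("b2", "")
])
puts describe_active(stub)
```

With a real session, the same helper would presumably be called on the object returned by SparkConnect::SparkSession#streams.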

#await_any_termination(timeout_ms = nil) ⇒ Boolean

Blocks until any query terminates, or until `timeout_ms` elapses.

Parameters:

  • timeout_ms (Integer, nil) (defaults to: nil)

Returns:

  • (Boolean)


# File 'lib/spark_connect/streaming.rb', line 331

def await_any_termination(timeout_ms = nil)
  ac = MCmd::AwaitAnyTerminationCommand.new
  ac.timeout_ms = timeout_ms if timeout_ms
  command(await_any_termination: ac).await_any_termination.terminated
end
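
One possible way to use the timeout is to wait in short slices, so the caller can log progress or run health checks between waits. The sketch below is a minimal illustration under stated assumptions: SliceStubManager and wait_in_slices are hypothetical names, and the stub only imitates the documented signature (optional timeout in milliseconds, Boolean result).

```ruby
# Hypothetical stand-in for StreamingQueryManager: reports termination
# on the Nth call and records the timeouts it was given.
class SliceStubManager
  attr_reader :calls

  def initialize(terminate_on_call)
    @terminate_on_call = terminate_on_call
    @calls = []
  end

  def await_any_termination(timeout_ms = nil)
    @calls << timeout_ms
    @calls.size >= @terminate_on_call
  end
end

# Call await_any_termination up to max_slices times, yielding the slice
# number after each wait that ended without a termination.
# Returns true as soon as a termination is reported, false otherwise.
def wait_in_slices(manager, slice_ms:, max_slices:)
  max_slices.times do |i|
    return true if manager.await_any_termination(slice_ms)
    yield i + 1 if block_given?
  end
  false
end

stub = SliceStubManager.new(3)
done = wait_in_slices(stub, slice_ms: 500, max_slices: 5) do |n|
  puts "still running after slice #{n}"
end
puts "terminated: #{done}"
```

Passing no argument leaves `timeout_ms` unset on the command. By analogy with PySpark, that would presumably wait indefinitely, but this page does not state it.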

#get(id) ⇒ StreamingQuery?

Look up an active query by its id.

Parameters:

  • id (String)

Returns:

  • (StreamingQuery, nil)

# File 'lib/spark_connect/streaming.rb', line 320

def get(id)
  result = command(get_query: id.to_s)
  return nil unless result.result_type == :query

  StreamingQuery.new(@session, result.query.id, result.query.name)
end
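
Because #get returns nil when the response does not carry a query, callers need to handle the missing case. The sketch below shows one way to do that with safe navigation. LookupStubQuery, LookupStubManager and query_name are hypothetical stand-ins, and a `name` reader on the returned object is an assumption.

```ruby
# Hypothetical stand-ins: not part of SparkConnect.
LookupStubQuery = Struct.new(:id, :name)

class LookupStubManager
  def initialize(queries)
    @by_id = queries.to_h { |q| [q.id, q] }
  end

  # Like the documented #get, coerce the id with to_s and
  # return nil when no query matches.
  def get(id)
    @by_id[id.to_s]
  end
end

# Return the query's name, or a fallback string when the lookup yields nil.
def query_name(manager, id, fallback: "not found")
  manager.get(id)&.name || fallback
end

stub = LookupStubManager.new([LookupStubQuery.new("42", "orders")])
puts query_name(stub, 42)        # Integer id is coerced to "42"
puts query_name(stub, "missing")
```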

#reset_terminated ⇒ void

Forget the cached termination state of all queries, so that a subsequent #await_any_termination blocks again.

Returns:

  • (void)



# File 'lib/spark_connect/streaming.rb', line 339

def reset_terminated
  command(reset_terminated: true)
  nil
end
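
The interaction between #await_any_termination and #reset_terminated can be pictured with a small model. This is a sketch of the behaviour described above, not of the server's implementation: CachingStubManager and its mark_terminated! hook are hypothetical, and the stub returns immediately instead of blocking.

```ruby
# Hypothetical model of the cached termination state.
class CachingStubManager
  def initialize
    @terminated = false
  end

  # Test hook (not a real API): pretend that some query has terminated.
  def mark_terminated!
    @terminated = true
  end

  # Returns the cached state; a real manager would block up to timeout_ms.
  def await_any_termination(_timeout_ms = nil)
    @terminated
  end

  # Forget the cached state and return nil, like the documented method.
  def reset_terminated
    @terminated = false
    nil
  end
end

stub = CachingStubManager.new
stub.mark_terminated!
first = stub.await_any_termination(100)        # a termination is reported
again = stub.await_any_termination(100)        # the cached state is reported again
stub.reset_terminated
after_reset = stub.await_any_termination(100)  # state was cleared by the reset
puts [first, again, after_reset].inspect
```

In a supervision loop, the reset would typically come after the terminated query has been handled, so that the next wait reflects only new terminations.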