Class: SparkConnect::SparkConnectClient

Inherits:
Object
Defined in:
lib/spark_connect/client.rb

Overview

The low-level Spark Connect client. Wraps the gRPC stub and exposes the four core RPC families used by the high-level API: #execute_plan, #execute_command, #analyze, and #config. Higher layers (SparkSession, DataFrame) never touch the stub directly.

Transient transport failures (e.g. `GRPC::Unavailable`) are retried with exponential backoff and jitter before any response data has been observed.
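The retry helper itself is not shown on this page. The following is a rough, hypothetical sketch of capped exponential backoff with "full jitter". `TransientError` stands in for `GRPC::Unavailable`, the defaults mirror the constructor's `max_retries`, `retry_base_delay` and `max_retry_delay`, and the `sleeper` argument exists only so the delays can be observed. It covers unary calls only, where no partial response data can have been seen yet.

```ruby
# Hypothetical stand-in for GRPC::Unavailable.
class TransientError < StandardError; end

# Retries the block on TransientError. Each delay is drawn uniformly from
# [0, cap), where cap = min(max_delay, base_delay * 2**attempt).
def with_retries(max_retries: 10, base_delay: 0.05, max_delay: 10.0,
                 sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield
  rescue TransientError
    raise if attempt >= max_retries # out of retries: surface the last error
    cap = [max_delay, base_delay * (2**attempt)].min
    sleeper.call(rand * cap)        # full jitter
    attempt += 1
    retry                           # re-runs the begin block
  end
end
```

Randomizing the whole delay, rather than adding a small jitter term, spreads out clients that all failed at the same moment.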

Defined Under Namespace

Classes: ExecuteResult

Constant Summary

Proto = SparkConnect::Proto

Constructor Details

#initialize(channel_builder, session_id: nil, max_retries: 10, retry_base_delay: 0.05, max_retry_delay: 10.0) ⇒ SparkConnectClient

Returns a new instance of SparkConnectClient.

Parameters:

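  • channel_builder (ChannelBuilder)

    builds the gRPC stub and supplies the user agent, the user id, and an optional session id.

  • session_id (String, nil) (defaults to: nil)

    session id to reuse; if nil, channel_builder.session_id is used, and if that is also nil a random UUID is generated.

  • max_retries (Integer) (defaults to: 10)

    maximum number of retries for a transient failure.

  • retry_base_delay (Float) (defaults to: 0.05)

    base backoff delay in seconds.

  • max_retry_delay (Float) (defaults to: 10.0)

    upper bound on a single backoff delay, in seconds.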



# File 'lib/spark_connect/client.rb', line 45

def initialize(channel_builder, session_id: nil, max_retries: 10, retry_base_delay: 0.05, max_retry_delay: 10.0)
  @channel_builder = channel_builder
  @stub = channel_builder.build_stub
  @metadata = channel_builder.
  @session_id = session_id || channel_builder.session_id || SecureRandom.uuid
  @client_type = channel_builder.user_agent
  @user_context = Proto::UserContext.new(user_id: channel_builder.user_id || "")
  @max_retries = max_retries
  @retry_base_delay = retry_base_delay
  @max_retry_delay = max_retry_delay
  @server_side_session_id = nil
  @tags = []
end
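The session id fallback order in the constructor can be isolated in a few lines. This sketch uses a hypothetical `FakeChannelBuilder` struct in place of the real ChannelBuilder; only its `session_id` field is modeled.

```ruby
require "securerandom"

# Hypothetical stand-in for ChannelBuilder: only session_id is modeled.
FakeChannelBuilder = Struct.new(:session_id)

# Same precedence as #initialize: explicit argument, then the builder's
# session id, then a freshly generated random (version 4) UUID.
def resolve_session_id(explicit, builder)
  explicit || builder.session_id || SecureRandom.uuid
end

resolve_session_id("abc", FakeChannelBuilder.new("from-builder")) # => "abc"
resolve_session_id(nil, FakeChannelBuilder.new("from-builder"))   # => "from-builder"
resolve_session_id(nil, FakeChannelBuilder.new(nil))              # a new UUID on each call
```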

Instance Attribute Details

#arrow_batches ⇒ Array<String> (readonly)

Member of ExecuteResult. Returns the Arrow IPC stream chunks, one chunk per element.

Returns:

  • (Array<String>)

    each element is one Arrow IPC stream chunk.



# File 'lib/spark_connect/client.rb', line 28

ExecuteResult = Struct.new(
  :arrow_batches, :schema, :metrics, :observed_metrics, :sql_command_result, :row_count,
  :write_stream_result, :streaming_query_result, :streaming_manager_result, :checkpoint_relation,
  :pipeline_command_result, :pipeline_events
)

#channel_builder ⇒ ChannelBuilder (readonly)

Returns:

  • (ChannelBuilder)


# File 'lib/spark_connect/client.rb', line 39

def channel_builder
  @channel_builder
end

#client_type ⇒ String (readonly)

Returns the user agent / client type.

Returns:

  • (String)

    the user agent / client type.



# File 'lib/spark_connect/client.rb', line 37

def client_type
  @client_type
end

#metrics ⇒ Spark::Connect::ExecutePlanResponse::Metrics? (readonly)

Member of ExecuteResult.

Returns:

  • (Spark::Connect::ExecutePlanResponse::Metrics, nil)


# File 'lib/spark_connect/client.rb', line 28

#observed_metrics ⇒ Array (readonly)

Member of ExecuteResult. Returns observed (named) metrics.

Returns:

  • (Array)

    observed (named) metrics.



# File 'lib/spark_connect/client.rb', line 28

#schema ⇒ Spark::Connect::DataType? (readonly)

Member of ExecuteResult. Returns the result schema, if the server returned one.

Returns:

  • (Spark::Connect::DataType, nil)


# File 'lib/spark_connect/client.rb', line 28

#session_id ⇒ String (readonly)

Returns the client-side session id. It is a random UUID v4 only when neither the caller nor the channel builder supplied one.

Returns:

  • (String)

    the client-side session id.



# File 'lib/spark_connect/client.rb', line 35

def session_id
  @session_id
end

#sql_command_result ⇒ Spark::Connect::Relation? (readonly)

Member of ExecuteResult. Returns the relation produced by a SQL command, if any.

Returns:

  • (Spark::Connect::Relation, nil)


# File 'lib/spark_connect/client.rb', line 28

#tags ⇒ Array<String> (readonly)

Operation tags attached to every subsequent execution (used with #interrupt `type: :tag`).

Returns:

  • (Array<String>)


# File 'lib/spark_connect/client.rb', line 62

def tags
  @tags
end

Instance Method Details

#add_tag(tag) ⇒ void

This method returns an undefined value.

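Add an operation tag. The tag is converted with to_s; it must be non-empty and must not contain commas, otherwise IllegalArgumentError is raised. Adding a tag that is already present has no effect.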



# File 'lib/spark_connect/client.rb', line 66

def add_tag(tag)
  tag = tag.to_s
  raise IllegalArgumentError, "Tag must not be empty" if tag.empty?
  raise IllegalArgumentError, "Tag must not contain ','" if tag.include?(",")

  @tags << tag unless @tags.include?(tag)
  nil
end
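The tag rules can be exercised without a server. This sketch copies the bookkeeping of #add_tag, #remove_tag and #clear_tags into a hypothetical `TagSet` class and defines a stand-in `IllegalArgumentError`; neither name is part of the library.

```ruby
# Hypothetical stand-in for the project's IllegalArgumentError.
class IllegalArgumentError < StandardError; end

# Hypothetical model of the client's tag bookkeeping, detached from gRPC.
class TagSet
  attr_reader :tags

  def initialize
    @tags = []
  end

  def add_tag(tag)
    tag = tag.to_s
    raise IllegalArgumentError, "Tag must not be empty" if tag.empty?
    raise IllegalArgumentError, "Tag must not contain ','" if tag.include?(",")

    @tags << tag unless @tags.include?(tag) # duplicates are ignored
    nil
  end

  def remove_tag(tag)
    @tags.delete(tag.to_s) # no-op if absent
    nil
  end

  def clear_tags
    @tags.clear
    nil
  end
end

tags = TagSet.new
tags.add_tag(:nightly_etl)  # symbols are converted with to_s
tags.add_tag("nightly_etl") # already present, not added again
tags.tags                   # => ["nightly_etl"]
```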

#analyze(**analyze_kw) ⇒ Spark::Connect::AnalyzePlanResponse

Run an `AnalyzePlan` request.

Parameters:

  • analyze_kw (Hash)

    exactly one `analyze` oneof keyword, e.g. `schema:`, `explain:`, `tree_string:`, `is_local:`, `spark_version:`.

Returns:

  • (Spark::Connect::AnalyzePlanResponse)


# File 'lib/spark_connect/client.rb', line 108

def analyze(**analyze_kw)
  req = Proto::AnalyzePlanRequest.new(
    session_id: @session_id,
    user_context: @user_context,
    client_type: @client_type,
    **analyze_kw
  )
  with_retries { @stub.analyze_plan(req, metadata: @metadata) }
end
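Because the request takes exactly one `analyze` oneof keyword, a caller may want to reject a malformed call before it reaches the server. The helper below is hypothetical (not part of the client) and checks only the number of keywords, not their names.

```ruby
# Hypothetical pre-flight check for #analyze keyword arguments: exactly one
# oneof keyword must be given. Key names are deliberately not validated.
def single_analyze_keyword!(**analyze_kw)
  return analyze_kw.keys.first if analyze_kw.size == 1

  raise ArgumentError,
        "expected exactly one analyze keyword, got #{analyze_kw.keys.inspect}"
end

single_analyze_keyword!(spark_version: {}) # => :spark_version
```

In real use the value would be the matching request sub-message, and the same keywords would then be passed on to `analyze`.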

#clear_tags ⇒ void

This method returns an undefined value.

Remove all operation tags.



# File 'lib/spark_connect/client.rb', line 82

def clear_tags
  @tags.clear
  nil
end

#config(operation) ⇒ Spark::Connect::ConfigResponse

Run a `Config` request.

Parameters:

  • operation (Spark::Connect::ConfigRequest::Operation)

    the config operation to perform, for example a get or a set.

Returns:

  • (Spark::Connect::ConfigResponse)


# File 'lib/spark_connect/client.rb', line 122

def config(operation)
  req = Proto::ConfigRequest.new(
    session_id: @session_id,
    user_context: @user_context,
    client_type: @client_type,
    operation: operation
  )
  with_retries { @stub.config(req, metadata: @metadata) }
end

#execute_command(command) ⇒ ExecuteResult

Execute a command plan (side-effecting, e.g. write/SQL DML).

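Parameters:

  • command

    the command to run; it is wrapped in a plan with PlanBuilder.command_plan.

Returns:

  • (ExecuteResult)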


# File 'lib/spark_connect/client.rb', line 99

def execute_command(command)
  execute(PlanBuilder.command_plan(command))
end

#execute_plan(relation) ⇒ ExecuteResult

Execute a relation plan and accumulate the streamed response.

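Parameters:

  • relation

    the root relation; it is wrapped in a plan with PlanBuilder.root_plan.

Returns:

  • (ExecuteResult)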


# File 'lib/spark_connect/client.rb', line 91

def execute_plan(relation)
  execute(PlanBuilder.root_plan(relation))
end
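The accumulation itself happens inside `execute`, which is not documented on this page. The sketch below shows the general idea with a hypothetical, simplified `Response` struct loosely modeled on ExecutePlanResponse. Arrow chunks are appended in stream order, row counts are summed, and the schema and metrics are taken from whichever message carries them; the other ExecuteResult members are left nil here.

```ruby
# Same member list as the ExecuteResult struct documented above.
ExecuteResult = Struct.new(
  :arrow_batches, :schema, :metrics, :observed_metrics, :sql_command_result, :row_count,
  :write_stream_result, :streaming_query_result, :streaming_manager_result, :checkpoint_relation,
  :pipeline_command_result, :pipeline_events
)

# Hypothetical, simplified stand-in for one streamed response message.
Response = Struct.new(:arrow_batch, :row_count, :schema, :metrics)

# Folds a response stream into one ExecuteResult.
def accumulate(responses)
  result = ExecuteResult.new
  result.arrow_batches = []
  result.row_count = 0
  responses.each do |resp|
    if resp.arrow_batch
      result.arrow_batches << resp.arrow_batch # keep stream order
      result.row_count += resp.row_count.to_i  # nil counts as 0
    end
    result.schema = resp.schema if resp.schema # last non-nil wins
    result.metrics = resp.metrics if resp.metrics
  end
  result
end
```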

#interrupt(type: :all, value: nil) ⇒ Spark::Connect::InterruptResponse

Interrupt running operations.

Parameters:

  • type (Symbol) (defaults to: :all)

    `:all`, `:tag`, or `:operation_id`.

  • value (String, nil) (defaults to: nil)

    the tag (for `:tag`) or the operation id (for `:operation_id`); ignored for `:all`.

Returns:

  • (Spark::Connect::InterruptResponse)


# File 'lib/spark_connect/client.rb', line 137

def interrupt(type: :all, value: nil)
  kw = { interrupt_type: :"INTERRUPT_TYPE_#{type.to_s.upcase}" }
  kw[:operation_tag] = value if type == :tag
  kw[:operation_id] = value if type == :operation_id
  req = Proto::InterruptRequest.new(
    session_id: @session_id, user_context: @user_context, client_type: @client_type, **kw
  )
  with_retries { @stub.interrupt(req, metadata: @metadata) }
end
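The mapping from the `type:` symbol to request fields can be pulled out into a pure function. This hypothetical `interrupt_fields` helper reproduces the logic of #interrupt and adds two checks the method itself does not perform: an unknown type, or a missing value for `:tag` or `:operation_id`, raises ArgumentError before any request is built.

```ruby
# Hypothetical helper mirroring how #interrupt builds its request fields.
INTERRUPT_TYPES = %i[all tag operation_id].freeze

def interrupt_fields(type: :all, value: nil)
  raise ArgumentError, "unknown interrupt type #{type.inspect}" unless INTERRUPT_TYPES.include?(type)
  raise ArgumentError, "#{type} requires a value" if type != :all && value.nil?

  kw = { interrupt_type: :"INTERRUPT_TYPE_#{type.to_s.upcase}" }
  kw[:operation_tag] = value if type == :tag
  kw[:operation_id] = value if type == :operation_id
  kw
end

interrupt_fields()                                 # => { interrupt_type: :INTERRUPT_TYPE_ALL }
interrupt_fields(type: :tag, value: "nightly_etl") # => { interrupt_type: :INTERRUPT_TYPE_TAG, operation_tag: "nightly_etl" }
```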

#release_session ⇒ void

This method returns an undefined value.

Release this client’s server-side session. The call is best-effort: it is not retried, and any StandardError is swallowed.



# File 'lib/spark_connect/client.rb', line 149

def release_session
  req = Proto::ReleaseSessionRequest.new(
    session_id: @session_id, user_context: @user_context, client_type: @client_type
  )
  # Best-effort and non-retrying: this runs on teardown, so a dead server
  # must not block the caller with the retry/backoff loop.
  @stub.release_session(req, metadata: @metadata)
  nil
rescue StandardError
  nil
end
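Since #release_session never raises, it is safe to call from an `ensure` clause. A minimal sketch of that teardown pattern, using a hypothetical `FakeClient` in place of SparkConnectClient and a hypothetical `with_session` helper:

```ruby
# Hypothetical stand-in whose release_session swallows errors like the
# method above does.
class FakeClient
  attr_reader :released

  def initialize(fail_release: false)
    @fail_release = fail_release
    @released = false
  end

  def release_session
    raise IOError, "server gone" if @fail_release # simulated dead server
    @released = true
    nil
  rescue StandardError
    nil
  end
end

# Runs the block and always releases the session afterwards, even if the
# block raises; the block's value (or exception) is passed through.
def with_session(client)
  yield client
ensure
  client.release_session
end
```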

#remove_tag(tag) ⇒ void

This method returns an undefined value.

Remove an operation tag (converted with to_s). Removing a tag that is not present has no effect.



# File 'lib/spark_connect/client.rb', line 76

def remove_tag(tag)
  @tags.delete(tag.to_s)
  nil
end