Class: SparkConnect::SparkConnectClient
- Inherits: Object
  - Object
  - SparkConnect::SparkConnectClient
- Defined in: lib/spark_connect/client.rb
Overview
The low-level Spark Connect client. Wraps the gRPC stub and exposes the four core RPC families used by the high-level API: #execute_plan, #execute_command, #analyze, and #config. Higher layers (SparkSession, DataFrame) never touch the stub directly.
Transient transport failures (e.g. `GRPC::Unavailable`) are retried with exponential backoff and jitter, but only while no response data has been observed.
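The retry policy described above can be sketched in plain Ruby. This is a minimal illustration and not the library's private `with_retries` implementation: `TransientError`, the `sleeper` parameter, and the jitter formula are assumptions made for the example, and the "no response data observed yet" guard is not modelled. The default values mirror the constructor's `max_retries`, `retry_base_delay`, and `max_retry_delay`.

```ruby
# Hypothetical stand-in for a transient transport error such as GRPC::Unavailable.
class TransientError < StandardError; end

# Retries the block on TransientError. The delay doubles on each attempt,
# is capped at max_delay, and is scaled by a random factor between 0.5 and 1.0.
# `sleeper` is an assumption added so the sketch can run without real waits.
def with_retries(max_retries: 10, base_delay: 0.05, max_delay: 10.0,
                 sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield
  rescue TransientError
    raise if attempt >= max_retries # give up and surface the last error

    delay = [base_delay * (2**attempt), max_delay].min
    sleeper.call(delay * (0.5 + rand * 0.5))
    attempt += 1
    retry
  end
end

calls = 0
slept = []
result = with_retries(sleeper: ->(seconds) { slept << seconds }) do
  calls += 1
  raise TransientError, "unavailable" if calls < 3

  :ok
end
```

Here the block fails twice and succeeds on the third call, so two delays are recorded. Capping the delay keeps a long outage from producing unbounded waits, and the jitter spreads out clients that failed at the same moment.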
Defined Under Namespace
Classes: ExecuteResult
Constant Summary
- Proto = SparkConnect::Proto
Instance Attribute Summary
- #arrow_batches ⇒ Array<String> (readonly)
  Each element is one Arrow IPC stream chunk.
- #channel_builder ⇒ ChannelBuilder (readonly)
- #client_type ⇒ String (readonly)
  The user agent / client type.
- #metrics ⇒ Spark::Connect::ExecutePlanResponse::Metrics? (readonly)
- #observed_metrics ⇒ Array (readonly)
  Observed (named) metrics.
- #schema ⇒ Spark::Connect::DataType? (readonly)
  Result schema, if returned.
- #session_id ⇒ String (readonly)
  The client-side session id (UUID v4).
- #sql_command_result ⇒ Spark::Connect::Relation? (readonly)
  Relation produced by a SQL command.
- #tags ⇒ Array<String> (readonly)
  Operation tags attached to every subsequent execution (used with #interrupt `type: :tag`).
Instance Method Summary
- #add_tag(tag) ⇒ void
  Add an operation tag.
- #analyze(**analyze_kw) ⇒ Spark::Connect::AnalyzePlanResponse
  Run an `AnalyzePlan` request.
- #clear_tags ⇒ void
  Remove all operation tags.
- #config(operation) ⇒ Spark::Connect::ConfigResponse
  Run a `Config` request.
- #execute_command(command) ⇒ ExecuteResult
  Execute a command plan (side-effecting, e.g. write/SQL DML).
- #execute_plan(relation) ⇒ ExecuteResult
  Execute a relation plan and accumulate the streamed response.
- #initialize(channel_builder, session_id: nil, max_retries: 10, retry_base_delay: 0.05, max_retry_delay: 10.0) ⇒ SparkConnectClient (constructor)
  A new instance of SparkConnectClient.
- #interrupt(type: :all, value: nil) ⇒ Spark::Connect::InterruptResponse
  Interrupt running operations.
- #release_session ⇒ void
  Release this client's server-side session.
- #remove_tag(tag) ⇒ void
  Remove an operation tag.
Constructor Details
#initialize(channel_builder, session_id: nil, max_retries: 10, retry_base_delay: 0.05, max_retry_delay: 10.0) ⇒ SparkConnectClient
Returns a new instance of SparkConnectClient.
# File 'lib/spark_connect/client.rb', line 45
def initialize(channel_builder, session_id: nil, max_retries: 10, retry_base_delay: 0.05, max_retry_delay: 10.0)
  @channel_builder = channel_builder
  @stub = channel_builder.build_stub
  @metadata = channel_builder.
  @session_id = session_id || channel_builder.session_id || SecureRandom.uuid
  @client_type = channel_builder.user_agent
  @user_context = Proto::UserContext.new(user_id: channel_builder.user_id || "")
  @max_retries = max_retries
  @retry_base_delay = retry_base_delay
  @max_retry_delay = max_retry_delay
  @server_side_session_id = nil
  @tags = []
end
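The session id precedence in the constructor (explicit argument, then the channel builder's value, then a generated UUID) can be tried in isolation. In this sketch `FakeBuilder` and `resolve_session_id` are hypothetical names introduced for illustration; only the `||` chain itself is taken from the source above.

```ruby
require "securerandom"

# Hypothetical stand-in for ChannelBuilder; only session_id is modelled.
FakeBuilder = Struct.new(:session_id)

# Same precedence as the constructor: explicit argument first,
# then the builder's value, then a freshly generated UUID.
def resolve_session_id(builder, session_id: nil)
  session_id || builder.session_id || SecureRandom.uuid
end

explicit  = resolve_session_id(FakeBuilder.new("from-builder"), session_id: "explicit")
inherited = resolve_session_id(FakeBuilder.new("from-builder"))
generated = resolve_session_id(FakeBuilder.new(nil))
```

Only the last case produces a random UUID v4. A caller that needs to reattach to a known session would pass `session_id:` explicitly or set it on the builder.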
Instance Attribute Details
#arrow_batches ⇒ Array<String> (readonly)
Returns the Arrow batches of an ExecuteResult; each element is one Arrow IPC stream chunk.
# File 'lib/spark_connect/client.rb', line 28
ExecuteResult = Struct.new(
  :arrow_batches, :schema, :metrics, :observed_metrics, :sql_command_result,
  :row_count, :write_stream_result, :streaming_query_result, :streaming_manager_result,
  :checkpoint_relation, :pipeline_command_result, :pipeline_events
)
#channel_builder ⇒ ChannelBuilder (readonly)
# File 'lib/spark_connect/client.rb', line 39
def channel_builder
  @channel_builder
end
#client_type ⇒ String (readonly)
Returns the user agent / client type.
# File 'lib/spark_connect/client.rb', line 37
def client_type
  @client_type
end
#metrics ⇒ Spark::Connect::ExecutePlanResponse::Metrics? (readonly)
#observed_metrics ⇒ Array (readonly)
Returns observed (named) metrics.
#schema ⇒ Spark::Connect::DataType? (readonly)
Returns the result schema, if the server returned one.
#session_id ⇒ String (readonly)
Returns the client-side session id (UUID v4).
# File 'lib/spark_connect/client.rb', line 35
def session_id
  @session_id
end
#sql_command_result ⇒ Spark::Connect::Relation? (readonly)
Returns the relation produced by a SQL command.
#tags ⇒ Array<String> (readonly)
Operation tags attached to every subsequent execution (used with #interrupt `type: :tag`).
# File 'lib/spark_connect/client.rb', line 62
def tags
  @tags
end
Instance Method Details
#add_tag(tag) ⇒ void
This method returns an undefined value.
Add an operation tag. Tags must be non-empty and contain no commas.
# File 'lib/spark_connect/client.rb', line 66
def add_tag(tag)
  tag = tag.to_s
  raise IllegalArgumentError, "Tag must not be empty" if tag.empty?
  raise IllegalArgumentError, "Tag must not contain ','" if tag.include?(",")

  @tags << tag unless @tags.include?(tag)
  nil
end
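The tag rules (string conversion, no empty tags, no commas, duplicates ignored) can be exercised without a server. The sketch below uses a hypothetical `TagSet` class and substitutes Ruby's built-in `ArgumentError` for the library's `IllegalArgumentError`, so it has no dependencies. It mirrors the listing above but is not the client itself.

```ruby
# Minimal stand-in for the client's tag bookkeeping.
class TagSet
  attr_reader :tags

  def initialize
    @tags = []
  end

  def add_tag(tag)
    tag = tag.to_s
    raise ArgumentError, "Tag must not be empty" if tag.empty?
    raise ArgumentError, "Tag must not contain ','" if tag.include?(",")

    @tags << tag unless @tags.include?(tag) # duplicates are ignored
    nil
  end
end

set = TagSet.new
set.add_tag(:nightly)  # symbols are converted with to_s
set.add_tag("nightly") # already present, so nothing is appended

rejected =
  begin
    set.add_tag("a,b")
    false
  rescue ArgumentError
    true
  end
```

After these calls the set holds a single `"nightly"` entry and the comma-containing tag has been rejected.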
#analyze(**analyze_kw) ⇒ Spark::Connect::AnalyzePlanResponse
Run an `AnalyzePlan` request.
# File 'lib/spark_connect/client.rb', line 108
def analyze(**analyze_kw)
  req = Proto::AnalyzePlanRequest.new(
    session_id: @session_id,
    user_context: @user_context,
    client_type: @client_type,
    **analyze_kw
  )
  with_retries { @stub.analyze_plan(req, metadata: @metadata) }
end
#clear_tags ⇒ void
This method returns an undefined value.
Remove all operation tags.
# File 'lib/spark_connect/client.rb', line 82
def clear_tags
  @tags.clear
  nil
end
#config(operation) ⇒ Spark::Connect::ConfigResponse
Run a `Config` request.
# File 'lib/spark_connect/client.rb', line 122
def config(operation)
  req = Proto::ConfigRequest.new(
    session_id: @session_id,
    user_context: @user_context,
    client_type: @client_type,
    operation: operation
  )
  with_retries { @stub.config(req, metadata: @metadata) }
end
#execute_command(command) ⇒ ExecuteResult
Execute a command plan (side-effecting, e.g. write/SQL DML).
# File 'lib/spark_connect/client.rb', line 99
def execute_command(command)
  execute(PlanBuilder.command_plan(command))
end
#execute_plan(relation) ⇒ ExecuteResult
Execute a relation plan and accumulate the streamed response.
# File 'lib/spark_connect/client.rb', line 91
def execute_plan(relation)
  execute(PlanBuilder.root_plan(relation))
end
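The private `execute` method is not shown on this page, so the following is only a guess at what "accumulate the streamed response" could look like. `Result`, `Response`, and `accumulate` are hypothetical names, and the response shape (an optional schema plus an optional Arrow chunk with a row count) is an assumption chosen to keep the sketch self-contained.

```ruby
# Trimmed-down result type; the real ExecuteResult has more members.
Result = Struct.new(:arrow_batches, :schema, :row_count)

# Hypothetical response shape: each streamed item may carry a schema,
# an Arrow chunk with its row count, or neither.
Response = Struct.new(:schema, :arrow_data, :row_count)

# Folds a stream of responses into a single result object.
def accumulate(stream)
  result = Result.new([], nil, 0)
  stream.each do |resp|
    result.schema ||= resp.schema # keep the first schema seen
    next unless resp.arrow_data

    result.arrow_batches << resp.arrow_data
    result.row_count += resp.row_count.to_i
  end
  result
end

stream = [
  Response.new("struct<id:bigint>", nil, nil),
  Response.new(nil, "chunk-1", 2),
  Response.new(nil, "chunk-2", 3)
]
result = accumulate(stream)
```

The batches are kept in arrival order as opaque strings, which matches the documented `Array<String>` type of `#arrow_batches`. Decoding them is left to a higher layer.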
#interrupt(type: :all, value: nil) ⇒ Spark::Connect::InterruptResponse
Interrupt running operations.
# File 'lib/spark_connect/client.rb', line 137
def interrupt(type: :all, value: nil)
  kw = { interrupt_type: :"INTERRUPT_TYPE_#{type.to_s.upcase}" }
  kw[:operation_tag] = value if type == :tag
  kw[:operation_id] = value if type == :operation_id
  req = Proto::InterruptRequest.new(
    session_id: @session_id, user_context: @user_context, client_type: @client_type, **kw
  )
  with_retries { @stub.interrupt(req, metadata: @metadata) }
end
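To see which request fields each `type` produces, the keyword-building lines from the listing above can be lifted into a standalone helper. `interrupt_kwargs` is a hypothetical name used only for this sketch, and it returns a plain Hash instead of building a protobuf request.

```ruby
# Builds the keyword arguments the same way #interrupt does.
def interrupt_kwargs(type: :all, value: nil)
  kw = { interrupt_type: :"INTERRUPT_TYPE_#{type.to_s.upcase}" }
  kw[:operation_tag] = value if type == :tag
  kw[:operation_id] = value if type == :operation_id
  kw
end

all_kw = interrupt_kwargs
tag_kw = interrupt_kwargs(type: :tag, value: "nightly")
op_kw  = interrupt_kwargs(type: :operation_id, value: "op-123")
```

Note that `type` is compared against symbols. Passing the string `"tag"` would still yield `:INTERRUPT_TYPE_TAG`, but `value` would not be copied into `operation_tag`, so callers should pass `:tag` or `:operation_id` as symbols.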
#release_session ⇒ void
This method returns an undefined value.
Release this client’s server-side session.
# File 'lib/spark_connect/client.rb', line 149
def release_session
  req = Proto::ReleaseSessionRequest.new(
    session_id: @session_id, user_context: @user_context, client_type: @client_type
  )
  # Best-effort and non-retrying: this runs on teardown, so a dead server
  # must not block the caller with the retry/backoff loop.
  @stub.release_session(req, metadata: @metadata)
  nil
rescue StandardError
  nil
end
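The best-effort pattern used by `#release_session` (a single attempt, with any `StandardError` swallowed) can be shown on its own. `best_effort` is a hypothetical helper written for this sketch and is not part of the library.

```ruby
# Runs the block once, with no retry loop, and swallows any StandardError.
def best_effort
  yield
  nil
rescue StandardError
  nil
end

attempts = 0
outcome = best_effort do
  attempts += 1
  raise IOError, "server unreachable"
end
```

The block runs exactly once and the caller always receives `nil`. Exceptions outside the `StandardError` hierarchy, such as `Interrupt`, still propagate, so a Ctrl-C during teardown is not hidden.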
#remove_tag(tag) ⇒ void
This method returns an undefined value.
Remove an operation tag.
# File 'lib/spark_connect/client.rb', line 76
def remove_tag(tag)
  @tags.delete(tag.to_s)
  nil
end