Class: SparkConnect::Pipeline
- Inherits: Object
  - Object
    - SparkConnect::Pipeline
- Defined in: lib/spark_connect/pipelines.rb
Overview
Note: `foreach`/`foreachBatch` flows and Python query-function evaluation are not supported because they require UDFs; define each flow with a relation (a DataFrame) instead.
A Spark Declarative Pipeline (SDP) dataflow graph.
A pipeline is built by registering outputs (tables, materialized views, temporary views, or sinks) and the flows that populate them, then started with #start_run. Each flow is defined by a DataFrame (an unresolved relation), so you compose flows with the same API you use for ordinary queries.
Create one with SparkSession#pipeline.
Constant Summary
- Proto = SparkConnect::Proto
- PC = Proto::PipelineCommand
- OUTPUT_TYPES = { table: :TABLE, materialized_view: :MATERIALIZED_VIEW, temporary_view: :TEMPORARY_VIEW, sink: :SINK, }.freeze
Instance Attribute Summary
- #graph_id ⇒ String (readonly)
  The server-assigned dataflow graph id.
Instance Method Summary
-
#create_materialized_view(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil) ⇒ String
Define a materialized view and the flow that populates it.
-
#create_sink(name, df, format: nil, options: {}) ⇒ String
Define a streaming sink.
-
#create_table(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil) ⇒ String
Define a published table and the flow that populates it.
-
#create_temporary_view(name, df = nil, comment: nil) ⇒ String
Define a (non-published) temporary view and its flow.
-
#define_flow(name, df, target: nil, once: false, sql_conf: {}) ⇒ String
Define a flow that writes the contents of `df` into `target`.
-
#define_sql(sql_text, sql_file_path: nil) ⇒ void
Register datasets and flows defined in SQL text (optionally tagged with the path of the file it came from).
-
#drop ⇒ void
Drop this dataflow graph and stop any attached flows.
-
#initialize(session, default_catalog: nil, default_database: nil, sql_conf: {}) ⇒ Pipeline
constructor
A new instance of Pipeline.
-
#read(name) ⇒ DataFrame
Reference a dataset defined in this pipeline as a DataFrame (so later flows can read from earlier outputs).
-
#start_run(full_refresh: [], full_refresh_all: false, refresh: [], dry: false, storage: nil) {|event| ... } ⇒ Array<PipelineEvent>
Resolve the graph and run a pipeline update.
Constructor Details
#initialize(session, default_catalog: nil, default_database: nil, sql_conf: {}) ⇒ Pipeline
Returns a new instance of Pipeline.
43 44 45 46 47 48 49 50 |
# File 'lib/spark_connect/pipelines.rb', line 43
#
# Create a new dataflow graph on the server and remember the id it assigns.
# Instances are normally obtained through SparkSession#pipeline.
#
# @param session [SparkSession] the session this pipeline belongs to
# @param default_catalog [String, nil] sent to the server only when truthy
# @param default_database [String, nil] sent to the server only when truthy
# @param sql_conf [Hash] SQL configuration for the graph, passed through +stringify+
def initialize(session, default_catalog: nil, default_database: nil, sql_conf: {})
  @session = session
  graph = PC::CreateDataflowGraph.new(sql_conf: stringify(sql_conf)).tap do |request|
    # Optional fields are only populated when the caller supplied a value.
    request.default_catalog = default_catalog if default_catalog
    request.default_database = default_database if default_database
  end
  reply = dispatch(PC.new(create_dataflow_graph: graph))
  @graph_id = reply.pipeline_command_result.create_dataflow_graph_result.dataflow_graph_id
end
Instance Attribute Details
#graph_id ⇒ String (readonly)
Returns the server-assigned dataflow graph id.
37 38 39 |
# File 'lib/spark_connect/pipelines.rb', line 37
#
# Read-only accessor for the dataflow graph id the server assigned when this
# pipeline was created.
#
# @return [String]
def graph_id
  @graph_id
end
Instance Method Details
#create_materialized_view(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil) ⇒ String
Define a materialized view and the flow that populates it.
75 76 77 78 79 80 |
# File 'lib/spark_connect/pipelines.rb', line 75
#
# Define a materialized view and the flow that populates it.
#
# @param name [String, Symbol] the view name
# @param df [DataFrame, nil] relation that defines the view's contents
# @param comment [String, nil] optional comment
# @param format [String, nil] optional storage format
# @param partition_cols [Array] partition columns
# @param clustering_columns [Array] clustering columns
# @param table_properties [Hash] table properties
# @param schema [Object, nil] optional schema
# @return [String] whatever +define_table_output+ returns for the output
def create_materialized_view(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil)
  output_options = {
    comment: comment,
    format: format,
    partition_cols: partition_cols,
    clustering_columns: clustering_columns,
    table_properties: table_properties,
    schema: schema
  }
  define_table_output(name, :materialized_view, df, **output_options)
end
#create_sink(name, df, format: nil, options: {}) ⇒ String
Define a streaming sink.
95 96 97 98 99 100 101 |
# File 'lib/spark_connect/pipelines.rb', line 95
#
# Define a streaming sink and the flow that writes +df+ into it.
#
# @param name [String, Symbol] the sink name; also used as the flow name and
#   as the flow's target dataset
# @param df [DataFrame] relation whose contents the flow writes to the sink
# @param format [String, nil] sink format; only set when given
# @param options [Hash] sink options, stringified like the other option/conf
#   maps in this class
# @return [String] +name+, as passed in
def create_sink(name, df, format: nil, options: {})
  # Forward the caller's options. Calling stringify with no argument would
  # drop them, so pass +options+ explicitly.
  sink = PC::DefineOutput::SinkDetails.new(options: stringify(options))
  sink.format = format if format
  define_output(name, :sink, sink_details: sink)
  define_flow(name, df, target: name)
  name
end
#create_table(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil) ⇒ String
Define a published table and the flow that populates it.
66 67 68 69 70 71 |
# File 'lib/spark_connect/pipelines.rb', line 66
#
# Define a published table and the flow that populates it.
#
# @param name [String, Symbol] the table name
# @param df [DataFrame, nil] relation that defines the table's contents
# @param comment [String, nil] optional comment
# @param format [String, nil] optional storage format
# @param partition_cols [Array] partition columns
# @param clustering_columns [Array] clustering columns
# @param table_properties [Hash] table properties
# @param schema [Object, nil] optional schema
# @return [String] whatever +define_table_output+ returns for the output
def create_table(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil)
  table_options = {
    comment: comment,
    format: format,
    partition_cols: partition_cols,
    clustering_columns: clustering_columns,
    table_properties: table_properties,
    schema: schema
  }
  define_table_output(name, :table, df, **table_options)
end
#create_temporary_view(name, df = nil, comment: nil) ⇒ String
Define a (non-published) temporary view and its flow.
84 85 86 |
# File 'lib/spark_connect/pipelines.rb', line 84
#
# Define a (non-published) temporary view and its flow.
#
# @param name [String, Symbol] the view name
# @param df [DataFrame, nil] relation that defines the view's contents
# @param comment [String, nil] optional comment
# @return [String] whatever +define_table_output+ returns for the output
def create_temporary_view(name, df = nil, comment: nil)
  output_type = :temporary_view
  define_table_output(
    name,
    output_type,
    df,
    comment: comment
  )
end
#define_flow(name, df, target: nil, once: false, sql_conf: {}) ⇒ String
Define a flow that writes the contents of `df` into `target`.
111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/spark_connect/pipelines.rb', line 111
#
# Define a flow that writes the contents of +df+ into +target+.
#
# @param name [String, Symbol] the flow name
# @param df [DataFrame] relation whose +relation+ becomes the flow's source
# @param target [String, Symbol, nil] target dataset; defaults to +name+
# @param once [Boolean] when true, the flow is marked as a one-time flow
# @param sql_conf [Hash] per-flow SQL configuration, passed through +stringify+
# @return [String] the identifier resolved by the server (via
#   +identifier_string+), or +name.to_s+ when the response carries none
def define_flow(name, df, target: nil, once: false, sql_conf: {})
  flow_name = name.to_s
  target_name = (target || name).to_s
  conf = stringify(sql_conf)
  details = PC::DefineFlow::WriteRelationFlowDetails.new(relation: df.relation)
  command = PC::DefineFlow.new(
    dataflow_graph_id: @graph_id,
    flow_name: flow_name,
    target_dataset_name: target_name,
    sql_conf: conf,
    relation_flow_details: details
  )
  # `once` is optional. Only set it when true, because the server rejects the
  # option being present at all for non-one-time flows (e.g. MV flows).
  command.once = true if once
  reply = dispatch(PC.new(define_flow: command))
  resolved = reply.pipeline_command_result&.define_flow_result&.resolved_identifier
  identifier_string(resolved) || flow_name
end
#define_sql(sql_text, sql_file_path: nil) ⇒ void
This method returns an undefined value.
Register datasets and flows defined in SQL text (optionally tagged with the path of the file it came from).
129 130 131 132 133 134 |
# File 'lib/spark_connect/pipelines.rb', line 129
#
# Register datasets and flows defined in SQL text with this graph.
#
# @param sql_text [#to_s] the SQL definitions; converted with +to_s+
# @param sql_file_path [String, nil] path of the source file; only sent when given
# @return [void] always nil
def define_sql(sql_text, sql_file_path: nil)
  elements = PC::DefineSqlGraphElements.new(
    dataflow_graph_id: @graph_id,
    sql_text: sql_text.to_s
  ).tap { |el| el.sql_file_path = sql_file_path if sql_file_path }
  dispatch(PC.new(define_sql_graph_elements: elements))
  nil
end
#drop ⇒ void
This method returns an undefined value.
Drop this dataflow graph and stop any attached flows.
163 164 165 166 |
# File 'lib/spark_connect/pipelines.rb', line 163
#
# Send a DropDataflowGraph command for this pipeline's graph id.
#
# @return [void] always nil
def drop
  request = PC::DropDataflowGraph.new(dataflow_graph_id: @graph_id)
  dispatch(PC.new(drop_dataflow_graph: request))
  nil
end
#read(name) ⇒ DataFrame
Reference a dataset defined in this pipeline as a DataFrame (so later flows can read from earlier outputs).
57 58 59 |
# File 'lib/spark_connect/pipelines.rb', line 57
#
# Reference a dataset defined in this pipeline as a DataFrame, so later flows
# can read from earlier outputs.
#
# @param name [String, Symbol] the dataset name
# @return [DataFrame] the result of +session.read.table+
def read(name)
  # Normalize to a String, as the other methods do (e.g. define_flow uses
  # name.to_s), so Symbol and String names behave the same.
  @session.read.table(name.to_s)
end
#start_run(full_refresh: [], full_refresh_all: false, refresh: [], dry: false, storage: nil) {|event| ... } ⇒ Array<PipelineEvent>
Resolve the graph and run a pipeline update. Blocks until the update completes, returning the events emitted during the run.
146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/spark_connect/pipelines.rb', line 146
#
# Resolve the graph and run a pipeline update. Blocks until the update
# completes, returning the events emitted during the run.
#
# @param full_refresh [Array<String, Symbol>, String, Symbol] datasets to fully
#   refresh; wrapped with Array() and stringified
# @param full_refresh_all [Boolean] sent as +full_refresh_all+
# @param refresh [Array<String, Symbol>, String, Symbol] datasets to refresh;
#   wrapped with Array() and stringified
# @param dry [Boolean] sent as the +dry+ flag
# @param storage [String, nil] only set on the request when given
# @yieldparam event [PipelineEvent] each event, in the order returned
# @return [Array<PipelineEvent>]
def start_run(full_refresh: [], full_refresh_all: false, refresh: [], dry: false, storage: nil, &block)
  run = PC::StartRun.new(
    dataflow_graph_id: @graph_id,
    full_refresh_selection: Array(full_refresh).map(&:to_s),
    full_refresh_all: full_refresh_all,
    refresh_selection: Array(refresh).map(&:to_s),
    dry: dry
  )
  run.storage = storage if storage
  result = dispatch(PC.new(start_run: run))
  # NOTE(review): assumes PipelineEvent.new(timestamp, message), mirroring the
  # timestamp/message fields of the proto PipelineEvent. Confirm against the
  # PipelineEvent class definition.
  events = result.pipeline_events.map { |e| PipelineEvent.new(e.timestamp, e.message) }
  events.each(&block) if block
  events
end