Class: SparkConnect::Pipeline

Inherits:
Object
Defined in:
lib/spark_connect/pipelines.rb

Overview

Note:

`foreach`/`foreachBatch` flows and Python query-function evaluation are not supported (they require UDFs); define each flow with a relation instead.

A Spark Declarative Pipeline (SDP) dataflow graph.

A pipeline is built by registering outputs (tables, materialized views, temporary views, or sinks) and the flows that populate them, then started with #start_run. Each flow is defined by a DataFrame (an unresolved relation), so you compose flows with the same API you use for ordinary queries.

Create one with SparkSession#pipeline.

Examples:

pipe = spark.pipeline
pipe.create_materialized_view("bronze", spark.read.json("/data/raw"))
pipe.create_table("silver", pipe.read("bronze").filter(F.col("ok")))
events = pipe.start_run(storage: "/tmp/pipeline_storage")

Constant Summary

Proto =
SparkConnect::Proto
PC =
Proto::PipelineCommand
OUTPUT_TYPES =
{
  table: :TABLE,
  materialized_view: :MATERIALIZED_VIEW,
  temporary_view: :TEMPORARY_VIEW,
  sink: :SINK,
}.freeze
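
A minimal sketch of how a caller-facing kind symbol might be translated through a table like `OUTPUT_TYPES`. The hash is copied from the constant above so the snippet runs on its own; the helper name `output_type_for` is hypothetical and not part of the library.

```ruby
# Copy of the constant above, so this sketch runs without the gem.
OUTPUT_TYPES = {
  table: :TABLE,
  materialized_view: :MATERIALIZED_VIEW,
  temporary_view: :TEMPORARY_VIEW,
  sink: :SINK,
}.freeze

# Hypothetical helper (not in the library): look up the enum name for a kind
# and fail loudly on anything the table does not list.
def output_type_for(kind)
  OUTPUT_TYPES.fetch(kind.to_sym) do
    raise ArgumentError, "unknown output kind: #{kind.inspect}"
  end
end

output_type_for(:sink)    # => :SINK
output_type_for("table")  # => :TABLE
```

Using `fetch` with a block rather than `[]` means a misspelled kind raises immediately instead of passing `nil` along.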

Constructor Details

#initialize(session, default_catalog: nil, default_database: nil, sql_conf: {}) ⇒ Pipeline

Returns a new instance of Pipeline.

Parameters:

  • session (SparkSession)
  • default_catalog (String, nil) (defaults to: nil)
  • default_database (String, nil) (defaults to: nil)
  • sql_conf (Hash{String=>String}) (defaults to: {})


# File 'lib/spark_connect/pipelines.rb', line 43

def initialize(session, default_catalog: nil, default_database: nil, sql_conf: {})
  @session = session
  cmd = PC::CreateDataflowGraph.new(sql_conf: stringify(sql_conf))
  cmd.default_catalog = default_catalog if default_catalog
  cmd.default_database = default_database if default_database
  result = dispatch(PC.new(create_dataflow_graph: cmd))
  @graph_id = result.pipeline_command_result.create_dataflow_graph_result.dataflow_graph_id
end
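
The private `stringify` helper used above is not shown on this page. Assuming it converts every key and value to a String, as the `Hash{String=>String}` parameter type suggests, it could look roughly like the sketch below. The name `stringify_conf` is hypothetical, and the real helper may differ.

```ruby
# Assumed behaviour of the private `stringify` helper (not confirmed by the
# source): coerce every key and value of a configuration hash to a String.
def stringify_conf(conf)
  conf.to_h { |key, value| [key.to_s, value.to_s] }
end

conf = stringify_conf({ "spark.sql.shuffle.partitions" => 8, ansi: true })
conf.keys.all?(String)    # => true
conf.values.all?(String)  # => true
```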

Instance Attribute Details

#graph_id ⇒ String (readonly)

Returns the server-assigned dataflow graph id.

Returns:

  • (String)

    the server-assigned dataflow graph id.



# File 'lib/spark_connect/pipelines.rb', line 37

def graph_id
  @graph_id
end

Instance Method Details

#create_materialized_view(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil) ⇒ String

Define a materialized view and the flow that populates it.

Returns:

  • (String)


# File 'lib/spark_connect/pipelines.rb', line 75

def create_materialized_view(name, df = nil, comment: nil, format: nil, partition_cols: [],
                             clustering_columns: [], table_properties: {}, schema: nil)
  define_table_output(name, :materialized_view, df, comment: comment, format: format,
                                                    partition_cols: partition_cols, clustering_columns: clustering_columns,
                                                    table_properties: table_properties, schema: schema)
end

#create_sink(name, df, format: nil, options: {}) ⇒ String

Define a streaming sink.

Parameters:

  • name (String)
  • df (DataFrame)

    the flow feeding the sink.

  • format (String, nil) (defaults to: nil)
  • options (Hash{String=>String}) (defaults to: {})

Returns:

  • (String)


# File 'lib/spark_connect/pipelines.rb', line 95

def create_sink(name, df, format: nil, options: {})
  sink = PC::DefineOutput::SinkDetails.new(options: stringify(options))
  sink.format = format if format
  define_output(name, :sink, sink_details: sink)
  define_flow(name, df, target: name)
  name
end

#create_table(name, df = nil, comment: nil, format: nil, partition_cols: [], clustering_columns: [], table_properties: {}, schema: nil) ⇒ String

Define a published table and the flow that populates it.

Parameters:

  • name (String)
  • df (DataFrame, nil) (defaults to: nil)

    the query that populates the table (a flow).

Returns:

  • (String)

    the resolved output identifier.



# File 'lib/spark_connect/pipelines.rb', line 66

def create_table(name, df = nil, comment: nil, format: nil, partition_cols: [],
                 clustering_columns: [], table_properties: {}, schema: nil)
  define_table_output(name, :table, df, comment: comment, format: format,
                                        partition_cols: partition_cols, clustering_columns: clustering_columns,
                                        table_properties: table_properties, schema: schema)
end

#create_temporary_view(name, df = nil, comment: nil) ⇒ String

Define a (non-published) temporary view and its flow.

Returns:

  • (String)


# File 'lib/spark_connect/pipelines.rb', line 84

def create_temporary_view(name, df = nil, comment: nil)
  define_table_output(name, :temporary_view, df, comment: comment)
end

#define_flow(name, df, target: nil, once: false, sql_conf: {}) ⇒ String

Define a flow that writes the contents of `df` into `target`.

Parameters:

  • name (String)

    the flow name.

  • df (DataFrame)
  • target (String, nil) (defaults to: nil)

    the dataset the flow writes to (defaults to `name`).

  • once (Boolean) (defaults to: false)

    define as a one-time (batch) flow.

  • sql_conf (Hash{String=>String}) (defaults to: {})

Returns:

  • (String)

    the resolved flow name.



# File 'lib/spark_connect/pipelines.rb', line 111

def define_flow(name, df, target: nil, once: false, sql_conf: {})
  flow = PC::DefineFlow.new(
    dataflow_graph_id: @graph_id, flow_name: name.to_s, target_dataset_name: (target || name).to_s,
    sql_conf: stringify(sql_conf),
    relation_flow_details: PC::DefineFlow::WriteRelationFlowDetails.new(relation: df.relation)
  )
  # `once` is optional: only set it when true, since the server rejects the
  # option being present at all for non-one-time flows (e.g. MV flows).
  flow.once = true if once
  result = dispatch(PC.new(define_flow: flow))
  identifier_string(result.pipeline_command_result&.define_flow_result&.resolved_identifier) || name.to_s
end
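
The two defaulting rules in `define_flow` (the target falls back to the flow name, and `once` is only set when true) can be sketched without the gem. Here a plain Hash stands in for the `PC::DefineFlow` message, and the helper name `flow_fields` is hypothetical.

```ruby
# Stand-in for the protobuf message: a plain Hash, so this runs without the gem.
def flow_fields(name, target: nil, once: false)
  fields = {
    flow_name: name.to_s,
    # With no explicit target, the flow writes to the dataset of the same name.
    target_dataset_name: (target || name).to_s,
  }
  # Mirror the source: leave `once` absent unless it is true.
  fields[:once] = true if once
  fields
end

flow_fields(:silver).key?(:once)                         # => false
flow_fields("backfill", target: "silver", once: true)[:once]  # => true
```

Leaving the key out entirely, rather than setting it to `false`, matches the comment in the source about the server rejecting the option when it is present on a non-one-time flow.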

#define_sql(sql_text, sql_file_path: nil) ⇒ void

This method returns an undefined value.

Register datasets and flows from the text of a SQL definition file.

Parameters:

  • sql_text (String)

    the SQL source.

  • sql_file_path (String, nil) (defaults to: nil)


# File 'lib/spark_connect/pipelines.rb', line 129

def define_sql(sql_text, sql_file_path: nil)
  el = PC::DefineSqlGraphElements.new(dataflow_graph_id: @graph_id, sql_text: sql_text.to_s)
  el.sql_file_path = sql_file_path if sql_file_path
  dispatch(PC.new(define_sql_graph_elements: el))
  nil
end

#drop ⇒ void

This method returns an undefined value.

Drop this dataflow graph and stop any attached flows.



# File 'lib/spark_connect/pipelines.rb', line 163

def drop
  dispatch(PC.new(drop_dataflow_graph: PC::DropDataflowGraph.new(dataflow_graph_id: @graph_id)))
  nil
end

#read(name) ⇒ DataFrame

Reference a dataset defined in this pipeline as a DataFrame (so later flows can read from earlier outputs).

Parameters:

  • name (String)

Returns:

  • (DataFrame)


# File 'lib/spark_connect/pipelines.rb', line 57

def read(name)
  @session.read.table(name)
end

#start_run(full_refresh: [], full_refresh_all: false, refresh: [], dry: false, storage: nil) {|event| ... } ⇒ Array<PipelineEvent>

Resolve the graph and run a pipeline update. Blocks until the update completes, then returns the events emitted during the run. If a block is given, each event is also yielded to it after the update has completed.

Parameters:

  • full_refresh (Array<String>) (defaults to: [])

    datasets to reset and recompute.

  • full_refresh_all (Boolean) (defaults to: false)

    reset and recompute everything.

  • refresh (Array<String>) (defaults to: [])

    datasets to update.

  • dry (Boolean) (defaults to: false)

    validate the graph without executing flows.

  • storage (String, nil) (defaults to: nil)

    checkpoint/metadata storage location.

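Yield Parameters:

  • event (PipelineEvent)

    an event emitted during the run.

Returns:

  • (Array<PipelineEvent>)

    the events emitted during the run.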



# File 'lib/spark_connect/pipelines.rb', line 146

def start_run(full_refresh: [], full_refresh_all: false, refresh: [], dry: false, storage: nil, &block)
  run = PC::StartRun.new(
    dataflow_graph_id: @graph_id,
    full_refresh_selection: Array(full_refresh).map(&:to_s),
    full_refresh_all: full_refresh_all,
    refresh_selection: Array(refresh).map(&:to_s),
    dry: dry
  )
  run.storage = storage if storage
  result = dispatch(PC.new(start_run: run))
  events = result.pipeline_events.map { |e| PipelineEvent.new(e.timestamp, e.message) }
  events.each(&block) if block
  events
end
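
A minimal, gem-free sketch of two details in `start_run`: how `Array(...)` normalises the refresh selections, and how the optional block sees each event while the full list is still returned. `PipelineEventSketch`, `normalize_selection`, and `collect_events` are hypothetical stand-ins, and the raw events are made-up sample data.

```ruby
# Stand-in for PipelineEvent, which the source builds from a timestamp and a message.
PipelineEventSketch = Struct.new(:timestamp, :message)

# Same coercion the source applies to full_refresh and refresh:
# nil, a single name, or a list all become an Array of Strings.
def normalize_selection(selection)
  Array(selection).map(&:to_s)
end

# Same shape as the tail of start_run: wrap, optionally yield, always return.
def collect_events(raw_events, &block)
  events = raw_events.map { |e| PipelineEventSketch.new(e[:timestamp], e[:message]) }
  events.each(&block) if block
  events
end

# Made-up sample data standing in for the server response.
raw = [
  { timestamp: 1, message: "run started" },
  { timestamp: 2, message: "run completed" },
]

seen = []
events = collect_events(raw) { |event| seen << event.message }

normalize_selection(:silver)  # => ["silver"]
normalize_selection(nil)      # => []
```

Because the block runs over the already collected list, it is a convenience for logging rather than a way to observe events while the update is still in progress.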