Class: SimpleFlow::Pipeline
- Inherits: Object
- Hierarchy: Object > SimpleFlow::Pipeline
- Defined in: lib/simple_flow/pipeline.rb
Overview
The Pipeline class builds and executes a sequence of steps (operations), with optional middleware that wraps each step to modify or observe its processing. Components can be added, removed, or replaced without changing the overall flow. It suits cases where operations must run in a specific order and you want to inject additional behavior (such as logging, authorization, or input/output transformation) around the steps in that sequence.
Example Usage:
  pipeline = SimpleFlow::Pipeline.new do
    use_middleware SomeMiddlewareClass, option: value
    step ->(input) { do_something_with(input) }
    step AnotherCallableObject
  end
  result = pipeline.call(initial_data)
Parallel Execution with Named Steps:
  pipeline = SimpleFlow::Pipeline.new do
    step :fetch_user, ->(result) { ... }, depends_on: :none
    step :fetch_orders, ->(result) { ... }, depends_on: [:fetch_user]
    step :fetch_products, ->(result) { ... }, depends_on: [:fetch_user]
    step :calculate, ->(result) { ... }, depends_on: [:fetch_orders, :fetch_products]
  end
  result = pipeline.call_parallel(initial_data) # Auto-detects parallelism
Note: depends_on: [] and depends_on: :none are equivalent; use whichever reads more clearly.
Explicit Parallel Blocks:
  pipeline = SimpleFlow::Pipeline.new do
    step ->(result) { ... }
    parallel do
      step ->(result) { ... }
      step ->(result) { ... }
    end
    step ->(result) { ... }
  end
Defined Under Namespace
Classes: ParallelBlock
Instance Attribute Summary
- #concurrency ⇒ Object (readonly)
  Returns the value of attribute concurrency.
- #middlewares ⇒ Object (readonly)
  Returns the value of attribute middlewares.
- #named_steps ⇒ Object (readonly)
  Returns the value of attribute named_steps.
- #optional_steps ⇒ Object (readonly)
  Returns the value of attribute optional_steps.
- #parallel_groups ⇒ Object (readonly)
  Returns the value of attribute parallel_groups.
- #step_dependencies ⇒ Object (readonly)
  Returns the value of attribute step_dependencies.
- #steps ⇒ Object (readonly)
  Returns the value of attribute steps.
Instance Method Summary
- #apply_middleware(callable) ⇒ Object
  Internal: Applies registered middlewares to a callable.
- #async_available? ⇒ Boolean
  Check if async gem is available for parallel execution.
- #call(result) ⇒ Object
  Executes the pipeline with a given initial result.
- #call_parallel(result, strategy: :auto, max_concurrent: nil) ⇒ Object
  Executes the pipeline with parallel execution where possible.
- #dependency_graph ⇒ DependencyGraph?
  Get the dependency graph for this pipeline.
- #execution_plan ⇒ String?
  Get execution plan for this pipeline.
- #initialize(concurrency: :auto, &config) ⇒ Pipeline (constructor)
  Initializes a new Pipeline object.
- #parallel(name = nil, depends_on: :none, &block) ⇒ self
  Defines a parallel execution block.
- #step(name_or_callable = nil, callable = nil, depends_on: [], &block) ⇒ self
  Adds a step to the pipeline.
- #use_middleware(middleware, options = {}) ⇒ Object
  Registers a middleware to be applied to each step.
- #visualize ⇒ DependencyGraphVisualizer?
  Create a visualizer for this pipeline’s dependency graph.
- #visualize_ascii(show_groups: true) ⇒ String?
  Print ASCII visualization of the pipeline’s dependency graph.
- #visualize_dot(include_groups: true, orientation: 'TB') ⇒ String?
  Export pipeline visualization to DOT format.
- #visualize_mermaid ⇒ String?
  Export pipeline visualization to Mermaid format.
Constructor Details
#initialize(concurrency: :auto, &config) ⇒ Pipeline
Initializes a new Pipeline object. A block can be provided to dynamically configure the pipeline, allowing the addition of steps and middleware.
# File 'lib/simple_flow/pipeline.rb', line 50
def initialize(concurrency: :auto, &config)
  @steps = []
  @middlewares = []
  @named_steps = {}
  @step_dependencies = {}
  @parallel_groups = {}
  @optional_steps = Set.new
  @concurrency = concurrency
  validate_concurrency!
  instance_eval(&config) if block_given?
end
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
# File 'lib/simple_flow/pipeline.rb', line 42
def concurrency
  @concurrency
end
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
# File 'lib/simple_flow/pipeline.rb', line 42
def middlewares
  @middlewares
end
#named_steps ⇒ Object (readonly)
Returns the value of attribute named_steps.
# File 'lib/simple_flow/pipeline.rb', line 42
def named_steps
  @named_steps
end
#optional_steps ⇒ Object (readonly)
Returns the value of attribute optional_steps.
# File 'lib/simple_flow/pipeline.rb', line 42
def optional_steps
  @optional_steps
end
#parallel_groups ⇒ Object (readonly)
Returns the value of attribute parallel_groups.
# File 'lib/simple_flow/pipeline.rb', line 42
def parallel_groups
  @parallel_groups
end
#step_dependencies ⇒ Object (readonly)
Returns the value of attribute step_dependencies.
# File 'lib/simple_flow/pipeline.rb', line 42
def step_dependencies
  @step_dependencies
end
#steps ⇒ Object (readonly)
Returns the value of attribute steps.
# File 'lib/simple_flow/pipeline.rb', line 42
def steps
  @steps
end
Instance Method Details
#apply_middleware(callable) ⇒ Object
Internal: Applies registered middlewares to a callable.
# File 'lib/simple_flow/pipeline.rb', line 172
def apply_middleware(callable)
  @middlewares.reverse_each do |middleware, options|
    if middleware.is_a?(Proc)
      callable = middleware.call(callable)
    else
      callable = middleware.new(callable, **options)
    end
  end
  callable
end
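Because the loop walks the registered middlewares in reverse, the first middleware registered ends up as the outermost wrapper. The following is a minimal, self-contained sketch of that wrapping order. TraceMiddleware, its label: and log: options, and the standalone apply_middleware helper are hypothetical stand-ins written for illustration; they are not part of SimpleFlow.

```ruby
# Hypothetical class-based middleware: wraps a callable and records entry/exit.
class TraceMiddleware
  def initialize(callable, label:, log:)
    @callable = callable
    @label = label
    @log = log
  end

  def call(input)
    @log << "enter #{@label}"
    output = @callable.call(input)
    @log << "exit #{@label}"
    output
  end
end

# Standalone copy of the wrapping loop (assumes [middleware, options] pairs).
def apply_middleware(middlewares, callable)
  middlewares.reverse_each do |middleware, options|
    callable = if middleware.is_a?(Proc)
                 middleware.call(callable)            # Proc: receives the callable, returns a wrapper
               else
                 middleware.new(callable, **options)  # Class: options arrive as keyword arguments
               end
  end
  callable
end

LOG = []
MIDDLEWARES = [
  [TraceMiddleware, { label: "first", log: LOG }],
  [TraceMiddleware, { label: "second", log: LOG }],
  [->(inner) { ->(x) { inner.call(x) + 1 } }, {}]
]

WRAPPED = apply_middleware(MIDDLEWARES, ->(x) { x * 2 })
RESULT = WRAPPED.call(5)
# RESULT => 11
# LOG    => ["enter first", "enter second", "exit second", "exit first"]
```

The log shows the first-registered middleware entering first and exiting last, which is the usual onion-style ordering for this kind of wrapper chain.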
#async_available? ⇒ Boolean
Check if async gem is available for parallel execution.
# File 'lib/simple_flow/pipeline.rb', line 218
def async_available?
  ParallelExecutor.async_available?
end
#call(result) ⇒ Object
Executes the pipeline with a given initial result. Steps are called in order, and each step's return value is passed to the next. Before each step runs, the current result is checked: if it responds to `continue?` and returns false, execution stops and that result is returned immediately.
# File 'lib/simple_flow/pipeline.rb', line 188
def call(result)
  steps.reduce(result) do |res, step_def|
    return res if res.respond_to?(:continue?) && !res.continue?
    case step_def
    when Hash
      execute_step_def(step_def, res)
    else
      # Backward compatibility with old format
      step_def.call(res)
    end
  end
end
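The short-circuit behavior can be illustrated with a small, self-contained sketch. The Result struct and the run_steps helper below are hypothetical; SimpleFlow's own result class is not shown in this section, so the only assumption made about it is that it may respond to continue?.

```ruby
# Hypothetical result object with a continue? predicate.
Result = Struct.new(:value, :halted) do
  def continue?
    !halted
  end
end

# Standalone copy of the reduce loop, restricted to plain callables.
def run_steps(steps, result)
  steps.reduce(result) do |res, step|
    # `return` exits run_steps itself, so all remaining steps are skipped.
    return res if res.respond_to?(:continue?) && !res.continue?
    step.call(res)
  end
end

CALLS = []
STEPS = [
  ->(r) { CALLS << :validate; Result.new(r.value, r.value.negative?) },
  ->(r) { CALLS << :double;   Result.new(r.value * 2, false) }
]

OK = run_steps(STEPS, Result.new(21, false))
HALTED = run_steps(STEPS, Result.new(-1, false))
# OK.value     => 42
# HALTED.value => -1 (the :double step never ran for this input)
```

Note that the check happens before each step, so a halt signaled by the final step changes nothing: its result is returned either way.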
#call_parallel(result, strategy: :auto, max_concurrent: nil) ⇒ Object
Executes the pipeline with parallel execution where possible. With strategy: :auto and at least one named step, it uses the step dependencies to detect which steps can run in parallel. Otherwise it runs explicit parallel blocks concurrently.
# File 'lib/simple_flow/pipeline.rb', line 208
def call_parallel(result, strategy: :auto, max_concurrent: nil)
  if strategy == :auto && has_named_steps?
    execute_with_dependency_graph(result, max_concurrent: max_concurrent)
  else
    execute_with_explicit_parallelism(result, max_concurrent: max_concurrent)
  end
end
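How execute_with_dependency_graph decides what can run together is not shown in this section. As a rough illustration only, one common approach is to group steps into levels, where every step in a level depends only on steps from earlier levels. The parallel_levels helper below is a hypothetical sketch of that idea and should not be read as SimpleFlow's actual algorithm; its input has the same shape as step_dependencies.

```ruby
# Groups step names into levels; steps within one level have no
# dependencies on each other and could, in principle, run concurrently.
def parallel_levels(dependencies)
  remaining = dependencies.dup
  done = []
  levels = []
  until remaining.empty?
    # A step is ready once all of its dependencies have completed.
    ready = remaining.select { |_name, deps| (deps - done).empty? }.keys
    raise ArgumentError, "circular or missing dependency" if ready.empty?
    levels << ready
    done.concat(ready)
    ready.each { |name| remaining.delete(name) }
  end
  levels
end

DEPENDENCIES = {
  fetch_user: [],
  fetch_orders: [:fetch_user],
  fetch_products: [:fetch_user],
  calculate: [:fetch_orders, :fetch_products]
}

LEVELS = parallel_levels(DEPENDENCIES)
# => [[:fetch_user], [:fetch_orders, :fetch_products], [:calculate]]
```

For the named-step example in the overview, this grouping puts fetch_orders and fetch_products in the same level, which matches the intent that they run in parallel once fetch_user has finished.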
#dependency_graph ⇒ DependencyGraph?
Get the dependency graph for this pipeline.
# File 'lib/simple_flow/pipeline.rb', line 224
def dependency_graph
  return nil unless has_named_steps?
  DependencyGraph.new(@step_dependencies)
end
#execution_plan ⇒ String?
Get execution plan for this pipeline.
# File 'lib/simple_flow/pipeline.rb', line 266
def execution_plan
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_execution_plan
end
#parallel(name = nil, depends_on: :none, &block) ⇒ self
Defines a parallel execution block. Steps within this block will execute concurrently.
# File 'lib/simple_flow/pipeline.rb', line 138
def parallel(name = nil, depends_on: :none, &block)
  # Validate name if provided
  if name && [:none, :nothing].include?(name)
    raise ArgumentError, "Parallel group name '#{name}' is reserved. Please use a different name."
  end
  # Filter and expand dependencies
  filtered_deps = (Array(depends_on).reject { |dep| [:none, :nothing].include?(dep) })
  # Create and evaluate the parallel block
  group = ParallelBlock.new(self)
  group.instance_eval(&block)
  if name
    # Named parallel group - track it for dependency resolution
    step_names = group.steps.map { |s| s[:name] }.compact
    @parallel_groups[name] = { steps: step_names, dependencies: filtered_deps }
    # Add dependencies from the parallel group to its contained steps
    step_names.each do |step_name|
      @step_dependencies[step_name] = filtered_deps
    end
  end
  @steps << { steps: group.steps, type: :parallel, name: name }
  self
end
#step(name_or_callable = nil, callable = nil, depends_on: [], &block) ⇒ self
Adds a step to the pipeline. Supports both named and unnamed steps.
Named steps with dependencies (for automatic parallel detection):
step :fetch_user, ->(result) { ... }, depends_on: []
step :process_data, ->(result) { ... }, depends_on: [:fetch_user]
Unnamed steps (traditional usage):
step ->(result) { ... }
step { |result| ... }
# File 'lib/simple_flow/pipeline.rb', line 88
def step(name_or_callable = nil, callable = nil, depends_on: [], &block)
  # Handle different calling patterns
  if name_or_callable.is_a?(Symbol)
    # Named step: step :name, ->(result) { ... }, depends_on: [...]
    name = name_or_callable
    callable ||= block
    # Validate step name
    if [:none, :nothing, :optional].include?(name)
      raise ArgumentError, "Step name '#{name}' is reserved. Please use a different name."
    end
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)
    callable = apply_middleware(callable)
    @named_steps[name] = callable
    # Check if this is an optional step
    if depends_on == :optional
      @optional_steps << name
      @step_dependencies[name] = []
    else
      # Filter out reserved dependency symbols :none and :nothing, and expand parallel group names
      @step_dependencies[name] = (Array(depends_on).reject { |dep| [:none, :nothing].include?(dep) })
    end
    @steps << { name: name, callable: callable, type: :named }
  else
    # Unnamed step: step ->(result) { ... } or step { |result| ... }
    callable = name_or_callable || block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)
    callable = apply_middleware(callable)
    @steps << { callable: callable, type: :unnamed }
  end
  self
end
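The depends_on normalization for non-optional steps can be tried in isolation. The sketch below copies only the Array(...).reject expression from the source above; the normalize_dependencies helper and the RESERVED constant are hypothetical names introduced for illustration.

```ruby
RESERVED = [:none, :nothing].freeze

# Mirrors the normalization expression used in #step for non-optional steps.
def normalize_dependencies(depends_on)
  Array(depends_on).reject { |dep| RESERVED.include?(dep) }
end

normalize_dependencies(:none)                # => []
normalize_dependencies([])                   # => []
normalize_dependencies(:fetch_user)          # => [:fetch_user]
normalize_dependencies([:fetch_user, :none]) # => [:fetch_user]
```

This is why depends_on: [] and depends_on: :none behave the same, and why a single symbol works as well as a one-element array. The :optional value is handled by a separate branch in #step and never reaches this expression.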
#use_middleware(middleware, options = {}) ⇒ Object
Registers a middleware to be applied to each step. A middleware can be a Proc that receives the callable and returns a wrapped callable, or any object that responds to `.new`, taking the callable to wrap followed by the options as keyword arguments.
# File 'lib/simple_flow/pipeline.rb', line 68
def use_middleware(middleware, options = {})
  @middlewares << [middleware, options]
end
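Judging from the source on this page, #step calls apply_middleware at the moment a step is defined, so a middleware appears to wrap only the steps declared after it. The sketch below demonstrates that ordering effect with MiniPipeline, a hypothetical, stripped-down stand-in that supports unnamed steps and Proc middleware only; it is not the real Pipeline class.

```ruby
# Hypothetical stand-in for Pipeline: unnamed steps, Proc middleware only.
class MiniPipeline
  def initialize(&config)
    @steps = []
    @middlewares = []
    instance_eval(&config) if block_given?
  end

  def use_middleware(middleware, options = {})
    @middlewares << [middleware, options]
  end

  def step(callable)
    # Middleware is applied here, at definition time, using whatever
    # has been registered so far.
    @middlewares.reverse_each do |middleware, _options|
      callable = middleware.call(callable)
    end
    @steps << callable
    self
  end

  def call(input)
    @steps.reduce(input) { |res, current_step| current_step.call(res) }
  end
end

SEEN = []
TRACER = ->(inner) { ->(x) { SEEN << x; inner.call(x) } }

PIPELINE = MiniPipeline.new do
  step ->(x) { x + 1 }    # declared before the middleware: not wrapped
  use_middleware TRACER
  step ->(x) { x * 10 }   # declared after the middleware: wrapped
end

OUTPUT = PIPELINE.call(1)
# OUTPUT => 20
# SEEN   => [2] (only the second step's input was traced)
```

If the real class behaves the same way, placing use_middleware calls at the top of the configuration block, as in the overview example, ensures every step is wrapped.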
#visualize ⇒ DependencyGraphVisualizer?
Create a visualizer for this pipeline’s dependency graph.
# File 'lib/simple_flow/pipeline.rb', line 231
def visualize
  graph = dependency_graph
  return nil unless graph
  DependencyGraphVisualizer.new(graph)
end
#visualize_ascii(show_groups: true) ⇒ String?
Print ASCII visualization of the pipeline’s dependency graph.
# File 'lib/simple_flow/pipeline.rb', line 240
def visualize_ascii(show_groups: true)
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_ascii(show_groups: show_groups)
end
#visualize_dot(include_groups: true, orientation: 'TB') ⇒ String?
Export pipeline visualization to DOT format.
# File 'lib/simple_flow/pipeline.rb', line 250
def visualize_dot(include_groups: true, orientation: 'TB')
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_dot(include_groups: include_groups, orientation: orientation)
end
#visualize_mermaid ⇒ String?
Export pipeline visualization to Mermaid format.
# File 'lib/simple_flow/pipeline.rb', line 258
def visualize_mermaid
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_mermaid
end