Class: SimpleFlow::Pipeline

Inherits:
Object
Defined in:
lib/simple_flow/pipeline.rb

Overview

The Pipeline class builds and executes an ordered sequence of steps (callables), optionally wrapping each step in middleware. Components can be added, removed, or replaced without changing the overall flow. It suits cases where operations must run in a specific order and cross-cutting behavior (such as logging, authorization, or input/output transformation) needs to be injected into that sequence.

Example Usage:

pipeline = SimpleFlow::Pipeline.new do
  use_middleware SomeMiddlewareClass, option: value
  step ->(input) { do_something_with(input) }
  step AnotherCallableObject
end

result = pipeline.call(initial_data)

Parallel Execution with Named Steps:

pipeline = SimpleFlow::Pipeline.new do
  step :fetch_user, ->(result) { ... }, depends_on: :none
  step :fetch_orders, ->(result) { ... }, depends_on: [:fetch_user]
  step :fetch_products, ->(result) { ... }, depends_on: [:fetch_user]
  step :calculate, ->(result) { ... }, depends_on: [:fetch_orders, :fetch_products]
end

result = pipeline.call_parallel(initial_data) # Auto-detects parallelism

Note: You can use either depends_on: [] or depends_on: :none for clarity

Explicit Parallel Blocks:

pipeline = SimpleFlow::Pipeline.new do
  step ->(result) { ... }
  parallel do
    step ->(result) { ... }
    step ->(result) { ... }
  end
  step ->(result) { ... }
end

Defined Under Namespace

Classes: ParallelBlock

Constructor Details

#initialize(concurrency: :auto, &config) ⇒ Pipeline

Initializes a new Pipeline object. A block can be provided to dynamically configure the pipeline, allowing the addition of steps and middleware.

Parameters:

  • concurrency (Symbol) (defaults to: :auto)

    concurrency model to use (:auto, :threads, :async)

    • :auto (default) - uses async if available, falls back to threads

    • :threads - always uses Ruby threads

    • :async - uses async gem (raises error if not available)



# File 'lib/simple_flow/pipeline.rb', line 50

def initialize(concurrency: :auto, &config)
  @steps = []
  @middlewares = []
  @named_steps = {}
  @step_dependencies = {}
  @parallel_groups = {}
  @optional_steps = Set.new
  @concurrency = concurrency

  validate_concurrency!

  instance_eval(&config) if block_given?
end
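
Because the configuration block is run with instance_eval, self inside the block is the pipeline being built, which is why step and use_middleware can be called without a receiver. The sketch below illustrates that mechanism with core Ruby only; TinyBuilder is a hypothetical stand-in, not part of SimpleFlow. Local variables from the enclosing scope stay visible inside the block, while the caller's instance variables and methods do not.

```ruby
# Hypothetical stand-in that mimics only the block-configuration pattern.
class TinyBuilder
  attr_reader :steps

  def initialize(&config)
    @steps = []
    # Evaluate the block with self set to this builder.
    instance_eval(&config) if block_given?
  end

  def step(callable = nil, &block)
    @steps << (callable || block)
    self
  end
end

offset = 10 # local variable captured by the block's closure

builder = TinyBuilder.new do
  step ->(n) { n + offset } # bare `step` resolves against the builder
  step { |n| n * 2 }
end

result = builder.steps.reduce(1) { |acc, s| s.call(acc) }
puts result
```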

Instance Attribute Details

#concurrency ⇒ Object (readonly)

Returns the value of attribute concurrency.



# File 'lib/simple_flow/pipeline.rb', line 42

def concurrency
  @concurrency
end

#middlewares ⇒ Object (readonly)

Returns the value of attribute middlewares.



# File 'lib/simple_flow/pipeline.rb', line 42

def middlewares
  @middlewares
end

#named_steps ⇒ Object (readonly)

Returns the value of attribute named_steps.



# File 'lib/simple_flow/pipeline.rb', line 42

def named_steps
  @named_steps
end

#optional_steps ⇒ Object (readonly)

Returns the value of attribute optional_steps.



# File 'lib/simple_flow/pipeline.rb', line 42

def optional_steps
  @optional_steps
end

#parallel_groups ⇒ Object (readonly)

Returns the value of attribute parallel_groups.



# File 'lib/simple_flow/pipeline.rb', line 42

def parallel_groups
  @parallel_groups
end

#step_dependencies ⇒ Object (readonly)

Returns the value of attribute step_dependencies.



# File 'lib/simple_flow/pipeline.rb', line 42

def step_dependencies
  @step_dependencies
end

#steps ⇒ Object (readonly)

Returns the value of attribute steps.



# File 'lib/simple_flow/pipeline.rb', line 42

def steps
  @steps
end

Instance Method Details

#apply_middleware(callable) ⇒ Object

Internal: Applies registered middlewares to a callable.

Parameters:

  • callable (Proc, Object)

    the target callable to wrap with middleware

Returns:

  • (Object)

    the callable wrapped with all registered middleware



# File 'lib/simple_flow/pipeline.rb', line 172

def apply_middleware(callable)
  @middlewares.reverse_each do |middleware, options|
    if middleware.is_a?(Proc)
      callable = middleware.call(callable)
    else
      callable = middleware.new(callable, **options)
    end
  end
  callable
end
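
The loop above iterates with reverse_each, so the middleware registered first ends up as the outermost wrapper and runs first. The sketch below re-creates that loop as a standalone method using Proc middleware only; wrap and tagger are hypothetical names chosen for this illustration.

```ruby
# Standalone re-creation of the wrapping loop, restricted to Proc middleware.
def wrap(callable, middlewares)
  middlewares.reverse_each do |middleware, _options|
    callable = middleware.call(callable)
  end
  callable
end

trace = []

# Builds a Proc middleware that records when it is entered and left.
tagger = lambda do |label|
  lambda do |inner|
    lambda do |input|
      trace << "#{label}:before"
      output = inner.call(input)
      trace << "#{label}:after"
      output
    end
  end
end

middlewares = [[tagger.call("first"), {}], [tagger.call("second"), {}]]
wrapped = wrap(->(x) { trace << "step"; x + 1 }, middlewares)

value = wrapped.call(1)
puts trace.join(" -> ")
```

The recorded order is first, second, step, then back out through second and first.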

#async_available? ⇒ Boolean

Checks whether the async gem is available for parallel execution.

Returns:

  • (Boolean)


# File 'lib/simple_flow/pipeline.rb', line 218

def async_available?
  ParallelExecutor.async_available?
end

#call(result) ⇒ Object

Executes the pipeline with a given initial result. Each step is called in order, and the result of a step is passed to the next. Execution is short-circuited when the current result responds to `continue?` and that method returns false; results that do not respond to `continue?` never stop the pipeline.

Parameters:

  • result (Object)

    the initial data/input to be passed through the pipeline

Returns:

  • (Object)

    the result of executing the pipeline



# File 'lib/simple_flow/pipeline.rb', line 188

def call(result)
  steps.reduce(result) do |res, step_def|
    return res if res.respond_to?(:continue?) && !res.continue?

    case step_def
    when Hash
      execute_step_def(step_def, res)
    else
      # Backward compatibility with old format
      step_def.call(res)
    end
  end
end
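
The short-circuit check runs before each step, on the value produced by the previous one. The sketch below reproduces the same reduce loop for plain callables. Outcome is a hypothetical result wrapper used only for illustration; SimpleFlow's own result type is not shown on this page.

```ruby
# Hypothetical result wrapper that exposes continue?.
Outcome = Struct.new(:value, :halted) do
  def continue?
    !halted
  end
end

# Same shape as the reduce loop above, restricted to plain callables.
def run_steps(steps, initial)
  steps.reduce(initial) do |res, step|
    return res if res.respond_to?(:continue?) && !res.continue?

    step.call(res)
  end
end

steps = [
  ->(o) { Outcome.new(o.value + 1, false) },
  ->(o) { Outcome.new(o.value, true) },       # halts the pipeline
  ->(o) { Outcome.new(o.value * 100, false) } # never runs
]

final = run_steps(steps, Outcome.new(1, false))
puts final.value
```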

#call_parallel(result, strategy: :auto, max_concurrent: nil) ⇒ Object

Executes the pipeline with parallel execution where possible. For named steps with dependencies, automatically detects which steps can run in parallel. For explicit parallel blocks, executes them concurrently.

Parameters:

  • result (Object)

    the initial data/input to be passed through the pipeline

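  • strategy (Symbol) (defaults to: :auto)

    :auto (automatic dependency detection, used only when the pipeline has named steps) or :explicit (only explicit parallel blocks)

  • max_concurrent (defaults to: nil)

    forwarded unchanged to the underlying execution method (by its name, an upper bound on concurrently running steps)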

Returns:

  • (Object)

    the result of executing the pipeline



# File 'lib/simple_flow/pipeline.rb', line 208

def call_parallel(result, strategy: :auto, max_concurrent: nil)
  if strategy == :auto && has_named_steps?
    execute_with_dependency_graph(result, max_concurrent: max_concurrent)
  else
    execute_with_explicit_parallelism(result, max_concurrent: max_concurrent)
  end
end
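
The executor behind call_parallel is not shown on this page. As a rough illustration of the :threads model only, the sketch below runs independent callables on Ruby threads and collects their results in order. run_concurrently is a hypothetical helper, and its batch-based reading of max_concurrent is an assumption rather than SimpleFlow's documented behavior.

```ruby
# Hypothetical helper: runs callables on threads, at most max_concurrent at a time.
def run_concurrently(callables, input, max_concurrent: nil)
  return [] if callables.empty?

  batch_size = max_concurrent || callables.size
  callables.each_slice(batch_size).flat_map do |batch|
    threads = batch.map { |callable| Thread.new { callable.call(input) } }
    threads.map(&:value) # Thread#value joins and returns the block's result
  end
end

results = run_concurrently(
  [->(x) { x + 1 }, ->(x) { x * 2 }, ->(x) { x - 3 }],
  10,
  max_concurrent: 2
)
puts results.inspect
```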

#dependency_graph ⇒ DependencyGraph?

Returns the dependency graph for this pipeline.

Returns:

  • (DependencyGraph, nil)

    the dependency graph, or nil if the pipeline has no named steps


# File 'lib/simple_flow/pipeline.rb', line 224

def dependency_graph
  return nil unless has_named_steps?
  DependencyGraph.new(@step_dependencies)
end
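
DependencyGraph itself is documented elsewhere. To show how a hash shaped like step_dependencies can be turned into groups of steps that may run in parallel, here is a minimal level-by-level sketch; parallel_levels is a hypothetical function and not necessarily how DependencyGraph works internally.

```ruby
# Groups step names into levels; each level depends only on earlier levels.
def parallel_levels(dependencies)
  remaining = dependencies.dup
  done = []
  levels = []

  until remaining.empty?
    ready = remaining.select { |_name, deps| (deps - done).empty? }.keys
    raise ArgumentError, "circular or unknown dependency" if ready.empty?

    levels << ready
    done.concat(ready)
    ready.each { |name| remaining.delete(name) }
  end

  levels
end

deps = {
  fetch_user: [],
  fetch_orders: [:fetch_user],
  fetch_products: [:fetch_user],
  calculate: [:fetch_orders, :fetch_products]
}

levels = parallel_levels(deps)
puts levels.inspect
```

With the dependencies from the overview example, fetch_orders and fetch_products land in the same level and are therefore candidates for concurrent execution.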

#execution_plan ⇒ String?

Get execution plan for this pipeline

Returns:

  • (String, nil)

    execution plan or nil if no named steps



# File 'lib/simple_flow/pipeline.rb', line 266

def execution_plan
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_execution_plan
end

#parallel(name = nil, depends_on: :none, &block) ⇒ self

Defines a parallel execution block. Steps within this block will execute concurrently.

Examples:

Named parallel group with dependencies

parallel :fetch_data, depends_on: :validate do
  step :fetch_orders, ->(result) { ... }
  step :fetch_products, ->(result) { ... }
end
step :process, ->(result) { ... }, depends_on: :fetch_data

Parameters:

  • name (Symbol, nil) (defaults to: nil)

    optional name for the parallel group

  • depends_on (Symbol, Array) (defaults to: :none)

    dependencies for this parallel group

  • block (Block)

    block containing step definitions

Returns:

  • (self)


# File 'lib/simple_flow/pipeline.rb', line 138

def parallel(name = nil, depends_on: :none, &block)
  # Validate name if provided
  if name && [:none, :nothing].include?(name)
    raise ArgumentError, "Parallel group name '#{name}' is reserved. Please use a different name."
  end

  # Filter and expand dependencies
  filtered_deps = expand_dependencies(Array(depends_on).reject { |dep| [:none, :nothing].include?(dep) })

  # Create and evaluate the parallel block
  group = ParallelBlock.new(self)
  group.instance_eval(&block)

  if name
    # Named parallel group - track it for dependency resolution
    step_names = group.steps.map { |s| s[:name] }.compact
    @parallel_groups[name] = {
      steps: step_names,
      dependencies: filtered_deps
    }

    # Add dependencies from the parallel group to its contained steps
    step_names.each do |step_name|
      @step_dependencies[step_name] = filtered_deps
    end
  end

  @steps << { steps: group.steps, type: :parallel, name: name }
  self
end
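
The method above calls expand_dependencies, which is not shown on this page. One plausible reading, given that a later step may declare depends_on: :fetch_data, is that a group name is replaced by the names of the steps in that group. The sketch below implements that assumption; expand_group_names is a hypothetical function, and only the shape of the parallel_groups hash is taken from the source.

```ruby
# Hypothetical expansion: a group name becomes the names of its member steps.
def expand_group_names(deps, parallel_groups)
  deps.flat_map do |dep|
    group = parallel_groups[dep]
    group ? group[:steps] : [dep]
  end
end

# Same shape as the entries stored in @parallel_groups by the method above.
parallel_groups = {
  fetch_data: { steps: [:fetch_orders, :fetch_products], dependencies: [:validate] }
}

expanded = expand_group_names([:fetch_data, :audit], parallel_groups)
puts expanded.inspect
```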

#step(name_or_callable = nil, callable = nil, depends_on: [], &block) ⇒ self

Adds a step to the pipeline. Supports both named and unnamed steps.

Named steps with dependencies (for automatic parallel detection):

step :fetch_user, ->(result) { ... }, depends_on: []
step :process_data, ->(result) { ... }, depends_on: [:fetch_user]

Unnamed steps (traditional usage):

step ->(result) { ... }
step { |result| ... }

Parameters:

  • name_or_callable (Symbol, Proc, Object) (defaults to: nil)

    step name (Symbol) or callable object

  • callable (Proc, Object) (defaults to: nil)

    an object responding to call (if first param is a name)

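  • depends_on (Symbol, Array) (defaults to: [])

    names of the steps or parallel groups this step depends on (named steps only); :none and :nothing are treated as no dependencies, and :optional marks the step as optional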

  • block (Block)

    a block to use as the step if no callable is provided

Returns:

  • (self)

    so that calls can be chained

Raises:

  • (ArgumentError)

    if neither a callable nor a block is given, if the provided object does not respond to call, or if a named step uses a reserved name (:none, :nothing, :optional)



# File 'lib/simple_flow/pipeline.rb', line 88

def step(name_or_callable = nil, callable = nil, depends_on: [], &block)
  # Handle different calling patterns
  if name_or_callable.is_a?(Symbol)
    # Named step: step :name, ->(result) { ... }, depends_on: [...]
    name = name_or_callable
    callable ||= block

    # Validate step name
    if [:none, :nothing, :optional].include?(name)
      raise ArgumentError, "Step name '#{name}' is reserved. Please use a different name."
    end

    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    callable = apply_middleware(callable)
    @named_steps[name] = callable

    # Check if this is an optional step
    if depends_on == :optional
      @optional_steps << name
      @step_dependencies[name] = []
    else
      # Filter out reserved dependency symbols :none and :nothing, and expand parallel group names
      @step_dependencies[name] = expand_dependencies(Array(depends_on).reject { |dep| [:none, :nothing].include?(dep) })
    end

    @steps << { name: name, callable: callable, type: :named }
  else
    # Unnamed step: step ->(result) { ... } or step { |result| ... }
    callable = name_or_callable || block
    raise ArgumentError, "Step must respond to #call" unless callable.respond_to?(:call)

    callable = apply_middleware(callable)
    @steps << { callable: callable, type: :unnamed }
  end

  self
end
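
The handling of depends_on for named steps can be isolated into a small function. The sketch below mirrors the filtering shown above but leaves out parallel-group expansion, since expand_dependencies is not shown; normalize_depends_on and RESERVED are hypothetical names.

```ruby
RESERVED = [:none, :nothing].freeze

# Mirrors the depends_on filtering above, minus parallel-group expansion.
def normalize_depends_on(depends_on)
  return { optional: true, dependencies: [] } if depends_on == :optional

  deps = Array(depends_on).reject { |dep| RESERVED.include?(dep) }
  { optional: false, dependencies: deps }
end

[:none, [], :fetch_user, [:fetch_user, :nothing], :optional].each do |input|
  puts "#{input.inspect} => #{normalize_depends_on(input).inspect}"
end
```

A bare symbol and a one-element array are treated the same way, and :none, :nothing, and [] all yield an empty dependency list.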

#use_middleware(middleware, options = {}) ⇒ Object

Registers a middleware to be applied to each step added after this call. A middleware is either a Proc, which is called with the callable to wrap, or an object that responds to `.new` and accepts the callable to wrap plus the options as keyword arguments.

Parameters:

  • middleware (Proc, Class)

    the middleware to be used

  • options (Hash) (defaults to: {})

    any options to be passed to the middleware upon initialization



# File 'lib/simple_flow/pipeline.rb', line 68

def use_middleware(middleware, options = {})
  @middlewares << [middleware, options]
end
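
Since step wraps its callable at registration time using whatever middleware has been registered so far, the order of use_middleware and step calls matters. The sketch below shows this with a stand-in that copies only the registration logic from this page; MiniPipeline and PrefixMiddleware are hypothetical names.

```ruby
# Hypothetical middleware class: .new takes the wrapped callable plus keyword options.
class PrefixMiddleware
  def initialize(callable, prefix: "")
    @callable = callable
    @prefix = prefix
  end

  def call(input)
    "#{@prefix}#{@callable.call(input)}"
  end
end

# Stand-in that copies only the registration logic shown on this page.
class MiniPipeline
  def initialize
    @steps = []
    @middlewares = []
  end

  def use_middleware(middleware, options = {})
    @middlewares << [middleware, options]
  end

  def step(callable)
    # Wrapping happens here, with the middleware registered so far.
    @middlewares.reverse_each do |middleware, options|
      callable =
        if middleware.is_a?(Proc)
          middleware.call(callable)
        else
          middleware.new(callable, **options)
        end
    end
    @steps << callable
    self
  end

  def call(input)
    @steps.reduce(input) { |acc, s| s.call(acc) }
  end
end

early = MiniPipeline.new
early.use_middleware PrefixMiddleware, prefix: ">"
early.step ->(s) { s.upcase }

late = MiniPipeline.new
late.step ->(s) { s.upcase }
late.use_middleware PrefixMiddleware, prefix: ">" # too late for the step above

puts early.call("ab")
puts late.call("ab")
```

In the second pipeline the middleware is registered after the step, so the step runs unwrapped.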

#visualize ⇒ DependencyGraphVisualizer?

Creates a visualizer for this pipeline’s dependency graph.

Returns:

  • (DependencyGraphVisualizer, nil)

    the visualizer, or nil if the pipeline has no named steps


# File 'lib/simple_flow/pipeline.rb', line 231

def visualize
  graph = dependency_graph
  return nil unless graph
  DependencyGraphVisualizer.new(graph)
end

#visualize_ascii(show_groups: true) ⇒ String?

Returns an ASCII visualization of the pipeline’s dependency graph.

Parameters:

  • show_groups (Boolean) (defaults to: true)

    whether to show parallel execution groups

Returns:

  • (String, nil)

    ASCII visualization or nil if no named steps



# File 'lib/simple_flow/pipeline.rb', line 240

def visualize_ascii(show_groups: true)
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_ascii(show_groups: show_groups)
end

#visualize_dot(include_groups: true, orientation: 'TB') ⇒ String?

Export pipeline visualization to DOT format

Parameters:

  • include_groups (Boolean) (defaults to: true)

    whether to color-code parallel groups

  • orientation (String) (defaults to: 'TB')

    graph orientation: 'TB' (top to bottom) or 'LR' (left to right)

Returns:

  • (String, nil)

    DOT format or nil if no named steps



# File 'lib/simple_flow/pipeline.rb', line 250

def visualize_dot(include_groups: true, orientation: 'TB')
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_dot(include_groups: include_groups, orientation: orientation)
end
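
The exact output of DependencyGraphVisualizer#to_dot is not shown on this page. For orientation only, the sketch below shows how a dependency hash maps onto Graphviz DOT syntax, with the orientation passed as rankdir. dependencies_to_dot is a hypothetical function, and its output format is an assumption rather than the library's.

```ruby
# Hypothetical DOT writer for a { step => [dependencies] } hash.
def dependencies_to_dot(dependencies, orientation: "TB")
  lines = ["digraph pipeline {", "  rankdir=#{orientation};"]
  dependencies.each do |step, deps|
    lines << "  \"#{step}\";"
    # An edge points from the dependency to the step that needs it.
    deps.each { |dep| lines << "  \"#{dep}\" -> \"#{step}\";" }
  end
  lines << "}"
  lines.join("\n")
end

dot = dependencies_to_dot(
  { fetch_user: [], fetch_orders: [:fetch_user] },
  orientation: "LR"
)
puts dot
```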

#visualize_mermaid ⇒ String?

Export pipeline visualization to Mermaid format

Returns:

  • (String, nil)

    Mermaid format or nil if no named steps



# File 'lib/simple_flow/pipeline.rb', line 258

def visualize_mermaid
  visualizer = visualize
  return nil unless visualizer
  visualizer.to_mermaid
end