Class: Musa::Series::Composer::Composer

Inherits:
Object
  • Object
show all
Defined in:
lib/musa-dsl/series/series-composer.rb

Overview

Multi-input/output serie transformation pipeline system.

Composer enables building complex transformation graphs with multiple named inputs, outputs, and intermediate processing stages. It provides a declarative DSL for defining pipelines and routing data between them, similar to modular synthesis patch routing.

Architecture

The Composer system consists of:

  • Pipelines: Named transformation stages that process series
  • Routes: Connections between pipelines specifying data flow
  • Inputs: Named entry points (automatically proxied and buffered)
  • Outputs: Named exit points for consuming results
  • DSL Context: method_missing-based interface for pipeline definition

Pipeline Definition

Pipelines are defined using method calls in the DSL block. Each pipeline consists of:

  • Constructor (optional): Series constructor like S, H, etc.
  • Operations (one or more): Transformations like reverse, skip, etc.

Routing System

Routes connect pipelines using the route method:

route from_pipeline, to: to_pipeline, on: :source, as: :key

Routing modes:

  1. Hash assignment (when as: provided):
  • Data is assigned to: to_pipeline.input[on][as] = from_pipeline.output
  • Used by operations accepting hash inputs (e.g., H constructor)
  • Default on is :sources when as is provided
  1. Setter method (when as: omitted):
  • Calls: to_pipeline.input.source = from_pipeline.output
  • Used by operations with explicit source setters
  • Default on is :source when as is omitted

Commit System

Composer uses two-phase initialization:

  1. Definition phase: Routes and pipelines are declared
  2. Commit phase: All connections are resolved and finalized

Key behaviors:

  • Routes can be defined in any order (order-independent)
  • Output access is blocked until commit! is called
  • Commit resolves all routes and sets up buffering
  • auto_commit: true (default) commits automatically after DSL block

DSL Context

The DSL uses method_missing to enable natural pipeline definition:

  • Named access: input, output, pipeline names return symbols
  • Pipeline creation: name arg1, arg2, ... creates pipeline
  • Operation symbols: Series operations return as symbols for parsing
  • Constructor symbols: Series constructors return as symbols for parsing

Examples:

Pipeline with constructor

composer = Composer.new(inputs: nil) do
  my_pipeline ({ S: [1, 2, 3] }), reverse, { skip: 1 }
  route my_pipeline, to: output
end
composer.output.i.to_a  # => [2, 1]
# Creates: S([1,2,3]) → reverse → skip(1)

Pipeline with operations only

composer = Composer.new(input: S(1, 2, 3)) do
  my_pipeline reverse, { skip: 1 }
  route input, to: my_pipeline
  route my_pipeline, to: output
end
composer.output.i.to_a  # => [2, 1]
# Applies: input → reverse → skip(1)

Hash routing

composer = Composer.new(inputs: [:a, :b], auto_commit: false) do
  step1 reverse
  step2 reverse
  hash_merge ({ H: {} })

  route a, to: step1
  route b, to: step2
  route step1, to: hash_merge, as: :x    # hash_merge.input[:sources][:x] = step1.output
  route step2, to: hash_merge, as: :y    # hash_merge.input[:sources][:y] = step2.output
  route hash_merge, to: output
end

composer.input(:a).proxy_source = S(1, 2, 3)
composer.input(:b).proxy_source = S(10, 20, 30)
composer.commit!
composer.output.i.to_a  # => [{x: 3, y: 30}, {x: 2, y: 20}, {x: 1, y: 10}]

Setter routing

composer = Composer.new(input: S(1, 2, 3)) do
  step1 reverse
  route input, to: step1            # step1.input.source = input.output
  route step1, to: output
end

composer.output.i.to_a  # => [3, 2, 1]

Basic pipeline

composer = Composer.new(input: S(1, 2, 3)) do
  step1 reverse
  route input, to: step1
  route step1, to: output
end
composer.output.i.to_a  # => [3, 2, 1]

Multiple inputs merging

composer = Composer.new(inputs: { a: S(1, 2), b: S(10, 20) }) do
  hash_merge ({ H: {} })
  route a, to: hash_merge, as: :x
  route b, to: hash_merge, as: :y
  route hash_merge, to: output
end
composer.output.i.to_a  # => [{x: 1, y: 10}, {x: 2, y: 20}]

Multiple outputs

composer = Composer.new(input: S(1, 2, 3)) do
  doubled ({ eval: ->(v) { v * 2 } })
  tripled ({ eval: ->(v) { v * 3 } })

  route input, to: doubled
  route input, to: tripled
  route doubled, to: output
end
composer.output.i.to_a  # => [2, 4, 6]

Complex routing

composer = Composer.new(inputs: [:a, :b], auto_commit: false) do
  step1 reverse
  step2 ({ skip: 1 })
  hash_merge ({ H: {} })

  route a, to: step1
  route b, to: step2
  route step1, to: hash_merge, as: :x
  route step2, to: hash_merge, as: :y
  route hash_merge, to: output
end

composer.input(:a).proxy_source = S(1, 2, 3)
composer.input(:b).proxy_source = S(10, 20, 30)
composer.commit!

composer.output.i.to_a  # => [{x: 3, y: 20}, {x: 2, y: 30}]

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input: nil, inputs: [:input], outputs: [:output], auto_commit: nil, &block) ⇒ Composer

Returns a new instance of Composer.



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/musa-dsl/series/series-composer.rb', line 288

def initialize(input: nil, inputs: [:input], outputs: [:output], auto_commit: nil, &block)
  auto_commit = true if auto_commit.nil?

  inputs = case inputs
           when Array
             inputs.collect { |_| [_, nil] }.to_h
           when nil
             {}
           when Hash
             inputs
           else
             raise ArgumentError, "inputs: expected a Hash with input names and source series { name: serie, ... } or an Array with names [name, ...] but received #{inputs}"
           end

  inputs[:input] = input if input

  @pipelines = {}

  def @pipelines.[]=(name, pipeline)
    pipeline_to_add = @commited ? pipeline.commit! : pipeline
    super(name, pipeline_to_add)
  end

  @dsl = DSLContext.new(@pipelines)
  @inputs = {}
  @outputs = {}

  inputs.keys&.each do |input|
    p = Musa::Series::Constructors.PROXY(inputs[input])

    @inputs[input] = @pipelines[input] = Pipeline.new(input, input: p, output: p.buffered, pipelines: @pipelines)

    @dsl.define_singleton_method(input) { input }
  end

  outputs&.each do |output|
    p = Musa::Series::Constructors.PROXY()
    @outputs[output] = @pipelines[output] = Pipeline.new(output, is_output: true, input: p, output: p, pipelines: @pipelines)

    @dsl.define_singleton_method(output) { output }
  end

  @dsl.with &block if block
  commit! if auto_commit
end

Class Method Details

.[]=(name, pipeline) ⇒ Object



306
307
308
309
# File 'lib/musa-dsl/series/series-composer.rb', line 306

def @pipelines.[]=(name, pipeline)
  pipeline_to_add = @commited ? pipeline.commit! : pipeline
  super(name, pipeline_to_add)
end

Instance Method Details

#commit!void

This method returns an undefined value.

Finalizes composer by resolving all routes and connections.

Commits the composer, resolving all route connections and setting up buffering. Must be called before accessing outputs. Cannot be called twice on same composer instance.

The commit process:

  1. Recursively commits all output pipelines
  2. Each pipeline commits its input routes
  3. Connects all sources through buffers
  4. Sets committed flag enabling output access

Examples:

Manual commit

composer = Composer.new(auto_commit: false) do
  # ... pipeline definitions ...
end
composer.input.proxy_source = S(1, 2, 3)
composer.commit!
result = composer.output.i.to_a

Raises:

  • (RuntimeError)

    if already committed



533
534
535
536
537
538
539
540
541
# File 'lib/musa-dsl/series/series-composer.rb', line 533

def commit!
  raise 'Already commited' if @commited

  @outputs.each_value do |pipeline|
    pipeline.commit!
  end

  @commited = true
end

#input(name = nil) ⇒ ProxySerie

Accesses named input proxy for dynamic source assignment.

Returns the proxy series for the specified input, allowing dynamic assignment of source series after composer creation. Used with auto_commit: false to set sources before manual commit.

Examples:

Set input source dynamically

composer = Composer.new(auto_commit: false) do
  step reverse
  route input, to: step
  route step, to: output
end

composer.input.proxy_source = S(1, 2, 3)
composer.commit!

Parameters:

  • name (Symbol, nil) (defaults to: nil)

    input name (defaults to :input)

Returns:

  • (ProxySerie)

    proxy series for source assignment



355
356
357
358
# File 'lib/musa-dsl/series/series-composer.rb', line 355

def input(name = nil)
  name ||= :input
  @inputs[name].input
end

#output(name = nil) ⇒ Serie

Accesses named output series for consumption.

Returns the output series for the specified output. Can only be called after commit! has been invoked. Raises error if composer is not committed.

Examples:

Access output

composer.output.i.to_a  # => [3, 2, 1]

Multiple outputs

composer.output(:doubled).i.to_a  # => [2, 4, 6]
composer.output(:tripled).i.to_a  # => [3, 6, 9]

Parameters:

  • name (Symbol, nil) (defaults to: nil)

    output name (defaults to :output)

Returns:

  • (Serie)

    output series

Raises:

  • (RuntimeError)

    if composer not yet committed



380
381
382
383
384
385
# File 'lib/musa-dsl/series/series-composer.rb', line 380

def output(name = nil)
  raise "Can't access output if the Composer is uncommited. Call '.commit' first." unless @commited

  name ||= :output
  @outputs[name].output
end

#pipeline(name, *elements) ⇒ void

This method returns an undefined value.

Defines named pipeline with transformation operations.

Creates a pipeline from constructor and/or operations. This method is typically called implicitly through the DSL's method_missing, but can be called directly for dynamic pipeline creation.

Examples:

Direct call (inside DSL block)

composer = Composer.new(inputs: nil) do
  pipeline(:my_step, [{ S: [1, 2, 3] }, :reverse])
  route my_step, to: output
end

composer.output.i.to_a  # => [3, 2, 1]

DSL equivalent (method_missing)

composer = Composer.new(inputs: nil) do
  my_step ({ S: [1, 2, 3] }), reverse
  route my_step, to: output
end

composer.output.i.to_a  # => [3, 2, 1]

Parameters:

  • name (Symbol)

    pipeline name

  • elements (Array)

    constructor and operations



484
485
486
# File 'lib/musa-dsl/series/series-composer.rb', line 484

def pipeline(name, *elements)
  @dsl.pipeline(name, elements)
end

#route(from, to:, on: nil, as: nil) ⇒ void

This method returns an undefined value.

Defines routing connection between pipelines.

Creates data flow route from source pipeline to destination pipeline. The routing behavior depends on whether as: parameter is provided.

With as: parameter (Hash routing):

  • Assigns to hash: to_pipeline.input[on][as] = from_pipeline.output
  • Default on is :sources
  • Used by operations accepting hash inputs (e.g., H constructor)

Without as: parameter (Setter routing):

  • Calls setter: to_pipeline.input.source = from_pipeline.output
  • Default on is :source
  • Used by operations with explicit source setter methods

Examples:

Hash routing (inside DSL block)

composer = Composer.new(inputs: [:a, :b], auto_commit: false) do
  step1 reverse
  step2 reverse
  hash_merge ({ H: {} })

  route a, to: step1
  route b, to: step2
  route step1, to: hash_merge, as: :x    # hash_merge.input[:sources][:x] = step1
  route step2, to: hash_merge, as: :y    # hash_merge.input[:sources][:y] = step2
  route hash_merge, to: output
end

composer.input(:a).proxy_source = S(1, 2)
composer.input(:b).proxy_source = S(10, 20)
composer.commit!
composer.output.i.to_a  # => [{x: 2, y: 20}, {x: 1, y: 10}]

Setter routing (inside DSL block)

composer = Composer.new(input: S(1, 2, 3)) do
  step reverse
  route input, to: step             # step.input.source = input
  route step, to: output
end

composer.output.i.to_a  # => [3, 2, 1]

Custom on parameter (inside DSL block)

composer = Composer.new(input: S(1, 2, 3), auto_commit: false) do
  step reverse
  hash_merge ({ H: {} })
  route input, to: step
  route step, to: hash_merge, on: :sources, as: :x
  route hash_merge, to: output
end

composer.commit!
composer.output.i.to_a  # => [{x: 3}, {x: 2}, {x: 1}]

Parameters:

  • from (Symbol)

    source pipeline name

  • to (Symbol)

    destination pipeline name

  • on (Symbol, nil) (defaults to: nil)

    attribute name (:source or :sources)

  • as (Symbol, nil) (defaults to: nil)

    hash key for assignment

Raises:

  • (ArgumentError)

    if pipeline names not found

  • (ArgumentError)

    if route already exists



452
453
454
# File 'lib/musa-dsl/series/series-composer.rb', line 452

def route(from, to:, on: nil, as: nil)
  @dsl.route(from, to: to, on: on, as: as)
end

#update { ... } ⇒ void

This method returns an undefined value.

Updates composer with additional DSL block.

Allows dynamic modification of composer after creation by executing additional DSL block in the composer's DSL context. Useful for progressive pipeline construction.

Examples:

Add routes dynamically

composer.update do
  route step3, to: output
end

Yields:

  • DSL block with additional pipeline definitions



504
505
506
# File 'lib/musa-dsl/series/series-composer.rb', line 504

def update(&block)
  @dsl.with &block
end