Class: Pangea::Magma::Chain

Inherits:
Object show all
Defined in:
lib/pangea/magma/chain.rb

Overview

Typed DAG of Workspaces wired by ChainEdges. Mirrors ‘magma_pangea::chain::WorkspaceChain` (per theory/MAGMA.md §II.9 and theory/PANGEA-MAGMA-ORCHESTRATION.md §III.2).

Authors:

chain = Pangea::Magma::Chain.build do |c|
  c.workspace vpc       # Pangea::Magma::Workspace instance
  c.workspace cluster
  c.workspace fluxcd
  c.edge from: vpc, output: :vpc_id,
         to:   cluster, input: :vpc_id
  c.edge from: cluster, output: :kubeconfig,
         to:   fluxcd, input: :kubeconfig
end

chain.reconcile_all   # → AggregateReport Hash (via magma flow run)

Defined Under Namespace

Classes: Builder, ChainEdge

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workspaces:, edges:, output_propagation: :in_memory, optimization: nil) ⇒ Chain

Returns a new instance of Chain.



91
92
93
94
95
96
97
98
# File 'lib/pangea/magma/chain.rb', line 91

def initialize(workspaces:, edges:, output_propagation: :in_memory,
               optimization: nil)
  @workspaces         = workspaces.freeze
  @edges              = edges.freeze
  @output_propagation = output_propagation
  @optimization       = optimization
  validate!
end

Instance Attribute Details

#edgesObject (readonly)

Returns the value of attribute edges.



89
90
91
# File 'lib/pangea/magma/chain.rb', line 89

def edges
  @edges
end

#optimizationObject (readonly)

Returns the value of attribute optimization.



89
90
91
# File 'lib/pangea/magma/chain.rb', line 89

def optimization
  @optimization
end

#output_propagationObject (readonly)

Returns the value of attribute output_propagation.



89
90
91
# File 'lib/pangea/magma/chain.rb', line 89

def output_propagation
  @output_propagation
end

#workspacesObject (readonly)

Returns the value of attribute workspaces.



89
90
91
# File 'lib/pangea/magma/chain.rb', line 89

def workspaces
  @workspaces
end

Class Method Details

.build(optimization: nil) {|b| ... } ⇒ Object

Build a typed Chain. Yields a Builder; returns a frozen Chain.

Yields:

  • (b)


70
71
72
73
74
75
# File 'lib/pangea/magma/chain.rb', line 70

def build(optimization: nil)
  b = Builder.new
  yield b
  new(workspaces: b.workspaces, edges: b.edges,
      optimization: optimization)
end

.compose(workspaces, output_propagation: :in_memory, optimization: nil) ⇒ Object

Compose: produce a Chain from a list of workspaces with no cross-workspace edges (used for “deploy these in sequence without typed-value flow” scenarios).



80
81
82
83
84
85
86
# File 'lib/pangea/magma/chain.rb', line 80

def compose(workspaces, output_propagation: :in_memory, optimization: nil)
  b = Builder.new
  workspaces.each { |w| b.workspace(w) }
  new(workspaces: b.workspaces, edges: b.edges,
      output_propagation: output_propagation,
      optimization: optimization)
end

Instance Method Details

#edge_countObject



111
112
113
# File 'lib/pangea/magma/chain.rb', line 111

def edge_count
  @edges.size
end

#node_countObject



107
108
109
# File 'lib/pangea/magma/chain.rb', line 107

def node_count
  @workspaces.size
end

#reconcile_allObject

Reconcile every workspace in topological order via the canonical ‘magma flow run` engine (magma-flow crate). Returns an AggregateReport-shaped Hash. Delegates to Pangea::Magma.flow (which delegates to Runner.invoke) so this method stays one line of intent.



153
154
155
# File 'lib/pangea/magma/chain.rb', line 153

def reconcile_all
  Pangea::Magma.flow(to_flow_hash)
end

#reconcile_subset(subset_names, stub_upstream_outputs: {}) ⇒ Object

Subset reconciliation — run a slice of the chain with the rest mocked out (downstream-from-stub gets typed values from the ‘stub_upstream_outputs` map rather than running upstream Workspaces). Mirrors `WorkspaceChain::reconcile_subset` semantics. For M0.1 this is a thin wrapper that drives `magma flow` with a reduced workspace set; the typed-value injection lands once magma exposes a `–stubs <json>` flag.

Raises:

  • (NotImplementedError)


164
165
166
167
168
169
# File 'lib/pangea/magma/chain.rb', line 164

def reconcile_subset(subset_names, stub_upstream_outputs: {})
  raise NotImplementedError,
        "Chain#reconcile_subset awaits magma's `--stubs` flag (M0.1.x). " \
        "subset_names=#{subset_names.inspect}, " \
        "stub_upstream_outputs=#{stub_upstream_outputs.inspect}"
end

#to_flow_hashObject

The typed flow JSON the chain serializes to. Public so operators can audit it before reconciliation and rspec can snapshot-match it.



139
140
141
142
143
144
145
146
# File 'lib/pangea/magma/chain.rb', line 139

def to_flow_hash
  h = {
    workspaces: @workspaces.values.map { |w| { name: w.name.to_s, dir: w.workspace_dir } },
    edges:      @edges.map(&:to_h),
  }
  h[:optimization] = @optimization.to_h if @optimization
  h
end

#to_hObject



171
172
173
174
175
176
177
178
179
# File 'lib/pangea/magma/chain.rb', line 171

def to_h
  h = {
    output_propagation: @output_propagation,
    workspaces:         @workspaces.transform_values(&:to_h),
    edges:              @edges.map(&:to_h),
  }
  h[:optimization] = @optimization.to_h if @optimization
  h
end

#to_json(*args) ⇒ Object



181
182
183
# File 'lib/pangea/magma/chain.rb', line 181

def to_json(*args)
  to_h.to_json(*args)
end

#topo_orderObject

Topological order across the declared edges. Raises if a cycle is detected.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/pangea/magma/chain.rb', line 117

def topo_order
  indegree = @workspaces.each_key.to_h { |n| [n, 0] }
  @edges.each { |e| indegree[e.to] += 1 }
  queue = indegree.select { |_, d| d.zero? }.keys
  order = []
  until queue.empty?
    n = queue.shift
    order << n
    @edges.each do |e|
      next unless e.from == n
      indegree[e.to] -= 1
      queue << e.to if indegree[e.to].zero?
    end
  end
  raise "cycle in chain: #{indegree.keys.inspect}" if order.size < @workspaces.size

  order
end

#with_optimization(opt) ⇒ Object

Return a new Chain with the given optimization hints. Used by Orchestrator to attach Optimization without rebuilding the chain.



102
103
104
105
# File 'lib/pangea/magma/chain.rb', line 102

def with_optimization(opt)
  Chain.new(workspaces: @workspaces, edges: @edges,
            output_propagation: @output_propagation, optimization: opt)
end