Class: Pangea::Magma::Chain
Overview
Typed DAG of Workspaces wired by ChainEdges. Mirrors `magma_pangea::chain::WorkspaceChain` (per theory/MAGMA.md §II.9 and theory/PANGEA-MAGMA-ORCHESTRATION.md §III.2).
Authors:
chain = Pangea::Magma::Chain.build do |c|
  c.workspace vpc # Pangea::Magma::Workspace instance
  c.workspace cluster
  c.workspace fluxcd
  c.edge from: vpc, output: :vpc_id,
         to: cluster, input: :vpc_id
  c.edge from: cluster, output: :kubeconfig,
         to: fluxcd, input: :kubeconfig
end
chain.reconcile_all # → AggregateReport Hash (via magma flow run)
Defined Under Namespace
Instance Attribute Summary
- #edges ⇒ Object (readonly)
  Returns the value of attribute edges.
- #optimization ⇒ Object (readonly)
  Returns the value of attribute optimization.
- #output_propagation ⇒ Object (readonly)
  Returns the value of attribute output_propagation.
- #workspaces ⇒ Object (readonly)
  Returns the value of attribute workspaces.
Class Method Summary
- .build(optimization: nil) {|b| ... } ⇒ Object
  Build a typed Chain.
- .compose(workspaces, output_propagation: :in_memory, optimization: nil) ⇒ Object
  Compose: produce a Chain from a list of workspaces with no cross-workspace edges (used for “deploy these in sequence without typed-value flow” scenarios).
Instance Method Summary
- #edge_count ⇒ Object
- #initialize(workspaces:, edges:, output_propagation: :in_memory, optimization: nil) ⇒ Chain (constructor)
  A new instance of Chain.
- #node_count ⇒ Object
- #reconcile_all ⇒ Object
  Reconcile every workspace in topological order via the canonical `magma flow run` engine (magma-flow crate).
- #reconcile_subset(subset_names, stub_upstream_outputs: {}) ⇒ Object
  Subset reconciliation: run a slice of the chain with the rest mocked out (workspaces downstream of a stub get typed values from the `stub_upstream_outputs` map rather than running upstream Workspaces).
- #to_flow_hash ⇒ Object
  The typed flow JSON the chain serializes to.
- #to_h ⇒ Object
- #to_json(*args) ⇒ Object
- #topo_order ⇒ Object
  Topological order across the declared edges.
- #with_optimization(opt) ⇒ Object
  Return a new Chain with the given optimization hints.
Constructor Details
#initialize(workspaces:, edges:, output_propagation: :in_memory, optimization: nil) ⇒ Chain
Returns a new instance of Chain.
# File 'lib/pangea/magma/chain.rb', line 91
def initialize(workspaces:, edges:, output_propagation: :in_memory,
               optimization: nil)
  @workspaces = workspaces.freeze
  @edges = edges.freeze
  @output_propagation = output_propagation
  @optimization = optimization
  validate!
end
Instance Attribute Details
#edges ⇒ Object (readonly)
Returns the value of attribute edges.
# File 'lib/pangea/magma/chain.rb', line 89
def edges
  @edges
end
#optimization ⇒ Object (readonly)
Returns the value of attribute optimization.
# File 'lib/pangea/magma/chain.rb', line 89
def optimization
  @optimization
end
#output_propagation ⇒ Object (readonly)
Returns the value of attribute output_propagation.
# File 'lib/pangea/magma/chain.rb', line 89
def output_propagation
  @output_propagation
end
#workspaces ⇒ Object (readonly)
Returns the value of attribute workspaces.
# File 'lib/pangea/magma/chain.rb', line 89
def workspaces
  @workspaces
end
Class Method Details
.build(optimization: nil) {|b| ... } ⇒ Object
Build a typed Chain. Yields a Builder; returns a frozen Chain.
# File 'lib/pangea/magma/chain.rb', line 70
def build(optimization: nil)
  b = Builder.new
  yield b
  new(workspaces: b.workspaces, edges: b.edges,
      optimization: optimization)
end
.compose(workspaces, output_propagation: :in_memory, optimization: nil) ⇒ Object
Compose: produce a Chain from a list of workspaces with no cross-workspace edges (used for “deploy these in sequence without typed-value flow” scenarios).
# File 'lib/pangea/magma/chain.rb', line 80
def compose(workspaces, output_propagation: :in_memory, optimization: nil)
  b = Builder.new
  workspaces.each { |w| b.workspace(w) }
  new(workspaces: b.workspaces, edges: b.edges,
      output_propagation: output_propagation,
      optimization: optimization)
end
Instance Method Details
#edge_count ⇒ Object
# File 'lib/pangea/magma/chain.rb', line 111
def edge_count
  @edges.size
end
#node_count ⇒ Object
# File 'lib/pangea/magma/chain.rb', line 107
def node_count
  @workspaces.size
end
#reconcile_all ⇒ Object
Reconcile every workspace in topological order via the canonical `magma flow run` engine (magma-flow crate). Returns an AggregateReport-shaped Hash. Delegates to Pangea::Magma.flow (which delegates to Runner.invoke) so this method stays one line of intent.
# File 'lib/pangea/magma/chain.rb', line 153
def reconcile_all
  Pangea::Magma.flow(to_flow_hash)
end
#reconcile_subset(subset_names, stub_upstream_outputs: {}) ⇒ Object
Subset reconciliation: run a slice of the chain with the rest mocked out. Workspaces downstream of a stub get typed values from the `stub_upstream_outputs` map rather than from running the upstream Workspaces. Mirrors `WorkspaceChain::reconcile_subset` semantics. For M0.1 this is intended as a thin wrapper that drives `magma flow` with a reduced workspace set; the typed-value injection lands once magma exposes a `--stubs <json>` flag. Until then the method raises NotImplementedError.
# File 'lib/pangea/magma/chain.rb', line 164
def reconcile_subset(subset_names, stub_upstream_outputs: {})
  raise NotImplementedError,
        "Chain#reconcile_subset awaits magma's `--stubs` flag (M0.1.x). " \
        "subset_names=#{subset_names.inspect}, " \
        "stub_upstream_outputs=#{stub_upstream_outputs.inspect}"
end
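Because the method raises NotImplementedError, callers that want to degrade gracefully need to name that class in their rescue clause: in Ruby, NotImplementedError descends from ScriptError rather than StandardError, so a bare `rescue` does not catch it. The following is a minimal sketch only; `reconcile_subset_stub` and `try_subset` are hypothetical names standing in for the real call site, and the message text is illustrative.

```ruby
# Hypothetical stand-in that fails the same way Chain#reconcile_subset does today.
def reconcile_subset_stub(subset_names, stub_upstream_outputs: {})
  raise NotImplementedError,
        "awaits --stubs; subset_names=#{subset_names.inspect}"
end

# Hypothetical caller. Returns :pending while subset reconciliation is unavailable.
# A bare `rescue` (or `rescue => e`) only handles StandardError and would let
# NotImplementedError propagate, so the class is named explicitly.
def try_subset(names)
  reconcile_subset_stub(names)
rescue NotImplementedError => e
  warn "subset reconciliation unavailable: #{e.message}"
  :pending
end
```

Naming the exception class keeps unrelated failures (for example an ArgumentError from bad input) visible instead of folding them into the "not yet available" path.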
#to_flow_hash ⇒ Object
The typed flow JSON the chain serializes to. Public so operators can audit it before reconciliation and rspec can snapshot-match it.
# File 'lib/pangea/magma/chain.rb', line 139
def to_flow_hash
  h = {
    workspaces: @workspaces.values.map { |w| { name: w.name.to_s, dir: w.workspace_dir } },
    edges: @edges.map(&:to_h),
  }
  h[:optimization] = @optimization.to_h if @optimization
  h
end
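To make the shape of the flow hash concrete, here is a minimal sketch that rebuilds it outside the class. The `Workspace` and `ChainEdge` structs, the `flow_hash` helper, and the directory paths are hypothetical stand-ins; the real classes carry more state and their `to_h` output may differ.

```ruby
require "json"

# Hypothetical stand-ins for the real Workspace and ChainEdge classes.
Workspace = Struct.new(:name, :workspace_dir)
ChainEdge = Struct.new(:from, :output, :to, :input, keyword_init: true)

# Same construction as to_flow_hash, taking the collections as arguments.
# The :optimization key is only present when hints were supplied.
def flow_hash(workspaces, edges, optimization = nil)
  h = {
    workspaces: workspaces.values.map { |w| { name: w.name.to_s, dir: w.workspace_dir } },
    edges: edges.map(&:to_h),
  }
  h[:optimization] = optimization.to_h if optimization
  h
end

workspaces = {
  vpc: Workspace.new(:vpc, "workspaces/vpc"),
  cluster: Workspace.new(:cluster, "workspaces/cluster"),
}
edges = [ChainEdge.new(from: :vpc, output: :vpc_id, to: :cluster, input: :vpc_id)]

puts JSON.pretty_generate(flow_hash(workspaces, edges))
```

Printing the hash this way is one option for auditing a chain before reconciliation; a spec could compare the same hash against a stored snapshot.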
#to_h ⇒ Object
# File 'lib/pangea/magma/chain.rb', line 171
def to_h
  h = {
    output_propagation: @output_propagation,
    workspaces: @workspaces.transform_values(&:to_h),
    edges: @edges.map(&:to_h),
  }
  h[:optimization] = @optimization.to_h if @optimization
  h
end
#to_json(*args) ⇒ Object
# File 'lib/pangea/magma/chain.rb', line 181
def to_json(*args)
  to_h.to_json(*args)
end
#topo_order ⇒ Object
Topological order across the declared edges. Raises if a cycle is detected.
# File 'lib/pangea/magma/chain.rb', line 117
def topo_order
  indegree = @workspaces.each_key.to_h { |n| [n, 0] }
  @edges.each { |e| indegree[e.to] += 1 }
  queue = indegree.select { |_, d| d.zero? }.keys
  order = []
  until queue.empty?
    n = queue.shift
    order << n
    @edges.each do |e|
      next unless e.from == n
      indegree[e.to] -= 1
      queue << e.to if indegree[e.to].zero?
    end
  end
  raise "cycle in chain: #{indegree.keys.inspect}" if order.size < @workspaces.size
  order
end
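The routine above follows Kahn's algorithm: start from nodes with no incoming edges, emit them, and decrement the indegree of their successors. A self-contained sketch of the same idea is given here, assuming a hypothetical `Edge` struct and plain symbols in place of the real ChainEdge and Workspace objects. It deviates from the source in one labeled way: the error lists only the nodes still blocked (non-zero indegree) and uses ArgumentError, whereas the method above reports every node name in a RuntimeError.

```ruby
# Hypothetical stand-in for ChainEdge: only the endpoints matter for ordering.
Edge = Struct.new(:from, :to)

# Kahn's algorithm over a list of node names and a list of edges.
def topo_order(names, edges)
  indegree = names.to_h { |n| [n, 0] }
  edges.each { |e| indegree[e.to] += 1 }

  queue = indegree.select { |_, d| d.zero? }.keys
  order = []
  until queue.empty?
    n = queue.shift
    order << n
    edges.each do |e|
      next unless e.from == n
      indegree[e.to] -= 1
      queue << e.to if indegree[e.to].zero?
    end
  end

  if order.size < names.size
    # Variation on the source: report only the nodes that never reached indegree zero.
    stuck = indegree.reject { |_, d| d.zero? }.keys
    raise ArgumentError, "cycle in chain: #{stuck.inspect}"
  end
  order
end

names = %i[fluxcd cluster vpc]
edges = [Edge.new(:vpc, :cluster), Edge.new(:cluster, :fluxcd)]
p topo_order(names, edges)
```

With no edges at all, as in a chain produced by `.compose`, every node starts with indegree zero and the result is simply the declaration order, since Ruby hashes preserve insertion order.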
#with_optimization(opt) ⇒ Object
Return a new Chain with the given optimization hints. Used by Orchestrator to attach Optimization without rebuilding the chain.
# File 'lib/pangea/magma/chain.rb', line 102
def with_optimization(opt)
  Chain.new(workspaces: @workspaces, edges: @edges,
            output_propagation: @output_propagation, optimization: opt)
end
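The "return a new instance" pattern used by with_optimization can be sketched in isolation. `MiniChain` is a hypothetical miniature, not the real class: it omits validation and output propagation, and the `parallelism` hint key is invented for illustration.

```ruby
# Hypothetical miniature of Chain: frozen collections plus one optional hint object.
class MiniChain
  attr_reader :workspaces, :edges, :optimization

  def initialize(workspaces:, edges:, optimization: nil)
    @workspaces = workspaces.freeze
    @edges = edges.freeze
    @optimization = optimization
  end

  # Builds a new instance that shares the already-frozen collections with the
  # receiver; the receiver itself is left unchanged.
  def with_optimization(opt)
    self.class.new(workspaces: @workspaces, edges: @edges, optimization: opt)
  end
end

base = MiniChain.new(workspaces: { vpc: "workspaces/vpc" }, edges: [])
tuned = base.with_optimization({ parallelism: 4 })

p base.optimization.nil?
p tuned.workspaces.equal?(base.workspaces)
```

Because the collections are frozen at construction, sharing them between the two instances is safe and avoids rebuilding the chain just to attach hints.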