Class: CMDx::Chain

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/cmdx/chain.rb

Overview

Ordered collection of Results produced by a top-level task and any nested tasks it triggers. A Chain is stored per-fiber so concurrent workflows (see Pipeline parallel strategy) each get their own. The root Runtime clears the chain on teardown.

Constant Summary collapse

STORAGE_KEY =

Fiber-local storage key used by current/current=/clear.

:cmdx_chain

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(xid = nil) ⇒ Chain

Returns a new instance of Chain.

Parameters:

  • xid (String, nil) (defaults to: nil)

    external correlation id (e.g. Rails ‘request_id`) shared across every Result in this chain. Resolved once by Runtime from Settings#correlation_id (a callable) when the root chain is created.



43
44
45
46
47
48
49
# File 'lib/cmdx/chain.rb', line 43

def initialize(xid = nil)
  @xid     = xid
  @id      = SecureRandom.uuid_v7
  @mutex   = Mutex.new
  @results = []
  @root    = nil
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



37
38
39
# File 'lib/cmdx/chain.rb', line 37

def id
  @id
end

#xidObject (readonly)

Returns the value of attribute xid.



37
38
39
# File 'lib/cmdx/chain.rb', line 37

def xid
  @xid
end

Class Method Details

.clearnil

Clears the fiber-local chain reference.

Returns:

  • (nil)


31
32
33
# File 'lib/cmdx/chain.rb', line 31

def clear
  Fiber[STORAGE_KEY] = nil
end

.currentChain?

Returns the chain active on the current fiber, or nil outside execution.

Returns:

  • (Chain, nil)

    the chain active on the current fiber, or nil outside execution



18
19
20
# File 'lib/cmdx/chain.rb', line 18

def current
  Fiber[STORAGE_KEY]
end

.current=(chain) ⇒ Chain?

Installs ‘chain` as the active chain on the current fiber.

Parameters:

Returns:



25
26
27
# File 'lib/cmdx/chain.rb', line 25

def current=(chain)
  Fiber[STORAGE_KEY] = chain
end

Instance Method Details

#each {|Result| ... } ⇒ Enumerator, Chain

Yields:

  • (Result)

    each result in insertion order

Returns:



125
126
127
# File 'lib/cmdx/chain.rb', line 125

def each(&)
  results.each(&)
end

#empty?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/cmdx/chain.rb', line 114

def empty?
  @mutex.synchronize { @results.empty? }
end

#freezeChain

Freezes the chain and its results. Called by Runtime teardown.

Returns:



132
133
134
135
# File 'lib/cmdx/chain.rb', line 132

def freeze
  @mutex.synchronize { @results.freeze }
  super
end

#index(result) ⇒ Integer?

Returns zero-based position of ‘result`, or nil when absent.

Parameters:

Returns:

  • (Integer, nil)

    zero-based position of ‘result`, or nil when absent



87
88
89
# File 'lib/cmdx/chain.rb', line 87

def index(result)
  @mutex.synchronize { @results.index(result) }
end

#lastResult?

Returns the most recently appended result.

Returns:

  • (Result, nil)

    the most recently appended result



92
93
94
# File 'lib/cmdx/chain.rb', line 92

def last
  @mutex.synchronize { @results.last }
end

#push(result) ⇒ Chain Also known as: <<

Appends ‘result` to the chain. Thread-safe to support parallel pipelines.

Parameters:

Returns:

  • (Chain)

    self for chaining



64
65
66
67
68
69
70
# File 'lib/cmdx/chain.rb', line 64

def push(result)
  @mutex.synchronize do
    @results << result
    @root = result if @root.nil? && result.respond_to?(:root?) && result.root?
  end
  self
end

#resultsArray<Result>

Returns snapshot of the results stored in this chain. While the chain is mutable a dup is returned so callers cannot mutate internal state and see consistent ordering despite parallel pushes; after #freeze the actual frozen array is returned to preserve ‘Array#frozen?` semantics.

Returns:

  • (Array<Result>)

    snapshot of the results stored in this chain. While the chain is mutable a dup is returned so callers cannot mutate internal state and see consistent ordering despite parallel pushes; after #freeze the actual frozen array is returned to preserve ‘Array#frozen?` semantics.



56
57
58
# File 'lib/cmdx/chain.rb', line 56

def results
  @mutex.synchronize { @results.frozen? ? @results : @results.dup }
end

#rootResult?

Returns the root result, or nil when absent.

Returns:

  • (Result, nil)

    the root result, or nil when absent



97
98
99
100
101
# File 'lib/cmdx/chain.rb', line 97

def root
  @mutex.synchronize do
    @root || @results.find { |r| r.respond_to?(:root?) && r.root? }
  end
end

#sizeInteger

Returns:

  • (Integer)


119
120
121
# File 'lib/cmdx/chain.rb', line 119

def size
  @mutex.synchronize { @results.size }
end

#stateString?

Returns the state of the root result, or nil when absent.

Returns:

  • (String, nil)

    the state of the root result, or nil when absent



104
105
106
# File 'lib/cmdx/chain.rb', line 104

def state
  root&.state
end

#statusString?

Returns the status of the root result, or nil when absent.

Returns:

  • (String, nil)

    the status of the root result, or nil when absent



109
110
111
# File 'lib/cmdx/chain.rb', line 109

def status
  root&.status
end

#unshift(result) ⇒ Chain

Prepends ‘result` to the chain. Thread-safe to support parallel pipelines.

Parameters:

Returns:

  • (Chain)

    self for chaining



77
78
79
80
81
82
83
# File 'lib/cmdx/chain.rb', line 77

def unshift(result)
  @mutex.synchronize do
    @results.unshift(result)
    @root = result if result.respond_to?(:root?) && result.root?
  end
  self
end