Module: Igniter::Extensions::Contracts::DataflowPack

Defined in:
lib/igniter/extensions/contracts/dataflow_pack.rb

Class Method Summary collapse

Class Method Details

.ensure_installed!(profile) ⇒ Object

Raises:

  • (Igniter::Contracts::Error)


57
58
59
60
61
62
# File 'lib/igniter/extensions/contracts/dataflow_pack.rb', line 57

def ensure_installed!(profile)
  return if profile.pack_names.include?(:extensions_dataflow)

  raise Igniter::Contracts::Error,
        "DataflowPack is not installed in profile #{profile.fingerprint}; add Igniter::Extensions::Contracts::DataflowPack"
end

.install_into(kernel) ⇒ Object



27
28
29
# File 'lib/igniter/extensions/contracts/dataflow_pack.rb', line 27

def install_into(kernel)
  kernel
end

.manifestObject



20
21
22
23
24
25
# File 'lib/igniter/extensions/contracts/dataflow_pack.rb', line 20

def manifest
  Igniter::Contracts::PackManifest.new(
    name: :extensions_dataflow,
    metadata: { category: :orchestration }
  )
end

.session(environment, source:, key:, window: nil, context: [], compiled_graph: nil, &block) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/igniter/extensions/contracts/dataflow_pack.rb', line 31

def session(environment, source:, key:, window: nil, context: [], compiled_graph: nil, &block)
  ensure_installed!(environment.profile)
  Igniter::Extensions::Contracts::IncrementalPack.ensure_installed!(environment.profile)

  item_graph, aggregate_operators =
    if compiled_graph
      raise ArgumentError, "DataflowPack.session accepts either compiled_graph: or a block, not both" if block

      [compiled_graph, {}]
    else
      builder = Dataflow::Builder.new(source: source, key: key, window: window, context: context)
      builder.instance_eval(&block) if block
      builder.build!(environment)
    end

  Dataflow::Session.new(
    environment: environment,
    compiled_graph: item_graph,
    source: source,
    key: key,
    window: window,
    context: context,
    aggregate_operators: aggregate_operators
  )
end