Class: Igniter::Extensions::Contracts::Dataflow::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/igniter/extensions/contracts/dataflow/session.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(environment:, compiled_graph:, source:, key:, window: nil, context: [], aggregate_operators: {}) ⇒ Session

Returns a new instance of Session.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 10

def initialize(environment:, compiled_graph:, source:, key:, window: nil, context: [],
               aggregate_operators: {})
  @environment = environment
  @compiled_graph = compiled_graph
  @source = source.to_sym
  @key_name = key.to_sym
  @window = window
  @context = Array(context).map(&:to_sym).freeze
  @aggregate_states = aggregate_operators.transform_values { |operator| AggregateState.new(operator) }
  @item_sessions = {}
  @snapshots = {}
  @cached_items = {}
  @last_inputs = nil
  @last_result = nil
end

Instance Attribute Details

#compiled_graphObject (readonly)

Returns the value of attribute compiled_graph.



8
9
10
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8

def compiled_graph
  @compiled_graph
end

#contextObject (readonly)

Returns the value of attribute context.



8
9
10
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8

def context
  @context
end

#environmentObject (readonly)

Returns the value of attribute environment.



8
9
10
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8

def environment
  @environment
end

#key_nameObject (readonly)

Returns the value of attribute key_name.



8
9
10
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8

def key_name
  @key_name
end

#last_resultObject (readonly)

Returns the value of attribute last_result.



8
9
10
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8

def last_result
  @last_result
end

#sourceObject (readonly)

Returns the value of attribute source.



8
9
10
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8

def source
  @source
end

#windowObject (readonly)

Returns the value of attribute window.



8
9
10
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8

def window
  @window
end

Instance Method Details

#collection_diffObject



54
55
56
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 54

def collection_diff
  @last_result&.diff
end

#feed_diff(add: [], remove: [], update: [], inputs: {}) ⇒ Object



46
47
48
49
50
51
52
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 46

def feed_diff(add: [], remove: [], update: [], inputs: {})
  current_inputs = @last_inputs.to_h
  current_items = Array(current_inputs.fetch(source, []))
  merged_items = apply_diff(current_items, add: add, remove: remove, update: update)

  run(inputs: current_inputs.merge(inputs).merge(source => merged_items))
end

#run(inputs:) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 26

def run(inputs:)
  normalized_inputs = Igniter::Contracts::NamedValues.new(inputs)
  items = filtered_items(normalized_inputs.fetch(source))
  diff = compute_diff(items)
  processed_items = build_processed_items(items, normalized_inputs, diff)
  delete_removed_sessions(diff.removed)

  collection_result = CollectionResult.new(items: processed_items, diff: diff)
  update_aggregate_states(collection_result)

  @snapshots = snapshot_items(items)
  @cached_items = processed_items.dup
  @last_inputs = normalized_inputs
  @last_result = Result.new(
    processed: collection_result,
    aggregates: @aggregate_states.transform_values(&:value),
    inputs: normalized_inputs
  )
end