Class: Igniter::Extensions::Contracts::Dataflow::Session
- Inherits: Object
- Inheritance chain: Object → Igniter::Extensions::Contracts::Dataflow::Session
- Defined in:
- lib/igniter/extensions/contracts/dataflow/session.rb
Instance Attribute Summary collapse
-
#compiled_graph ⇒ Object
readonly
Returns the value of attribute compiled_graph.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#environment ⇒ Object
readonly
Returns the value of attribute environment.
-
#key_name ⇒ Object
readonly
Returns the value of attribute key_name.
-
#last_result ⇒ Object
readonly
Returns the value of attribute last_result.
-
#source ⇒ Object
readonly
Returns the value of attribute source.
-
#window ⇒ Object
readonly
Returns the value of attribute window.
Instance Method Summary collapse
- #collection_diff ⇒ Object
- #feed_diff(add: [], remove: [], update: [], inputs: {}) ⇒ Object
-
#initialize(environment:, compiled_graph:, source:, key:, window: nil, context: [], aggregate_operators: {}) ⇒ Session
constructor
A new instance of Session.
- #run(inputs:) ⇒ Object
Constructor Details
#initialize(environment:, compiled_graph:, source:, key:, window: nil, context: [], aggregate_operators: {}) ⇒ Session
Returns a new instance of Session.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 10
#
# Builds a session and resets its per-run bookkeeping.
#
# @param environment [Object] stored unchanged; exposed through #environment
# @param compiled_graph [Object] stored unchanged; exposed through #compiled_graph
# @param source [#to_sym] converted with +to_sym+; exposed through #source
# @param key [#to_sym] converted with +to_sym+; exposed through #key_name
# @param window [Object, nil] stored unchanged; exposed through #window
# @param context [Object] coerced with +Array()+, each entry converted with
#   +to_sym+, and the resulting array frozen
# @param aggregate_operators [Hash] every value is wrapped in an
#   +AggregateState+ under its original key
# @return [Session] a new instance of Session
def initialize(environment:, compiled_graph:, source:, key:, window: nil, context: [], aggregate_operators: {})
  @environment = environment
  @compiled_graph = compiled_graph
  @source = source.to_sym
  @key_name = key.to_sym
  @window = window
  @context = Array(context).map { |name| name.to_sym }.freeze

  @aggregate_states = aggregate_operators.transform_values do |operator|
    AggregateState.new(operator)
  end

  # Per-run state starts empty; #run is what assigns the snapshots, the
  # cached items, the last inputs and the last result.
  @item_sessions = {}
  @snapshots = {}
  @cached_items = {}
  @last_inputs = nil
  @last_result = nil
end
Instance Attribute Details
#compiled_graph ⇒ Object (readonly)
Returns the value of attribute compiled_graph.
8 9 10 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8
#
# Reader for the value stored from the +compiled_graph:+ constructor keyword.
#
# @return [Object] the value of @compiled_graph
def compiled_graph
  @compiled_graph
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
8 9 10 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8
#
# Reader for the context names. The constructor stores them as a frozen
# array whose entries were converted with +to_sym+.
#
# @return [Object] the value of @context
def context
  @context
end
#environment ⇒ Object (readonly)
Returns the value of attribute environment.
8 9 10 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8
#
# Reader for the value stored from the +environment:+ constructor keyword.
#
# @return [Object] the value of @environment
def environment
  @environment
end
#key_name ⇒ Object (readonly)
Returns the value of attribute key_name.
8 9 10 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8
#
# Reader for the key name. The constructor stores +key.to_sym+ here, so the
# attribute is named +key_name+ while the constructor keyword is +key:+.
#
# @return [Object] the value of @key_name
def key_name
  @key_name
end
#last_result ⇒ Object (readonly)
Returns the value of attribute last_result.
8 9 10 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8
#
# Reader for the result of the most recent #run. The constructor sets it to
# +nil+; #run assigns a new +Result+ on every call.
#
# @return [Object, nil] the value of @last_result
def last_result
  @last_result
end
#source ⇒ Object (readonly)
Returns the value of attribute source.
8 9 10 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8
#
# Reader for the source name. The constructor stores +source.to_sym+ here;
# #run and #feed_diff use it as the key for the item collection in the inputs.
#
# @return [Object] the value of @source
def source
  @source
end
#window ⇒ Object (readonly)
Returns the value of attribute window.
8 9 10 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 8
#
# Reader for the value stored from the +window:+ constructor keyword, which
# defaults to +nil+.
#
# @return [Object, nil] the value of @window
def window
  @window
end
Instance Method Details
#collection_diff ⇒ Object
54 55 56 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 54
#
# Returns the +diff+ of the most recent result.
#
# @return [Object, nil] +@last_result.diff+, or +nil+ when no result has
#   been stored yet
def collection_diff
  return if @last_result.nil?

  @last_result.diff
end
#feed_diff(add: [], remove: [], update: [], inputs: {}) ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 46
#
# Applies an incremental change to the source collection from the previous
# inputs and runs the session again with the merged inputs.
#
# @param add [Array] passed to +apply_diff+ as +add:+
# @param remove [Array] passed to +apply_diff+ as +remove:+
# @param update [Array] passed to +apply_diff+ as +update:+
# @param inputs [Hash] merged over the previous inputs; the entry under
#   #source is then replaced by the merged item list
# @return [Object] whatever #run returns
def feed_diff(add: [], remove: [], update: [], inputs: {})
  # nil.to_h is {}, so before any run this starts from empty inputs.
  previous_inputs = @last_inputs.to_h
  existing_items = Array(previous_inputs.fetch(source, []))
  next_items = apply_diff(existing_items, add: add, remove: remove, update: update)

  # The source entry is merged last so it wins over both the previous inputs
  # and anything supplied in +inputs+.
  next_inputs = previous_inputs.merge(inputs).merge(source => next_items)
  run(inputs: next_inputs)
end
#run(inputs:) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/igniter/extensions/contracts/dataflow/session.rb', line 26
#
# Runs the session over the collection found under #source in +inputs+ and
# records the outcome as the new #last_result.
#
# The statements below are kept in this exact order: the helpers they call
# are not defined in this listing and may read the session state that the
# later assignments overwrite.
#
# @param inputs [Object] wrapped in +Igniter::Contracts::NamedValues+; the
#   wrapper must answer +fetch(source)+
# @return [Object] the +Result+ built for this run (also stored in
#   @last_result)
def run(inputs:)
  named_inputs = Igniter::Contracts::NamedValues.new(inputs)
  current_items = filtered_items(named_inputs.fetch(source))
  item_diff = compute_diff(current_items)
  processed = build_processed_items(current_items, named_inputs, item_diff)
  delete_removed_sessions(item_diff.removed)

  collection = CollectionResult.new(items: processed, diff: item_diff)
  update_aggregate_states(collection)

  # Remember this run so the next one (or #feed_diff) can start from it.
  @snapshots = snapshot_items(current_items)
  @cached_items = processed.dup
  @last_inputs = named_inputs

  aggregate_values = @aggregate_states.transform_values { |state| state.value }
  @last_result = Result.new(
    processed: collection,
    aggregates: aggregate_values,
    inputs: named_inputs
  )
end