Module: Igniter::Extensions::Contracts::Dataflow::AggregateOperators
- Defined in:
- lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb
Defined Under Namespace
Classes: Operator
Class Method Summary collapse
- .avg(projection:) ⇒ Object
- .count(filter: nil) ⇒ Object
- .custom(initial:, add:, remove:) ⇒ Object
- .duplicate(value) ⇒ Object
- .group_count(projection:) ⇒ Object
- .max(projection:) ⇒ Object
- .min(projection:) ⇒ Object
- .project(item, projection) ⇒ Object
- .recomputed_projection(projection, &finalizer) ⇒ Object
- .sum(projection:) ⇒ Object
Class Method Details
.avg(projection:) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 41 def avg(projection:) Operator.new( initial_fn: -> { { sum: 0.0, count: 0 } }, project: ->(item) { project(item, projection).to_f }, add: ->(acc, value) { { sum: acc.fetch(:sum) + value, count: acc.fetch(:count) + 1 } }, remove: ->(acc, value) { { sum: acc.fetch(:sum) - value, count: acc.fetch(:count) - 1 } }, finalize: lambda { |acc, _| acc.fetch(:count).zero? ? 0.0 : acc.fetch(:sum) / acc.fetch(:count) }, recompute: false ) end |
.count(filter: nil) ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 19 def count(filter: nil) Operator.new( initial_fn: -> { 0 }, project: ->(item) { filter.nil? || filter.call(item) ? 1 : 0 }, add: ->(acc, value) { acc + value }, remove: ->(acc, value) { acc - value }, finalize: ->(acc, _) { acc }, recompute: false ) end |
.custom(initial:, add:, remove:) ⇒ Object
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 76 def custom(initial:, add:, remove:) Operator.new( initial_fn: -> { duplicate(initial) }, project: ->(item) { item }, add: ->(acc, item) { add.call(acc, item) }, remove: ->(acc, item) { remove.call(acc, item) }, finalize: ->(acc, _) { acc }, recompute: false ) end |
.duplicate(value) ⇒ Object
110 111 112 113 114 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 110 def duplicate(value) value.frozen? ? value : value.dup rescue TypeError value end |
.group_count(projection:) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 62 def group_count(projection:) Operator.new( initial_fn: -> { {} }, project: ->(item) { project(item, projection) }, add: ->(acc, group_key) { acc.merge(group_key => (acc[group_key] || 0) + 1) }, remove: lambda { |acc, group_key| count = (acc[group_key] || 1) - 1 count <= 0 ? acc.reject { |key, _| key == group_key } : acc.merge(group_key => count) }, finalize: ->(acc, _) { acc }, recompute: false ) end |
.max(projection:) ⇒ Object
58 59 60 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 58 def max(projection:) recomputed_projection(projection, &:max) end |
.min(projection:) ⇒ Object
54 55 56 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 54 def min(projection:) recomputed_projection(projection, &:min) end |
.project(item, projection) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 87 def project(item, projection) return item if projection.nil? return projection.call(item) if projection.respond_to?(:call) key = projection.to_sym return item.output(key) if item.outputs.key?(key) return item.input(key) if item.inputs.key?(key) raise KeyError, "aggregate projection #{projection.inspect} not present on dataflow item #{item.key.inspect}" end |
.recomputed_projection(projection, &finalizer) ⇒ Object
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 99 def recomputed_projection(projection, &finalizer) Operator.new( initial_fn: -> { nil }, project: ->(item) { project(item, projection) }, add: nil, remove: nil, finalize: ->(_acc, contributions) { finalizer.call(contributions.values) }, recompute: true ) end |
.sum(projection:) ⇒ Object
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 30 def sum(projection:) Operator.new( initial_fn: -> { 0 }, project: ->(item) { project(item, projection).to_f }, add: ->(acc, value) { acc + value }, remove: ->(acc, value) { acc - value }, finalize: ->(acc, _) { acc }, recompute: false ) end |