Module: Igniter::Extensions::Contracts::Dataflow::AggregateOperators

Defined in:
lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb

Defined Under Namespace

Classes: Operator

Class Method Summary collapse

Class Method Details

.avg(projection:) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 41

def avg(projection:)
  Operator.new(
    initial_fn: -> { { sum: 0.0, count: 0 } },
    project: ->(item) { project(item, projection).to_f },
    add: ->(acc, value) { { sum: acc.fetch(:sum) + value, count: acc.fetch(:count) + 1 } },
    remove: ->(acc, value) { { sum: acc.fetch(:sum) - value, count: acc.fetch(:count) - 1 } },
    finalize: lambda { |acc, _|
      acc.fetch(:count).zero? ? 0.0 : acc.fetch(:sum) / acc.fetch(:count)
    },
    recompute: false
  )
end

.count(filter: nil) ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 19

def count(filter: nil)
  Operator.new(
    initial_fn: -> { 0 },
    project: ->(item) { filter.nil? || filter.call(item) ? 1 : 0 },
    add: ->(acc, value) { acc + value },
    remove: ->(acc, value) { acc - value },
    finalize: ->(acc, _) { acc },
    recompute: false
  )
end

.custom(initial:, add:, remove:) ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 76

def custom(initial:, add:, remove:)
  Operator.new(
    initial_fn: -> { duplicate(initial) },
    project: ->(item) { item },
    add: ->(acc, item) { add.call(acc, item) },
    remove: ->(acc, item) { remove.call(acc, item) },
    finalize: ->(acc, _) { acc },
    recompute: false
  )
end

.duplicate(value) ⇒ Object



110
111
112
113
114
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 110

def duplicate(value)
  value.frozen? ? value : value.dup
rescue TypeError
  value
end

.group_count(projection:) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 62

def group_count(projection:)
  Operator.new(
    initial_fn: -> { {} },
    project: ->(item) { project(item, projection) },
    add: ->(acc, group_key) { acc.merge(group_key => (acc[group_key] || 0) + 1) },
    remove: lambda { |acc, group_key|
      count = (acc[group_key] || 1) - 1
      count <= 0 ? acc.reject { |key, _| key == group_key } : acc.merge(group_key => count)
    },
    finalize: ->(acc, _) { acc },
    recompute: false
  )
end

.max(projection:) ⇒ Object



58
59
60
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 58

def max(projection:)
  recomputed_projection(projection, &:max)
end

.min(projection:) ⇒ Object



54
55
56
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 54

def min(projection:)
  recomputed_projection(projection, &:min)
end

.project(item, projection) ⇒ Object

Raises:

  • (KeyError)


87
88
89
90
91
92
93
94
95
96
97
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 87

def project(item, projection)
  return item if projection.nil?
  return projection.call(item) if projection.respond_to?(:call)

  key = projection.to_sym
  return item.output(key) if item.outputs.key?(key)
  return item.input(key) if item.inputs.key?(key)

  raise KeyError,
        "aggregate projection #{projection.inspect} not present on dataflow item #{item.key.inspect}"
end

.recomputed_projection(projection, &finalizer) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 99

def recomputed_projection(projection, &finalizer)
  Operator.new(
    initial_fn: -> { nil },
    project: ->(item) { project(item, projection) },
    add: nil,
    remove: nil,
    finalize: ->(_acc, contributions) { finalizer.call(contributions.values) },
    recompute: true
  )
end

.sum(projection:) ⇒ Object



30
31
32
33
34
35
36
37
38
39
# File 'lib/igniter/extensions/contracts/dataflow/aggregate_operators.rb', line 30

def sum(projection:)
  Operator.new(
    initial_fn: -> { 0 },
    project: ->(item) { project(item, projection).to_f },
    add: ->(acc, value) { acc + value },
    remove: ->(acc, value) { acc - value },
    finalize: ->(acc, _) { acc },
    recompute: false
  )
end