Class: ElasticGraph::Indexer::Operation::CountAccumulator

Inherits:
Data
  • Object
Defined in:
lib/elastic_graph/indexer/operation/count_accumulator.rb

Overview

Responsible for maintaining state and accumulating list counts while we traverse the `data` we are preparing to update in the index. Much of the complexity here is due to the fact that we have three kinds of list fields: scalar lists, embedded object lists, and `nested` object lists.

The Elasticsearch/OpenSearch `nested` type[^1] indexes objects of this type as separate hidden documents. As a result, each `nested` object type gets its own `__counts` field. In contrast, embedded object lists get flattened into separate entries (one per field path) in a flat map (with `dot_separated_path: values_at_path` entries) at the document root.
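The flattening of embedded object lists can be pictured with a minimal sketch. `flatten_paths` is a hypothetical helper written for illustration only; it is not part of ElasticGraph or the datastore, and it models only the `dot_separated_path: values_at_path` shape described above.

```ruby
# Hypothetical helper: collects every leaf value under its dot-separated field path,
# mimicking how embedded (non-`nested`) object lists are flattened at the document root.
def flatten_paths(value, path = nil, out = Hash.new { |hash, key| hash[key] = [] })
  case value
  when Hash
    value.each { |key, sub| flatten_paths(sub, path ? "#{path}.#{key}" : key.to_s, out) }
  when Array
    # List elements stay at the same path as the list itself.
    value.each { |element| flatten_paths(element, path, out) }
  else
    out[path] << value
  end
  out
end

doc = {
  "seasons" => [
    {"year" => 2020, "notes" => ["a", "b"]},
    {"year" => 2021, "notes" => ["c"]}
  ]
}

flattened = flatten_paths(doc)
# flattened == {"seasons.year" => [2020, 2021], "seasons.notes" => ["a", "b", "c"]}
```

Once the values are flattened like this, the datastore no longer knows which `notes` belonged to which season, which is why the counts are recorded per field path.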

We mirror this structure with our `__counts`: each document (either a root document, or a hidden `nested` document) gets its own `__counts` field, so we essentially have multiple “count parents”. Each `__counts` field is a map, keyed by field paths, and containing the number of list elements at that field path after the flattening has occurred.

The index mapping defines where the `__counts` fields go. This abstraction uses the mapping to determine when it needs to create a new “count parent”.

Note: instances of this class are “shallow immutable” (none of the attributes of an instance can be reassigned), but the `counts` attribute is itself a mutable hash. We use it to accumulate the list counts as we traverse the structure.
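The following sketch illustrates what “shallow immutable” means for a `Data` subclass. The two-attribute class here is a hypothetical stand-in for the real one (which has six attributes), and it assumes Ruby 3.2 or later, where `Data` is available.

```ruby
# Hypothetical stand-in for the real class; only two attributes are modeled.
accumulator_class = Data.define(:counts, :has_list_ancestor)

parent = accumulator_class.new(counts: {"tags" => 0}, has_list_ancestor: false)

# `with` returns a new instance but copies attribute references,
# so the parent and the child share a single counts hash.
child = parent.with(has_list_ancestor: true)
child.counts["tags"] += 1

shared = child.counts.equal?(parent.counts)   # same hash object
has_setter = parent.respond_to?(:counts=)     # Data defines no attribute writers
```

Because the hash is shared, every child accumulator created during traversal writes into the counts of its count parent.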

[^1]: www.elastic.co/guide/en/elasticsearch/reference/8.9/nested.html

Instance Attribute Details

#counts ⇒ Object (readonly)

Returns the value of attribute counts

Returns:

  • (Object)

    the current value of counts



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35

def counts
  @counts
end

#has_list_ancestor ⇒ Object (readonly)

Returns the value of attribute has_list_ancestor

Returns:

  • (Object)

    the current value of has_list_ancestor



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35

def has_list_ancestor
  @has_list_ancestor
end

#list_counts_field_paths_for_source ⇒ Object (readonly)

Returns the value of attribute list_counts_field_paths_for_source

Returns:

  • (Object)

    the current value of list_counts_field_paths_for_source



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35

def list_counts_field_paths_for_source
  @list_counts_field_paths_for_source
end

#mapping ⇒ Object (readonly)

Returns the value of attribute mapping

Returns:

  • (Object)

    the current value of mapping



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35

def mapping
  @mapping
end

#path_from_parent ⇒ Object (readonly)

Returns the value of attribute path_from_parent

Returns:

  • (Object)

    the current value of path_from_parent



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35

def path_from_parent
  @path_from_parent
end

#path_from_root ⇒ Object (readonly)

Returns the value of attribute path_from_root

Returns:

  • (Object)

    the current value of path_from_root



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35

def path_from_root
  @path_from_root
end

Class Method Details

.compute_list_counts_of(value, parent_accumulator) ⇒ Object



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 75

def self.compute_list_counts_of(value, parent_accumulator)
  case value
  when nil
    value
  when ::Hash
    parent_accumulator.maybe_increment
    parent_accumulator.process_hash(value) do |key, subvalue, accumulator|
      [key, compute_list_counts_of(subvalue, accumulator[key])]
    end
  when ::Array
    parent_accumulator.process_list(value) do |element, accumulator|
      compute_list_counts_of(element, accumulator)
    end
  else
    parent_accumulator.maybe_increment
    value
  end
end
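To make the traversal above easier to follow, here is a deliberately simplified, self-contained sketch of the same dispatch on `nil`, hashes, arrays, and scalars. `count_list_elements` is a hypothetical helper, not the library's method: it uses a single count parent, consults no mapping, does not pre-seed zero counts, and assumes `"|"` as the key separator (the value of `LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR` is not shown on this page).

```ruby
# Hypothetical simplification of the traversal: one counts hash, no mapping lookup.
def count_list_elements(value, path: nil, under_list: false, counts: Hash.new(0))
  case value
  when nil
    # nil values are never counted.
  when Hash
    # An object that sits inside a list counts as one element of that list.
    counts[path] += 1 if under_list
    value.each do |key, sub|
      sub_path = path ? "#{path}|#{key}" : key
      count_list_elements(sub, path: sub_path, under_list: under_list, counts: counts)
    end
  when Array
    # Elements share the list's path; everything below now has a list ancestor.
    value.each { |element| count_list_elements(element, path: path, under_list: true, counts: counts) }
  else
    counts[path] += 1 if under_list
  end
  counts
end

data = {
  "name" => "example",
  "tags" => ["a", "b"],
  "seasons" => [
    {"year" => 2020, "notes" => ["n1"]},
    {"year" => nil, "notes" => []}
  ]
}

list_counts = count_list_elements(data)
# list_counts == {"tags" => 2, "seasons" => 2, "seasons|year" => 1, "seasons|notes" => 1}
```

Note that `name` has no entry because it has no list ancestor, and the `nil` year is skipped, matching the `when nil` branch of the real method.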

.merge_list_counts_into(params, mapping:, list_counts_field_paths_for_source:) ⇒ Object



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 57

def self.merge_list_counts_into(params, mapping:, list_counts_field_paths_for_source:)
  # Here we compute the counts of our list elements so that we can index them.
  data = compute_list_counts_of(params.fetch("data"), CountAccumulator.new_parent(
    # We merge in `type: nested` since the `nested` type indicates a new count accumulator parent and we want that applied at the root.
    mapping.merge("type" => "nested"),
    list_counts_field_paths_for_source
  ))

  # The root `__counts` field needs special handling due to our `sourced_from` feature. Anything in `data`
  # will overwrite what's in the specified fields when the script executes, but since there could be list
  # fields from multiple sources, we need `__counts` to get merged properly. So here we "promote" it from
  # `data.__counts` to being a root-level parameter.
  params.merge(
    "data" => data.except(LIST_COUNTS_FIELD),
    LIST_COUNTS_FIELD => data[LIST_COUNTS_FIELD]
  )
end
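The “promotion” step at the end of this method can be shown in isolation. The `params` hash below is a hypothetical example (the `"id"` entry and the field names are invented for illustration); only the `merge`/`except` pattern comes from the method above.

```ruby
# Hypothetical script params after list counts have been computed into `data`.
params = {
  "id" => "abc",
  "data" => {"tags" => ["a", "b"], "__counts" => {"tags" => 2}}
}

data = params.fetch("data")

# Move `__counts` out of `data` so that it becomes a root-level parameter.
promoted = params.merge(
  "data" => data.except("__counts"),
  "__counts" => data["__counts"]
)
# promoted == {"id" => "abc", "data" => {"tags" => ["a", "b"]}, "__counts" => {"tags" => 2}}
```

`Hash#merge` and `Hash#except` both return new hashes, so the original `params` is left untouched.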

.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: nil) ⇒ Object

Creates an initially empty accumulator instance for a new parent (either at the overall document root or at the root of a `nested` object).



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 96

def self.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: nil)
  count_field_prefix = path_from_root ? "#{path_from_root}.#{LIST_COUNTS_FIELD}." : "#{LIST_COUNTS_FIELD}."

  initial_counts = (mapping.dig("properties", LIST_COUNTS_FIELD, "properties") || {}).filter_map do |field, _|
    [field, 0] if list_counts_field_paths_for_source.include?(count_field_prefix + field)
  end.to_h

  new(initial_counts, nil, path_from_root, mapping, list_counts_field_paths_for_source, false)
end
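The seeding of `initial_counts` can be tried on its own. In this sketch the mapping fragment and field names are hypothetical, and `list_counts_field_paths_for_source` is assumed to be a `Set` of fully qualified count field paths (the page does not state its type; anything responding to `include?` would behave the same way here).

```ruby
require "set"

# Hypothetical mapping: the `__counts` subfields declare which list paths can be counted.
mapping = {
  "properties" => {
    "tags" => {"type" => "keyword"},
    "__counts" => {
      "properties" => {
        "tags" => {"type" => "integer"},
        "other_source_list" => {"type" => "integer"}
      }
    }
  }
}

# Assumption: only `tags` is owned by the source currently being indexed.
paths_for_source = Set["__counts.tags"]
count_field_prefix = "__counts."

initial_counts = (mapping.dig("properties", "__counts", "properties") || {}).filter_map do |field, _|
  [field, 0] if paths_for_source.include?(count_field_prefix + field)
end.to_h
# initial_counts == {"tags" => 0}
```

Starting every owned path at zero means an empty or absent list still produces an explicit count of `0`, while paths belonging to other sources are left out entirely.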

Instance Method Details

#[](subpath) ⇒ Object

Creates a “child” accumulator at the given subpath. Should be used as we traverse the data structure.



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 156

def [](subpath)
  with(
    path_from_parent: path_from_parent ? "#{path_from_parent}#{LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR}#{subpath}" : subpath,
    path_from_root: path_from_root ? "#{path_from_root}.#{subpath}" : subpath,
    mapping: mapping.fetch("properties").fetch(subpath)
  )
end
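A small sketch of how the two paths evolve as child accumulators are created. `child_paths` is a hypothetical helper that reproduces only the string-building logic above, and `"|"` is an assumed value for `LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR`, which is not shown on this page.

```ruby
# Hypothetical helper mirroring the path logic of `#[]`.
# Returns [path_from_parent, path_from_root] for the child accumulator.
def child_paths(path_from_parent, path_from_root, subpath, separator: "|")
  [
    path_from_parent ? "#{path_from_parent}#{separator}#{subpath}" : subpath,
    path_from_root ? "#{path_from_root}.#{subpath}" : subpath
  ]
end

at_root = child_paths(nil, nil, "seasons")
embedded = child_paths("seasons", "seasons", "notes")
# A new parent for a `nested` field starts with no path_from_parent but keeps path_from_root.
under_nested = child_paths(nil, "players", "nicknames")
```

The two paths diverge under a `nested` parent: `path_from_parent` restarts (it is the key within that parent's `__counts`), while `path_from_root` keeps growing from the document root.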

#maybe_increment ⇒ Object

Increments the count at the current `path_from_parent` in the current parent’s counts hash if we are under a list.



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 148

def maybe_increment
  return unless has_list_ancestor

  key = path_from_parent.to_s
  counts[key] = counts.fetch(key) + 1
end
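One consequence of using `counts.fetch(key)` rather than `counts[key]` is worth spelling out: a path can only be incremented if it was seeded when the parent was created. The sketch below uses a plain hash with invented keys to show that behavior.

```ruby
# A counts hash as a new parent might seed it (keys are hypothetical).
counts = {"tags" => 0}

# Incrementing a seeded key works as in `maybe_increment`.
key = "tags"
counts[key] = counts.fetch(key) + 1

# An unseeded key is not silently created; `Hash#fetch` raises instead.
error = begin
  counts.fetch("unseeded")
rescue KeyError => e
  e
end
```

In other words, under this logic a list path missing from the seeded counts surfaces as a `KeyError` instead of producing a count the mapping does not define.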

#process_hash(hash) ⇒ Object

Processes the given hash, beginning a new parent if needed. A new parent is needed if the current mapping has a `__counts` field.

Yields repeatedly (once per hash entry). We yield the entry key/value, and an accumulator instance (either the current `self` or a new parent).

Afterwards, merges the resulting `__counts` into the hash before it’s returned, as needed.



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 113

def process_hash(hash)
  mapping_type = mapping["type"]

  # As we traverse through the JSON object structure, we also have to traverse through the
  # condensed mapping. Doing this requires that the `properties` of the index mapping
  # match the fields of the JSON data structure. However, Elasticsearch/OpenSearch have a number of field
  # types which can be represented as a JSON object in an indexing call, but which have no
  # `properties` in the mapping. We can't successfully traverse through the JSON data and the
  # mapping when we encounter these field types (since the mapping has no record of the
  # subfields) so we must treat these types as a special case; we can't proceed, and we won't
  # have any lists to count, anyway.
  return hash if DATASTORE_PROPERTYLESS_OBJECT_TYPES.include?(mapping_type)

  # The `nested` type indicates a new document level, so if it's not `nested`, we should process the hash without making a new parent.
  return hash.to_h { |key, value| yield key, value, self } unless mapping_type == "nested"

  # ...but otherwise, we should make a new parent.
  new_parent = CountAccumulator.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: path_from_root)
  updated_hash = hash.to_h { |key, value| yield key, value, new_parent }

  # If we have a LIST_COUNTS_FIELD at this level of our mapping, we should merge in the counts hash from the new parent.
  if mapping.dig("properties", LIST_COUNTS_FIELD)
    updated_hash.merge(LIST_COUNTS_FIELD => new_parent.counts)
  else
    updated_hash
  end
end
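The effect of starting a new parent at each `nested` object can be seen in a compact sketch. `count_with_parents` is a hypothetical, simplified function and not the library's API: it always merges `__counts` into a `nested` hash, does not pre-seed zeros or filter by source, ignores property-less object types, and assumes `"|"` as the key separator.

```ruby
# Hypothetical simplification: walks data and mapping together, starting a fresh
# counts hash whenever the mapping type is `nested`.
def count_with_parents(value, mapping, counts, path: nil, under_list: false)
  case value
  when nil
    nil
  when Hash
    counts[path] += 1 if under_list
    properties = mapping.fetch("properties")
    if mapping["type"] == "nested"
      # New count parent: paths restart and the list ancestry is reset.
      own_counts = Hash.new(0)
      result = value.to_h { |key, sub| [key, count_with_parents(sub, properties.fetch(key), own_counts, path: key)] }
      result.merge("__counts" => own_counts)
    else
      value.to_h do |key, sub|
        sub_path = path ? "#{path}|#{key}" : key
        [key, count_with_parents(sub, properties.fetch(key), counts, path: sub_path, under_list: under_list)]
      end
    end
  when Array
    value.map { |element| count_with_parents(element, mapping, counts, path: path, under_list: true) }
  else
    counts[path] += 1 if under_list
    value
  end
end

mapping = {
  "properties" => {
    "tags" => {"type" => "keyword"},
    "players" => {
      "type" => "nested",
      "properties" => {"name" => {"type" => "keyword"}, "nicknames" => {"type" => "keyword"}}
    }
  }
}
data = {
  "tags" => ["a", "b"],
  "players" => [
    {"name" => "Ann", "nicknames" => ["A", "Annie"]},
    {"name" => "Bob", "nicknames" => ["B"]}
  ]
}

# As in `merge_list_counts_into`, the root is treated as `nested` so it becomes a count parent.
result = count_with_parents(data, mapping.merge("type" => "nested"), Hash.new(0))
```

Here the root `__counts` records `tags` and `players`, while each player carries its own `__counts` for `nicknames`, mirroring the hidden-document structure of the `nested` type.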

#process_list(list) ⇒ Object

Processes the given list, tracking the fact that subpaths have a list ancestor.



# File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 142

def process_list(list)
  child_accumulator = with(has_list_ancestor: true)
  list.map { |value| yield value, child_accumulator }
end
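Because `process_list` does not change the path, a list nested directly inside another list is counted at the same path as its outer list. The following sketch models that consequence for a single field path; `count_leaves` is a hypothetical helper, not part of the class.

```ruby
# Hypothetical helper: counts what would be recorded at one field path
# when lists are nested directly inside lists.
def count_leaves(list)
  list.sum do |element|
    case element
    when Array then count_leaves(element) # inner lists reuse the same path
    when nil then 0                       # nil elements are not counted
    else 1
    end
  end
end

total = count_leaves([[1, 2], [3], [], nil])
# total == 3
```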