Class: ElasticGraph::Indexer::Operation::CountAccumulator
- Inherits: Data (Object → Data → ElasticGraph::Indexer::Operation::CountAccumulator)
- Defined in: lib/elastic_graph/indexer/operation/count_accumulator.rb
Overview
Responsible for maintaining state and accumulating list counts while we traverse the `data` we are preparing to update in the index. Much of the complexity here is due to the fact that we have 3 kinds of list fields: scalar lists, embedded object lists, and `nested` object lists.
The Elasticsearch/OpenSearch `nested` type[^1] indexes objects of this type as separate hidden documents. As a result, each `nested` object type gets its own `__counts` field. In contrast, embedded object lists get flattened into separate entries (one per field path) in a flat map (with `dot_separated_path: values_at_path` entries) at the document root.
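To make the flattening of embedded object lists concrete, here is a minimal Ruby sketch. `flatten_paths` is a hypothetical helper that only approximates what the datastore does with non-`nested` objects; it is not part of ElasticGraph.

```ruby
# Hypothetical helper approximating how the datastore flattens embedded
# (non-`nested`) objects: every leaf value is collected under its
# dot-separated path, and object boundaries inside arrays are lost.
def flatten_paths(value, path = nil, out = Hash.new { |h, k| h[k] = [] })
  case value
  when Hash
    value.each do |key, subvalue|
      flatten_paths(subvalue, path ? "#{path}.#{key}" : key, out)
    end
  when Array
    value.each { |element| flatten_paths(element, path, out) }
  else
    out[path] << value
  end
  out
end

doc = {
  "name" => "Team A",
  "players" => [
    {"name" => "Ann", "nicknames" => ["Annie", "A"]},
    {"name" => "Bob", "nicknames" => ["Bobby"]}
  ]
}

flat = flatten_paths(doc)
# flat["players.name"]      => ["Ann", "Bob"]
# flat["players.nicknames"] => ["Annie", "A", "Bobby"]
```

After flattening, `players.nicknames` no longer records which player each nickname belonged to; keeping that association is what the `nested` type is for.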
We mirror this structure with our `__counts`: each document (either a root document, or a hidden `nested` document) gets its own `__counts` field, so we essentially have multiple “count parents”. Each `__counts` field is a map, keyed by field paths, and containing the number of list elements at that field path after the flattening has occurred.
The index mapping defines where the `__counts` fields go. This abstraction uses the mapping to determine when it needs to create a new “count parent”.
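Because the mapping decides where `__counts` fields live, the set of count parents can be read off it directly. The sketch below walks a hand-written, hypothetical mapping (the `type`/`properties` shape follows Elasticsearch/OpenSearch conventions, but the fields are made up) and reports every level whose `properties` declare a `__counts` field, using `""` for the document root. `count_parent_paths` is a hypothetical helper.

```ruby
LIST_COUNTS_FIELD = "__counts" # field name as used throughout this page

# Hypothetical helper: returns the dot-separated paths of every mapping
# level that declares a `__counts` field ("" stands for the document root).
def count_parent_paths(mapping, path = "")
  properties = mapping.fetch("properties", {})
  own = properties.key?(LIST_COUNTS_FIELD) ? [path] : []

  children = properties.flat_map do |name, submapping|
    next [] if name == LIST_COUNTS_FIELD
    child_path = path.empty? ? name : "#{path}.#{name}"
    count_parent_paths(submapping, child_path)
  end

  own + children
end

mapping = {
  "properties" => {
    "__counts" => {"properties" => {"tags" => {"type" => "integer"}}},
    "tags" => {"type" => "keyword"},
    "players" => {
      "type" => "nested",
      "properties" => {
        "__counts" => {"properties" => {"nicknames" => {"type" => "integer"}}},
        "nicknames" => {"type" => "keyword"}
      }
    },
    "address" => {"properties" => {"city" => {"type" => "keyword"}}}
  }
}

count_parent_paths(mapping) # => ["", "players"]
```

The embedded `address` object does not become a count parent; only the root and the `nested` `players` level carry their own `__counts`.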
Note: instances of this class are “shallow immutable” (none of the attributes of an instance can be reassigned), but the `counts` attribute is itself a mutable hash; we use it to accumulate the list counts as we traverse the structure.
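The “shallow immutable” behavior is what Ruby's `Data` class (Ruby 3.2+) provides. A minimal sketch using a hypothetical `Accumulator` type whose members mirror the attributes listed below; the member order is an inference from the positional `new(...)` call in `.new_parent`, not a copy of the real definition.

```ruby
# Hypothetical stand-in for CountAccumulator. Member order is inferred from
# the positional `new(...)` call in `.new_parent`; requires Ruby 3.2+.
Accumulator = Data.define(
  :counts, :path_from_parent, :path_from_root, :mapping,
  :list_counts_field_paths_for_source, :has_list_ancestor
)

acc = Accumulator.new({"tags" => 0}, nil, nil, {}, [], false)

acc.frozen?               # => true: the instance itself is frozen
acc.respond_to?(:counts=) # => false: no attribute can be reassigned
acc.counts["tags"] += 1   # ...but the counts hash itself is still mutable
acc.counts                # => {"tags" => 1}
```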
[^1]: www.elastic.co/guide/en/elasticsearch/reference/8.9/nested.html
Instance Attribute Summary
- #counts ⇒ Object (readonly)
  Returns the value of attribute counts.
- #has_list_ancestor ⇒ Object (readonly)
  Returns the value of attribute has_list_ancestor.
- #list_counts_field_paths_for_source ⇒ Object (readonly)
  Returns the value of attribute list_counts_field_paths_for_source.
- #mapping ⇒ Object (readonly)
  Returns the value of attribute mapping.
- #path_from_parent ⇒ Object (readonly)
  Returns the value of attribute path_from_parent.
- #path_from_root ⇒ Object (readonly)
  Returns the value of attribute path_from_root.
Class Method Summary
- .compute_list_counts_of(value, parent_accumulator) ⇒ Object
- .merge_list_counts_into(params, mapping:, list_counts_field_paths_for_source:) ⇒ Object
- .new_parent(mapping, list_counts_field_paths_for_source, path_from_root: nil) ⇒ Object
  Creates an initially empty accumulator instance for a new parent (either at the overall document root or at the root of a `nested` object).
Instance Method Summary
- #[](subpath) ⇒ Object
  Creates a “child” accumulator at the given subpath.
- #maybe_increment ⇒ Object
  Increments the count at the current `path_from_parent` in the current parent’s counts hash if we are under a list.
- #process_hash(hash) ⇒ Object
  Processes the given hash, beginning a new parent if needed.
- #process_list(list) ⇒ Object
  Processes the given list, tracking the fact that subpaths have a list ancestor.
Instance Attribute Details
#counts ⇒ Object (readonly)
Returns the value of attribute counts.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35
    def counts
      @counts
    end
#has_list_ancestor ⇒ Object (readonly)
Returns the value of attribute has_list_ancestor.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35
    def has_list_ancestor
      @has_list_ancestor
    end
#list_counts_field_paths_for_source ⇒ Object (readonly)
Returns the value of attribute list_counts_field_paths_for_source.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35
    def list_counts_field_paths_for_source
      @list_counts_field_paths_for_source
    end
#mapping ⇒ Object (readonly)
Returns the value of attribute mapping.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35
    def mapping
      @mapping
    end
#path_from_parent ⇒ Object (readonly)
Returns the value of attribute path_from_parent.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35
    def path_from_parent
      @path_from_parent
    end
#path_from_root ⇒ Object (readonly)
Returns the value of attribute path_from_root.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 35
    def path_from_root
      @path_from_root
    end
Class Method Details
.compute_list_counts_of(value, parent_accumulator) ⇒ Object
    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 75
    def self.compute_list_counts_of(value, parent_accumulator)
      case value
      when nil
        value
      when ::Hash
        parent_accumulator.maybe_increment
        parent_accumulator.process_hash(value) do |key, subvalue, accumulator|
          [key, compute_list_counts_of(subvalue, accumulator[key])]
        end
      when ::Array
        parent_accumulator.process_list(value) do |element, accumulator|
          compute_list_counts_of(element, accumulator)
        end
      else
        parent_accumulator.maybe_increment
        value
      end
    end
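The dispatch above is easiest to follow on a simplified model. The sketch below is a hypothetical, mapping-free re-implementation for a single count parent: it ignores `nested` types and property-less objects, starts each count at zero on demand instead of pre-seeding it from the mapping, and assumes `|` for the path separator (the real value of `LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR` is not shown on this page). `SimpleAccumulator` is a made-up name.

```ruby
SEPARATOR = "|" # assumption: stands in for LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR

# Simplified accumulator: one shared counts hash, no mapping, no `nested` parents.
SimpleAccumulator = Data.define(:counts, :path_from_parent, :has_list_ancestor) do
  def [](subpath)
    with(path_from_parent: path_from_parent ? "#{path_from_parent}#{SEPARATOR}#{subpath}" : subpath)
  end

  def maybe_increment
    return unless has_list_ancestor
    key = path_from_parent.to_s
    counts[key] = counts.fetch(key, 0) + 1 # real code uses fetch(key) on pre-seeded keys
  end

  def process_hash(hash)
    hash.to_h { |key, value| yield key, value, self }
  end

  def process_list(list)
    child = with(has_list_ancestor: true)
    list.map { |value| yield value, child }
  end
end

# Same case/when dispatch as `.compute_list_counts_of`.
def compute_list_counts_of(value, acc)
  case value
  when nil
    value
  when ::Hash
    acc.maybe_increment
    acc.process_hash(value) { |key, subvalue, a| [key, compute_list_counts_of(subvalue, a[key])] }
  when ::Array
    acc.process_list(value) { |element, a| compute_list_counts_of(element, a) }
  else
    acc.maybe_increment
    value
  end
end

doc = {
  "tags" => ["a", "b"],
  "players" => [
    {"name" => "Ann", "nicknames" => ["A", "Annie"]},
    {"name" => "Bob", "nicknames" => []}
  ]
}

root = SimpleAccumulator.new({}, nil, false)
result = compute_list_counts_of(doc, root)
root.counts # => {"tags" => 2, "players" => 2, "players|name" => 2, "players|nicknames" => 2}
```

Note that `players|nicknames` is 2 across both players: the count is taken after flattening, not per player. With the real pre-seeding, a path that only ever held empty lists would still appear with a count of 0.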
.merge_list_counts_into(params, mapping:, list_counts_field_paths_for_source:) ⇒ Object
    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 57
    def self.merge_list_counts_into(params, mapping:, list_counts_field_paths_for_source:)
      # Here we compute the counts of our list elements so that we can index it.
      data = compute_list_counts_of(params.fetch("data"), CountAccumulator.new_parent(
        # We merge in `type: nested` since the `nested` type indicates a new count accumulator parent and we want that applied at the root.
        mapping.merge("type" => "nested"),
        list_counts_field_paths_for_source
      ))

      # The root `__counts` field needs special handling due to our `sourced_from` feature. Anything in `data`
      # will overwrite what's in the specified fields when the script executes, but since there could be list
      # fields from multiple sources, we need `__counts` to get merged properly. So here we "promote" it from
      # `data.__counts` to being a root-level parameter.
      params.merge(
        "data" => data.except(LIST_COUNTS_FIELD),
        LIST_COUNTS_FIELD => data[LIST_COUNTS_FIELD]
      )
    end
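The final `params.merge` step moves a single key. The sketch below uses made-up values, with `data` standing in for the already-counted document returned by `compute_list_counts_of`, to show the parameter shape before and after the promotion (`Hash#except` requires Ruby 3.0+).

```ruby
LIST_COUNTS_FIELD = "__counts"

# Made-up params whose `data` already carries computed counts.
params = {"data" => {"tags" => ["a", "b"], LIST_COUNTS_FIELD => {"tags" => 2}}}
data = params.fetch("data")

promoted = params.merge(
  "data" => data.except(LIST_COUNTS_FIELD),    # document fields, minus the counts
  LIST_COUNTS_FIELD => data[LIST_COUNTS_FIELD] # counts become a root-level parameter
)

promoted # => {"data" => {"tags" => ["a", "b"]}, "__counts" => {"tags" => 2}}
```

Both `merge` and `except` return new hashes, so the original `params` is left untouched.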
.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: nil) ⇒ Object
Creates an initially empty accumulator instance for a new parent (either at the overall document root or at the root of a `nested` object).

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 96
    def self.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: nil)
      count_field_prefix = path_from_root ? "#{path_from_root}.#{LIST_COUNTS_FIELD}." : "#{LIST_COUNTS_FIELD}."

      initial_counts = (mapping.dig("properties", LIST_COUNTS_FIELD, "properties") || {}).filter_map do |field, _|
        [field, 0] if list_counts_field_paths_for_source.include?(count_field_prefix + field)
      end.to_h

      new(initial_counts, nil, path_from_root, mapping, list_counts_field_paths_for_source, false)
    end
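To see which counts `.new_parent` seeds, the sketch below copies its filtering logic into a standalone, hypothetically named `initial_counts_for` method and runs it against a made-up mapping and source path list. Only count fields that are declared in the mapping *and* listed for the current source start at zero. The `|`-joined count key is an assumption about the separator.

```ruby
LIST_COUNTS_FIELD = "__counts"

# Standalone copy of the seeding logic in `.new_parent`.
def initial_counts_for(mapping, list_counts_field_paths_for_source, path_from_root: nil)
  count_field_prefix = path_from_root ? "#{path_from_root}.#{LIST_COUNTS_FIELD}." : "#{LIST_COUNTS_FIELD}."

  (mapping.dig("properties", LIST_COUNTS_FIELD, "properties") || {}).filter_map do |field, _|
    [field, 0] if list_counts_field_paths_for_source.include?(count_field_prefix + field)
  end.to_h
end

# Made-up root mapping; `scores` is assumed to be populated by a different source.
root_mapping = {
  "properties" => {
    LIST_COUNTS_FIELD => {
      "properties" => {
        "tags" => {"type" => "integer"},
        "players|name" => {"type" => "integer"},
        "scores" => {"type" => "integer"}
      }
    }
  }
}

# Made-up mapping for a `nested` object at `players`.
nested_mapping = {
  "type" => "nested",
  "properties" => {LIST_COUNTS_FIELD => {"properties" => {"nicknames" => {"type" => "integer"}}}}
}

paths = ["__counts.tags", "__counts.players|name", "players.__counts.nicknames"]

initial_counts_for(root_mapping, paths)                              # => {"tags" => 0, "players|name" => 0}
initial_counts_for(nested_mapping, paths, path_from_root: "players") # => {"nicknames" => 0}
initial_counts_for({"properties" => {}}, paths)                      # => {}
```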
Instance Method Details
#[](subpath) ⇒ Object
Creates a “child” accumulator at the given subpath. Should be used as we traverse the data structure.
    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 156
    def [](subpath)
      with(
        path_from_parent: path_from_parent ? "#{path_from_parent}#{LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR}#{subpath}" : subpath,
        path_from_root: path_from_root ? "#{path_from_root}.#{subpath}" : subpath,
        mapping: mapping.fetch("properties").fetch(subpath)
      )
    end
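The two paths built by `#[]` use different separators: `path_from_root` always joins with `.`, while `path_from_parent`, which becomes a key in the counts hash, joins with `LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR`. The sketch below isolates that string logic in a hypothetical `child_paths` helper and assumes `|` as the separator.

```ruby
SEPARATOR = "|" # assumption: stands in for LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR

# Hypothetical helper mirroring the string logic in `#[]`;
# returns [path_from_parent, path_from_root] for the child.
def child_paths(path_from_parent, path_from_root, subpath)
  [
    path_from_parent ? "#{path_from_parent}#{SEPARATOR}#{subpath}" : subpath,
    path_from_root ? "#{path_from_root}.#{subpath}" : subpath
  ]
end

# From the root into `players`, then `nicknames`, within one count parent:
p1 = child_paths(nil, nil, "players") # => ["players", "players"]
p2 = child_paths(*p1, "nicknames")    # => ["players|nicknames", "players.nicknames"]

# `.new_parent` starts with path_from_parent = nil but keeps path_from_root:
p3 = child_paths(nil, "players", "nicknames") # => ["nicknames", "players.nicknames"]
```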
#maybe_increment ⇒ Object
Increments the count at the current `path_from_parent` in the current parent’s counts hash if we are under a list.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 148
    def maybe_increment
      return unless has_list_ancestor

      key = path_from_parent.to_s
      counts[key] = counts.fetch(key) + 1
    end
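Because `#maybe_increment` calls `Hash#fetch` without a default, it relies on `.new_parent` having seeded every expected key with `0`; incrementing a path that was never seeded raises `KeyError`. A minimal sketch of that behavior, using a hypothetical `increment!` helper with the same body:

```ruby
# Hypothetical helper with the same body as `#maybe_increment`.
def increment!(counts, path_from_parent, has_list_ancestor)
  return unless has_list_ancestor

  key = path_from_parent.to_s
  counts[key] = counts.fetch(key) + 1
end

counts = {"tags" => 0}
increment!(counts, "tags", true)
increment!(counts, "tags", false) # ignored: not under a list
counts # => {"tags" => 1}

raised = false
begin
  increment!(counts, "players|name", true) # never seeded
rescue KeyError
  raised = true
end
raised # => true
```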
#process_hash(hash) ⇒ Object
Processes the given hash, beginning a new parent if needed. A new parent is needed if the current mapping has the `nested` type; its counts are merged back into the hash only if the mapping also has a `__counts` field. Hashes whose mapping type is property-less (see `DATASTORE_PROPERTYLESS_OBJECT_TYPES`) are returned unchanged without yielding.
Yields repeatedly (once per hash entry). We yield the entry key/value, and an accumulator instance (either the current `self` or a new parent).
Afterwards, merges the resulting `__counts` into the hash before it’s returned, as needed.

    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 113
    def process_hash(hash)
      mapping_type = mapping["type"]

      # As we traverse through the JSON object structure, we also have to traverse through the
      # condensed mapping. Doing this requires that the `properties` of the index mapping
      # match the fields of the JSON data structure. However, Elasticsearch/OpenSearch have a number of field
      # types which can be represented as a JSON object in an indexing call, but which have no
      # `properties` in the mapping. We can't successfully traverse through the JSON data and the
      # mapping when we encounter these field types (since the mapping has no record of the
      # subfields) so we must treat these types as a special case; we can't proceed, and we won't
      # have any lists to count, anyway.
      return hash if DATASTORE_PROPERTYLESS_OBJECT_TYPES.include?(mapping_type)

      # The `nested` type indicates a new document level, so if it's not `nested`, we should process the hash without making a new parent.
      return hash.to_h { |key, value| yield key, value, self } unless mapping_type == "nested"

      # ...but otherwise, we should make a new parent.
      new_parent = CountAccumulator.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: path_from_root)
      updated_hash = hash.to_h { |key, value| yield key, value, new_parent }

      # If we have a LIST_COUNTS_FIELD at this level of our mapping, we should merge in the counts hash from the new parent.
      if mapping.dig("properties", LIST_COUNTS_FIELD)
        updated_hash.merge(LIST_COUNTS_FIELD => new_parent.counts)
      else
        updated_hash
      end
    end
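`#process_hash` therefore has three outcomes, with the last one split by whether counts get merged back. The sketch below reduces it to a hypothetical `process_hash_branch` that only reports which branch a mapping takes. The contents of `DATASTORE_PROPERTYLESS_OBJECT_TYPES` are not shown on this page, so `geo_point` here is an assumed example (a `geo_point` can be indexed as a `lat`/`lon` JSON object, yet its mapping has no `properties`).

```ruby
LIST_COUNTS_FIELD = "__counts"
# Assumption: the real constant's contents are not shown; geo_point is one plausible member.
PROPERTYLESS_TYPES = ["geo_point"].freeze

# Hypothetical helper reporting which branch `#process_hash` would take.
def process_hash_branch(mapping)
  mapping_type = mapping["type"]
  return :unchanged if PROPERTYLESS_TYPES.include?(mapping_type)
  return :same_parent unless mapping_type == "nested"

  mapping.dig("properties", LIST_COUNTS_FIELD) ? :new_parent_with_counts : :new_parent_without_counts
end

process_hash_branch({"type" => "geo_point"})                                   # => :unchanged
process_hash_branch({"properties" => {"city" => {"type" => "keyword"}}})       # => :same_parent
process_hash_branch({"type" => "nested", "properties" => {"__counts" => {}}}) # => :new_parent_with_counts
process_hash_branch({"type" => "nested", "properties" => {}})                 # => :new_parent_without_counts
```

An embedded object mapping usually has no `type` key at all, so `mapping_type` is `nil` and it falls into the `:same_parent` branch.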
#process_list(list) ⇒ Object
Processes the given list, tracking the fact that subpaths have a list ancestor.
    # File 'lib/elastic_graph/indexer/operation/count_accumulator.rb', line 142
    def process_list(list)
      child_accumulator = with(has_list_ancestor: true)
      list.map { |value| yield value, child_accumulator }
    end
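`Data#with` (Ruby 3.2+) returns a shallow copy with the given members replaced, so the child shares every other attribute by reference. That is what lets `#process_list` mark its subtree as being under a list while the child still writes into the parent's `counts` hash. A sketch with a hypothetical three-member `Acc` type:

```ruby
# Hypothetical three-member stand-in for the accumulator.
Acc = Data.define(:counts, :path_from_parent, :has_list_ancestor)

parent = Acc.new({"tags" => 0}, "tags", false)
child = parent.with(has_list_ancestor: true)

child.counts.equal?(parent.counts) # => true: same Hash object, not a copy
parent.has_list_ancestor           # => false: the parent is unchanged

# Every list element receives the same child, so increments accumulate in one place.
["a", "b", "c"].each { child.counts["tags"] += 1 }
parent.counts # => {"tags" => 3}
```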