Class: ElasticGraph::Indexer::DatastoreIndexingRouter

Inherits:
Object
  • Object
show all
Defined in:
lib/elastic_graph/indexer/datastore_indexing_router.rb

Overview

Responsible for routing datastore indexing requests to the appropriate cluster and index.

Defined Under Namespace

Classes: BulkResult

Instance Method Summary collapse

Constructor Details

#initialize(datastore_clients_by_name:, logger:) ⇒ DatastoreIndexingRouter

Returns a new instance of DatastoreIndexingRouter.



20
21
22
23
24
25
26
# File 'lib/elastic_graph/indexer/datastore_indexing_router.rb', line 20

def initialize(
  datastore_clients_by_name:,
  logger:
)
  @datastore_clients_by_name = datastore_clients_by_name
  @logger = logger
end

Instance Method Details

#bulk(operations, refresh: false) ⇒ Object

Proxies ‘client#bulk` by converting `operations` to their bulk form. Returns a hash between a cluster and a list of successfully applied operations on that cluster.

For each operation, 1 of 4 things will happen, each of which will be treated differently:

1. The operation was successfully applied to the datastore and updated its state.
   The operation will be included in the successful operation of the returned result.
2. The operation could not even be attempted. For example, an `Update` operation
   cannot be attempted when the source event has `nil` for the field used as the source of
   the destination type's id. The returned result will not include this operation.
3. The operation was a no-op due to the external version not increasing. This happens when we
   process a duplicate or out-of-order event. The operation will be included in the returned
   result's list of noop results.
4. The operation failed outright for some other reason. The operation will be included in the
   returned result's failure results.

It is the caller’s responsibility to deal with any returned failures as this method does not raise an exception in that case.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/elastic_graph/indexer/datastore_indexing_router.rb', line 46

def bulk(operations, refresh: false)
  ops_by_client = ::Hash.new { |h, k| h[k] = [] } # : ::Hash[DatastoreCore::_Client, ::Array[_Operation]]
  unsupported_ops = ::Set.new # : ::Set[_Operation]

  operations.each do |op|
    # Note: this intentionally does not use `accessible_cluster_names_to_index_into`.
    # We want to fail with clear error if any clusters are inaccessible instead of silently ignoring
    # the named cluster. The `IndexingFailuresError` provides a clear error.
    cluster_names = op.destination_index_def.clusters_to_index_into

    cluster_names.each do |cluster_name|
      if (client = @datastore_clients_by_name[cluster_name])
        ops = ops_by_client[client] # : ::Array[::ElasticGraph::Indexer::_Operation]
        ops << op
      else
        unsupported_ops << op
      end
    end

    unsupported_ops << op if cluster_names.empty?
  end

  unless unsupported_ops.empty?
    raise IndexingFailuresError,
      "The index definitions for #{unsupported_ops.size} operations " \
      "(#{unsupported_ops.map { |o| Indexer::EventID.from_event(o.event) }.join(", ")}) " \
      "were configured to be inaccessible. Check the configuration, or avoid sending " \
      "events of this type to this ElasticGraph indexer."
  end

  ops_and_results_by_cluster = Support::Threading.parallel_map(ops_by_client) do |(client, ops)|
    responses = client.bulk(body: ops.flat_map(&:to_datastore_bulk), refresh: refresh).fetch("items")

    # As per https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body,
    # > `items` contains the result of each operation in the bulk request, in the order they were submitted.
    # Thus, we can trust it has the same cardinality as `ops` and they can be zipped together.
    ops_and_results = ops.zip(responses).map { |(op, response)| [op, op.categorize(response)] }
    [client.cluster_name, ops_and_results]
  end.to_h

  BulkResult.new(ops_and_results_by_cluster)
end

#source_event_versions_in_index(operations) ⇒ Object

Given a list of operations (which can contain different types of operations!), queries the datastore to identify the source event versions stored on the corresponding documents.

This was specifically designed to support dealing with malformed events. If an event is malformed we usually want to raise an exception, but if the document targeted by the malformed event is at a newer version in the index than the version number in the event, the malformed state of the event has already been superseded by a corrected event and we can just log a message instead. This method specifically supports that logic.

If the datastore returns errors for any of the calls, this method will raise an exception. Otherwise, this method returns a nested hash:

- The outer hash maps operations to an inner hash of results for that operation.
- The inner hash maps datastore cluster/client names to the version number for that operation from the datastore cluster.

Note that the returned ‘version` for an operation on a cluster can be `nil` (as when the document is not found, or for an operation type that doesn’t store source versions).

This nested structure is necessary because a single operation can target more than one datastore cluster, and a document may have different source event versions in different datastore clusters.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/elastic_graph/indexer/datastore_indexing_router.rb', line 155

def source_event_versions_in_index(operations)
  ops_by_client_name = ::Hash.new { |h, k| h[k] = [] } # : ::Hash[::String, ::Array[_Operation]]
  operations.each do |op|
    # Note: this intentionally does not use `accessible_cluster_names_to_index_into`.
    # We want to fail with clear error if any clusters are inaccessible instead of silently ignoring
    # the named cluster. The `IndexingFailuresError` provides a clear error.
    cluster_names = op.destination_index_def.clusters_to_index_into
    cluster_names.each { |cluster_name| ops_by_client_name[cluster_name] << op }
  end

  client_names_and_results = Support::Threading.parallel_map(ops_by_client_name) do |(client_name, all_ops)|
    # @type block: [::String, ::Symbol, ::Array[untyped] | ::Hash[_Operation, ::Array[::Integer]]]

    ops, unversioned_ops = all_ops.partition(&:versioned?) # : [::Array[Operation::Update], ::Array[Operation::Update]]

    msearch_response =
      if (client = @datastore_clients_by_name[client_name]) && ops.any?
        body = ops.flat_map do |op|
          [
            # Note: we intentionally search the entire index expression, not just an individual index based on a rollover timestamp.
            # And we intentionally do NOT provide a routing value--we want to find the version, no matter what shard the document
            # lives on.
            #
            # Since this `source_event_versions_in_index` is for handling malformed events, its possible that the
            # rollover timestamp or routing value on the operation is wrong and that the correct document lives in
            # a different shard and index than what the operation is targeted at. We want to search across all of them
            # so that we will find it, regardless of where it lives.
            {index: op.destination_index_def.index_expression_for_search},
            {
              # Filter to the documents matching the id.
              query: {ids: {values: [op.doc_id]}},
              # We only care about the version.
              _source: {includes: ["__versions.#{op.update_target.relationship}"]}
            }
          ]
        end

        headers = {
          OPAQUE_ID_HEADER => Support::OpaqueID.build_header(opaque_id_parts_for_source_event_versions(ops))
        }.compact # : ::Hash[::String, ::String]

        client.msearch(
          body: body,
          headers: headers
        )
      else
        # The named client doesn't exist, so we don't have any versions for the docs.
        {"responses" => ops.map { |op| {"hits" => {"hits" => _ = []}} }}
      end

    errors = msearch_response.fetch("responses").select { |res| res["error"] }

    if errors.empty?
      # We assume the size of the ops and the other array is the same and it cannot have `nil`.
      zip = ops.zip(msearch_response.fetch("responses")) # : ::Array[[Operation::Update, ::Hash[::String, ::Hash[::String, untyped]]]]

      versions_by_op = zip.to_h do |(op, response)|
        hits = response.fetch("hits").fetch("hits")

        if hits.size > 1
          # Got multiple results. The document is duplicated in multiple shards or indexes. Log a warning about this.
          @logger.warn({
            "message_type" => "IdentifyDocumentVersionsGotMultipleResults",
            "index" => hits.map { |h| h["_index"] },
            "routing" => hits.map { |h| h["_routing"] },
            "id" => hits.map { |h| h["_id"] },
            "version" => hits.map { |h| h["_version"] }
          })
        end

        versions = hits.filter_map do |hit|
          hit.dig("_source", "__versions", op.update_target.relationship, hit.fetch("_id"))
        end

        [op, versions.uniq]
      end

      unversioned_ops_hash = unversioned_ops.to_h do |op|
        [op, []] # : [Operation::Update, ::Array[::Integer]]
      end

      [client_name, :success, versions_by_op.merge(unversioned_ops_hash)]
    else
      [client_name, :failure, errors]
    end
  end

  failures = client_names_and_results.flat_map do |(client_name, success_or_failure, results)|
    if success_or_failure == :success
      []
    else
      results.map do |result|
        "From cluster #{client_name}: #{::JSON.generate(result, space: " ")}"
      end
    end
  end

  if failures.empty?
    # All results are success and the third element of the tuple is a hash.
    # Assign the results to narrow down the type.
    success_results = client_names_and_results # : ::Array[[::String, ::Symbol, ::Hash[_Operation, ::Array[::Integer]]]]

    success_results.each_with_object(_ = {}) do |(client_name, _success_or_failure, results), accum|
      results.each do |op, version|
        (accum[op] ||= {})[client_name] = version
      end
    end
  else
    raise Errors::IdentifyDocumentVersionsFailedError, "Got #{failures.size} failure(s) while querying the datastore " \
      "for document versions:\n\n#{failures.join("\n")}"
  end
end