Class: ElasticGraph::GraphQL::DatastoreSearchRouter

Inherits: Object
Defined in:
lib/elastic_graph/graphql/datastore_search_router.rb

Overview

Responsible for routing datastore search requests to the appropriate cluster and index.

Constant Summary collapse

INDICES_NOT_CONFIGURED_MESSAGE =
"The datastore indices have not been configured. They must be configured before ElasticGraph can serve queries."

Instance Method Summary collapse

Constructor Details

#initialize(datastore_clients_by_name:, logger:, monotonic_clock:, config:) ⇒ DatastoreSearchRouter

Returns a new instance of DatastoreSearchRouter.



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 20

# Builds a new search router.
#
# @param datastore_clients_by_name [Hash] datastore clients keyed by cluster name
# @param logger [Object] logger used for diagnostics
# @param monotonic_clock [Object] clock used to measure query durations
# @param config [Object] GraphQL datastore configuration
# @return [DatastoreSearchRouter] a new instance
def initialize(datastore_clients_by_name:, logger:, monotonic_clock:, config:)
  @config = config
  @monotonic_clock = monotonic_clock
  @logger = logger
  @datastore_clients_by_name = datastore_clients_by_name
end

Instance Method Details

#msearch(queries, query_tracker: QueryDetailsTracker.empty, opaque_id_parts: ["elasticgraph-graphql"]) ⇒ Object

Sends the datastore a multi-search request based on the given queries. Returns a hash of responses keyed by the query.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 36

# Sends the datastore a multi-search request based on the given queries.
# Queries are grouped by target cluster, and one `msearch` call per cluster is
# issued (in parallel across clusters). Returns a hash of responses keyed by query.
#
# @param queries [Array] the datastore queries to execute
# @param query_tracker [QueryDetailsTracker] accumulator for datastore query metrics
# @param opaque_id_parts [Array<String>] parts used to build the `X-Opaque-Id` request header
# @return [Hash] datastore responses keyed by query
# @raise [::GraphQL::ExecutionError] if a query hit zero shards and the indices
#   appear not to have been configured
def msearch(queries, query_tracker: QueryDetailsTracker.empty, opaque_id_parts: ["elasticgraph-graphql"])
  DatastoreQuery.perform(queries) do |header_body_tuples_by_query|
    # Here we set a client-side timeout, which causes the client to give up and close the connection.
    # According to [1]--"We have a new way to cancel search requests efficiently from the client
    # in 7.4 (by closing the underlying http channel)"--this should cause the server to stop
    # executing the search, and more importantly, gives us a strictly enforced timeout.
    #
    # In addition, the datastore supports a `timeout` option on a search body, but this timeout is
    # "best effort", applies to each shard (and not to the overall search request), and only interrupts
    # certain kinds of operations. [2] and [3] below have more info.
    #
    # Note that I have not been able to observe this `timeout` on a search body ever working
    # as documented. In our test suite, none of the slow queries I have tried (both via
    # slow aggregation query and a slow script) have ever aborted early when that option is
    # set. In Kibana in production, @bsorbo observed it aborting a `search` request early
    # (but not necessarily an `msearch` request...), but even then, the response said `timed_out: false`!
    # Other people ([4]) have reported observing timeout having no effect on msearch requests.
    #
    # So, the client-side timeout is the main one we want here, and for now we are not using the
    # datastore search `timeout` option at all.
    #
    # For more info, see:
    #
    # [1] https://github.com/elastic/elasticsearch/issues/47716
    # [2] https://github.com/elastic/elasticsearch/pull/51858
    # [3] https://www.elastic.co/guide/en/elasticsearch/guide/current/_search_options.html#_timeout_2
    # [4] https://discuss.elastic.co/t/timeouts-ignored-in-multisearch/23673

    # Unfortunately, the Elasticsearch/OpenSearch clients don't support setting a per-request client-side timeout,
    # even though Faraday (the underlying HTTP client) does. To work around this, we pass our desired
    # timeout in a specific header that the `SupportTimeouts` Faraday middleware will use.
    headers = {TIMEOUT_MS_HEADER => msearch_request_timeout_from(queries)&.to_s}.compact
    if (opaque_id = Support::OpaqueID.build_header(opaque_id_parts))
      headers[OPAQUE_ID_HEADER] = opaque_id
    end

    # Each cluster gets its own `msearch` call, so group the queries by target client.
    queries_and_header_body_tuples_by_datastore_client = header_body_tuples_by_query.group_by do |(query, header_body_tuples)|
      @datastore_clients_by_name.fetch(query.cluster_name)
    end

    datastore_query_started_at = @monotonic_clock.now_in_ms

    server_took_and_results = Support::Threading.parallel_map(queries_and_header_body_tuples_by_datastore_client) do |datastore_client, query_and_header_body_tuples_for_cluster|
      # Multiple queries can share an identical header/body tuple; send each distinct
      # tuple only once and fan the single response back out to all of its queries.
      queries_by_header_body_tuples = query_and_header_body_tuples_for_cluster
        .group_by { |(query, header_body_tuple)| header_body_tuple }
        .transform_values { |values| values.map(&:first) }

      msearch_body = queries_by_header_body_tuples.keys.flatten(1)
      response = datastore_client.msearch(body: msearch_body, headers: headers)
      debug_query(query: msearch_body, response: response)

      query_tracker.record_datastore_queries_for_single_request(queries_by_header_body_tuples.values.map(&:first))

      # `msearch` returns one response per body tuple, in request order.
      responses_by_header_body_tuple = queries_by_header_body_tuples.keys.zip(response.fetch("responses")).to_h
      ordered_queries_and_responses = query_and_header_body_tuples_for_cluster.map do |(query, header_body_tuple)|
        [query, responses_by_header_body_tuple.fetch(header_body_tuple)]
      end

      [response["took"], ordered_queries_and_responses]
    end

    # Fixed: the first tuple element is the server-side `took` duration, not a query.
    # Naming it `query` (as before) was misleading and was shadowed by the inner
    # block's real `query` parameter below.
    queried_shard_count = server_took_and_results.reduce(0) do |outer_accum, (_server_took, queries_and_responses)|
      outer_accum + queries_and_responses.reduce(0) do |inner_accum, (query, response)|
        shards_total = response.dig("_shards", "total")

        # A query that targeted zero shards (and wasn't deliberately excluding all
        # indices) indicates the indices were never configured.
        if shards_total == 0 && !query.excluding_indices?
          raise ::GraphQL::ExecutionError, INDICES_NOT_CONFIGURED_MESSAGE
        end

        inner_accum + (shards_total || 0)
      end
    end

    query_tracker.record_datastore_query_metrics(
      client_duration_ms: @monotonic_clock.now_in_ms - datastore_query_started_at,
      server_duration_ms: server_took_and_results.map(&:first).compact.max,
      queried_shard_count: queried_shard_count
    )

    server_took_and_results.flat_map(&:last).to_h.tap do |responses_by_query|
      log_shard_failure_if_necessary(responses_by_query)
      raise_search_failed_if_any_failures(responses_by_query)
    end
  end
end