Class: ElasticGraph::GraphQL::DatastoreSearchRouter
- Inherits: Object
- Defined in: lib/elastic_graph/graphql/datastore_search_router.rb
Overview
Responsible for routing datastore search requests to the appropriate cluster and index.
Constant Summary
- INDICES_NOT_CONFIGURED_MESSAGE =
"The datastore indices have not been configured. They must be configured before ElasticGraph can serve queries."
Instance Method Summary
- #initialize(datastore_clients_by_name:, logger:, monotonic_clock:, config:) ⇒ DatastoreSearchRouter (constructor)
  A new instance of DatastoreSearchRouter.
- #msearch(queries, query_tracker: QueryDetailsTracker.empty, opaque_id_parts: ["elasticgraph-graphql"]) ⇒ Object
  Sends the datastore a multi-search request based on the given queries.
Constructor Details
#initialize(datastore_clients_by_name:, logger:, monotonic_clock:, config:) ⇒ DatastoreSearchRouter
Returns a new instance of DatastoreSearchRouter.
```ruby
# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 20

def initialize(
  datastore_clients_by_name:,
  logger:,
  monotonic_clock:,
  config:
)
  @datastore_clients_by_name = datastore_clients_by_name
  @logger = logger
  @monotonic_clock = monotonic_clock
  @config = config
end
```
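The `monotonic_clock:` collaborator only needs to expose `now_in_ms`; the router uses it in `#msearch` to compute `client_duration_ms`. As a minimal sketch (not the clock ElasticGraph actually ships, and the class name here is hypothetical), such a clock can be built on `Process.clock_gettime`:

```ruby
# Hypothetical minimal clock exposing the `now_in_ms` interface the router
# depends on. CLOCK_MONOTONIC is unaffected by system clock adjustments, so
# durations computed from successive readings are always non-negative.
class SketchMonotonicClock
  def now_in_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end

clock = SketchMonotonicClock.new
started_at = clock.now_in_ms
# ... perform some work ...
duration_ms = clock.now_in_ms - started_at
```

A monotonic source matters here because a wall clock can jump backwards (NTP sync, DST), which would corrupt duration metrics.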
Instance Method Details
#msearch(queries, query_tracker: QueryDetailsTracker.empty, opaque_id_parts: ["elasticgraph-graphql"]) ⇒ Object
Sends the datastore a multi-search request based on the given queries. Returns a hash of responses keyed by the query.
```ruby
# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 36

def msearch(queries, query_tracker: QueryDetailsTracker.empty, opaque_id_parts: ["elasticgraph-graphql"])
  DatastoreQuery.perform(queries) do |header_body_tuples_by_query|
    # Here we set a client-side timeout, which causes the client to give up and close the connection.
    # According to [1]--"We have a new way to cancel search requests efficiently from the client
    # in 7.4 (by closing the underlying http channel)"--this should cause the server to stop
    # executing the search, and more importantly, gives us a strictly enforced timeout.
    #
    # In addition, the datastore supports a `timeout` option on a search body, but this timeout is
    # "best effort", applies to each shard (and not to the overall search request), and only interrupts
    # certain kinds of operations. [2] and [3] below have more info.
    #
    # Note that I have not been able to observe this `timeout` on a search body ever working
    # as documented. In our test suite, none of the slow queries I have tried (both via
    # slow aggregation query and a slow script) have ever aborted early when that option is
    # set. In Kibana in production, @bsorbo observed it aborting a `search` request early
    # (but not necessarily an `msearch` request...), but even then, the response said `timed_out: false`!
    # Other people ([4]) have reported observing timeout having no effect on msearch requests.
    #
    # So, the client-side timeout is the main one we want here, and for now we are not using the
    # datastore search `timeout` option at all.
    #
    # For more info, see:
    #
    # [1] https://github.com/elastic/elasticsearch/issues/47716
    # [2] https://github.com/elastic/elasticsearch/pull/51858
    # [3] https://www.elastic.co/guide/en/elasticsearch/guide/current/_search_options.html#_timeout_2
    # [4] https://discuss.elastic.co/t/timeouts-ignored-in-multisearch/23673

    # Unfortunately, the Elasticsearch/OpenSearch clients don't support setting a per-request client-side
    # timeout, even though Faraday (the underlying HTTP client) does. To work around this, we pass our
    # desired timeout in a specific header that the `SupportTimeouts` Faraday middleware will use.
    headers = {TIMEOUT_MS_HEADER => msearch_request_timeout_from(queries)&.to_s}.compact
    if (opaque_id = Support::OpaqueID.build_header(opaque_id_parts))
      headers[OPAQUE_ID_HEADER] = opaque_id
    end

    queries_and_header_body_tuples_by_datastore_client = header_body_tuples_by_query.group_by do |(query, header_body_tuples)|
      @datastore_clients_by_name.fetch(query.cluster_name)
    end

    datastore_query_started_at = @monotonic_clock.now_in_ms

    server_took_and_results = Support::Threading.parallel_map(queries_and_header_body_tuples_by_datastore_client) do |datastore_client, query_and_header_body_tuples_for_cluster|
      queries_by_header_body_tuples = query_and_header_body_tuples_for_cluster
        .group_by { |(query, header_body_tuple)| header_body_tuple }
        .transform_values { |values| values.map(&:first) }

      msearch_body = queries_by_header_body_tuples.keys.flatten(1)
      response = datastore_client.msearch(body: msearch_body, headers: headers)
      debug_query(query: msearch_body, response: response)
      query_tracker.record_datastore_queries_for_single_request(queries_by_header_body_tuples.values.map(&:first))

      responses_by_header_body_tuple = queries_by_header_body_tuples.keys.zip(response.fetch("responses")).to_h

      ordered_queries_and_responses = query_and_header_body_tuples_for_cluster.map do |(query, header_body_tuple)|
        [query, responses_by_header_body_tuple.fetch(header_body_tuple)]
      end

      [response["took"], ordered_queries_and_responses]
    end

    queried_shard_count = server_took_and_results.reduce(0) do |outer_accum, (query, queries_and_responses)|
      outer_accum + queries_and_responses.reduce(0) do |inner_accum, (query, response)|
        shards_total = response.dig("_shards", "total")

        if shards_total == 0 && !query.excluding_indices?
          raise ::GraphQL::ExecutionError, INDICES_NOT_CONFIGURED_MESSAGE
        end

        inner_accum + (shards_total || 0)
      end
    end

    query_tracker.record_datastore_query_metrics(
      client_duration_ms: @monotonic_clock.now_in_ms - datastore_query_started_at,
      server_duration_ms: server_took_and_results.map(&:first).compact.max,
      queried_shard_count: queried_shard_count
    )

    server_took_and_results.flat_map(&:last).to_h.tap do |responses_by_query|
      log_shard_failure_if_necessary(responses_by_query)
      raise_search_failed_if_any_failures(responses_by_query)
    end
  end
end
```
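The core batching technique above (group queries that produce an identical header/body tuple, send each unique tuple once, then use `zip` to re-associate each response with every query that produced it) can be seen in isolation in this self-contained sketch. The query symbols, index names, and response hashes here are hypothetical stand-ins, not real ElasticGraph objects:

```ruby
# Three queries, two of which happen to produce the identical header/body tuple.
tuples_by_query = {
  q1: [{index: "widgets"}, {query: {match_all: {}}}],
  q2: [{index: "parts"}, {query: {match_all: {}}}],
  q3: [{index: "widgets"}, {query: {match_all: {}}}]
}

# Group queries by tuple so each unique search is issued only once.
queries_by_tuple = tuples_by_query
  .group_by { |(_query, tuple)| tuple }
  .transform_values { |pairs| pairs.map(&:first) }

# Alternating header/body entries, as an msearch request body expects.
msearch_body = queries_by_tuple.keys.flatten(1)

# Pretend the datastore returned one response per unique tuple, in order.
responses = msearch_body.each_slice(2).map { |(header, _body)| {hit_index: header[:index]} }

# Re-associate: zip unique tuples with their responses, then fan back out per query.
responses_by_tuple = queries_by_tuple.keys.zip(responses).to_h
responses_by_query = tuples_by_query.to_h { |query, tuple| [query, responses_by_tuple.fetch(tuple)] }
```

The `zip` re-association works because `msearch` guarantees responses come back in the same order as the request body; the grouping means the datastore only ever executes each distinct search once, even when many GraphQL queries map to it.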