Class: Karafka::Web::Pro::Ui::Lib::Search::Runner

Inherits:
Object
  • Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/pro/ui/lib/search/runner.rb

Overview

Note:

When running a search from the latest messages, we stop when a message timestamp is higher than the time when the lookup started. This prevents us from iterating on topics for an extended period when there are fewer messages than the requested amount but new ones keep arriving in real time. It acts as a virtual “eof”.

Search runner that selects the proper matcher, sets the parameters, and runs the search. We use the Pro Iterator for searching, but this runner adds metadata tracking and other metrics that are useful in the Web UI.
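
The virtual “eof” described in the note above can be illustrated with a minimal, standalone sketch. It does not use Karafka at all: `SketchMessage`, `collect_until_virtual_eof`, and the stop reason symbols are hypothetical names chosen for this illustration, and the real runner may structure this check differently.

```ruby
# Hypothetical stand-in for a Kafka message; only the timestamp matters here.
SketchMessage = Struct.new(:offset, :timestamp)

# Collects up to `limit` messages, stopping early once a message is newer than
# the moment the lookup started (the "virtual eof").
# The stop reason symbols are illustrative, not the library's actual values.
def collect_until_virtual_eof(messages, limit:, started_at: Time.now)
  collected = []
  stop_reason = :eof

  messages.each do |message|
    # Anything produced after the lookup began is treated as end of data
    if message.timestamp > started_at
      stop_reason = :virtual_eof
      break
    end

    collected << message

    if collected.size >= limit
      stop_reason = :limit
      break
    end
  end

  [collected, stop_reason]
end

started_at = Time.at(1_000)
stream = [
  SketchMessage.new(0, Time.at(990)),
  SketchMessage.new(1, Time.at(995)),
  SketchMessage.new(2, Time.at(1_005)), # produced after the lookup started
  SketchMessage.new(3, Time.at(1_010))
]

collected, stop_reason = collect_until_virtual_eof(stream, limit: 10, started_at: started_at)
```

Without the timestamp check, a topic that receives messages faster than they are scanned would keep the loop running until the limit or a timeout is reached.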

Constructor Details

#initialize(topic, partitions_count, search_criteria) ⇒ Runner

Returns a new instance of Runner.

Parameters:

  • topic (String)

    topic in which we want to search

  • partitions_count (Integer)

    how many partitions this topic has

  • search_criteria (Hash)

    normalized search criteria



# File 'lib/karafka/web/pro/ui/lib/search/runner.rb', line 74

def initialize(topic, partitions_count, search_criteria)
  @topic = topic
  @partitions_count = partitions_count
  @search_criteria = search_criteria
  @partitions_stats = Hash.new { |h, k| h[k] = METRICS_BASE.dup }
  @totals_stats = METRICS_BASE.dup
  @timeout = Web.config.ui.search.timeout
  @stop_reason = nil
  @matched = []
end
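
The per-partition statistics in the constructor rely on a `Hash.new` default block combined with `dup`, so each partition lazily receives its own copy of the baseline counters. A minimal sketch of that pattern follows; the contents of `METRICS_BASE` are not shown on this page, so the keys used here are an assumption made purely for illustration.

```ruby
# Assumed baseline; the real METRICS_BASE keys are not shown in this document.
METRICS_BASE = { checked: 0, matched: 0 }.freeze

# Each missing key gets its own unfrozen copy of the baseline on first access
partitions_stats = Hash.new { |hash, key| hash[key] = METRICS_BASE.dup }

partitions_stats[0][:checked] += 1
partitions_stats[0][:checked] += 1
partitions_stats[2][:matched] += 1
```

Because `dup` returns an unfrozen shallow copy, incrementing the counters for one partition affects neither the baseline nor the entries of other partitions.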

Instance Method Details

#call ⇒ Array<Array<Karafka::Messages::Message>, Hash>

Note:

Results are sorted based on the time value.

Runs the search, collects search statistics, and returns the results.

Returns:

  • (Array<Array<Karafka::Messages::Message>, Hash>)

    array with search results and metadata



# File 'lib/karafka/web/pro/ui/lib/search/runner.rb', line 89

def call
  search_with_stats

  [
    # We return most recent results on top
    @matched.sort_by(&:timestamp).reverse,
    {
      totals: @totals_stats,
      partitions: @partitions_stats,
      stop_reason: @stop_reason
    }.freeze
  ]
end
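
The shape of the value returned by `#call` can be sketched without Karafka. In the example below, `FoundMessage` and `build_result` are hypothetical helpers that only mimic the return structure shown above, and the statistics keys are assumed for illustration.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message
FoundMessage = Struct.new(:partition, :offset, :timestamp)

# Mimics the return structure of #call: results (newest first) plus metadata
def build_result(matched, totals, partitions, stop_reason)
  [
    matched.sort_by(&:timestamp).reverse,
    { totals: totals, partitions: partitions, stop_reason: stop_reason }.freeze
  ]
end

matched = [
  FoundMessage.new(0, 10, Time.at(100)),
  FoundMessage.new(1, 4, Time.at(300)),
  FoundMessage.new(0, 11, Time.at(200))
]

totals = { checked: 50, matched: 3 }
partitions = { 0 => { checked: 30, matched: 2 }, 1 => { checked: 20, matched: 1 } }

# Callers can destructure the two-element array directly
results, metadata = build_result(matched, totals, partitions, nil)

newest_first_offsets = results.map(&:offset)
```

Note that `freeze` is shallow: the outer metadata hash cannot gain or lose keys, but the nested `totals` and `partitions` hashes remain mutable unless they are frozen separately.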