Class: LogStash::Inputs::Elasticsearch::CursorTracker

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/inputs/elasticsearch/cursor_tracker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) ⇒ CursorTracker

Returns a new instance of CursorTracker.



9
10
11
12
13
14
15
16
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 9

def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:)
  @last_run_metadata_path = 
  @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new
  @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed
  @tracking_field = tracking_field
  logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}"
  @mutex = Mutex.new
end

Instance Attribute Details

#last_valueObject (readonly)

Returns the value of attribute last_value.



7
8
9
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 7

def last_value
  @last_value
end

Instance Method Details

#checkpoint_cursor(intermediate: true) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 18

def checkpoint_cursor(intermediate: true)
  @mutex.synchronize do
    if intermediate
      # in intermediate checkpoints pick the smallest
      converge_last_value {|v1, v2| v1 < v2 ? v1 : v2}
    else
      # in the last search of a PIT choose the largest
      converge_last_value {|v1, v2| v1 > v2 ? v1 : v2}
      @last_value_hashmap.clear
    end
    IO.write(@last_run_metadata_path, @last_value)
  end
end

#converge_last_value(&block) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 32

def converge_last_value(&block)
  return if @last_value_hashmap.empty?
  new_last_value = @last_value_hashmap.reduceValues(1000, &block)
  logger.debug? && logger.debug("converge_last_value: got #{@last_value_hashmap.values.inspect}. won: #{new_last_value}")
  return if new_last_value == @last_value
  @last_value = new_last_value
  logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}"
end

#inject_cursor(query_json) ⇒ Object



47
48
49
50
51
52
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 47

def inject_cursor(query_json)
  # ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT
  result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", now_minus_30s)
  logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result)
  result
end

#now_minus_30sObject



54
55
56
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 54

def now_minus_30s
  Java::java.time.Instant.now.minusSeconds(30).to_s
end

#record_last_value(event) ⇒ Object



41
42
43
44
45
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 41

def record_last_value(event)
  value = event.get(@tracking_field)
  logger.trace? && logger.trace("storing last_value if #{@tracking_field} for #{Thread.current.object_id}: #{value}")
  @last_value_hashmap.put(Thread.current.object_id, value)
end