Class: LogStash::Inputs::Elasticsearch::CursorTracker
- Inherits:
-
Object
- Object
- LogStash::Inputs::Elasticsearch::CursorTracker
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/inputs/elasticsearch/cursor_tracker.rb
Instance Attribute Summary collapse
-
#last_value ⇒ Object
readonly
Returns the value of attribute last_value.
Instance Method Summary collapse
- #checkpoint_cursor(intermediate: true) ⇒ Object
- #converge_last_value(&block) ⇒ Object
-
#initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) ⇒ CursorTracker
constructor
A new instance of CursorTracker.
- #inject_cursor(query_json) ⇒ Object
- #now_minus_30s ⇒ Object
- #record_last_value(event) ⇒ Object
Constructor Details
#initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) ⇒ CursorTracker
Returns a new instance of CursorTracker.
9 10 11 12 13 14 15 16 |
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 9 def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) @last_run_metadata_path = @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed @tracking_field = tracking_field logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}" @mutex = Mutex.new end |
Instance Attribute Details
#last_value ⇒ Object (readonly)
Returns the value of attribute last_value.
7 8 9 |
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 7 def last_value @last_value end |
Instance Method Details
#checkpoint_cursor(intermediate: true) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 18 def checkpoint_cursor(intermediate: true) @mutex.synchronize do if intermediate # in intermediate checkpoints pick the smallest converge_last_value {|v1, v2| v1 < v2 ? v1 : v2} else # in the last search of a PIT choose the largest converge_last_value {|v1, v2| v1 > v2 ? v1 : v2} @last_value_hashmap.clear end IO.write(@last_run_metadata_path, @last_value) end end |
#converge_last_value(&block) ⇒ Object
32 33 34 35 36 37 38 39 |
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 32 def converge_last_value(&block) return if @last_value_hashmap.empty? new_last_value = @last_value_hashmap.reduceValues(1000, &block) logger.debug? && logger.debug("converge_last_value: got #{@last_value_hashmap.values.inspect}. won: #{new_last_value}") return if new_last_value == @last_value @last_value = new_last_value logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}" end |
#inject_cursor(query_json) ⇒ Object
47 48 49 50 51 52 |
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 47 def inject_cursor(query_json) # ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", now_minus_30s) logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result) result end |
#now_minus_30s ⇒ Object
54 55 56 |
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 54 def now_minus_30s Java::java.time.Instant.now.minusSeconds(30).to_s end |
#record_last_value(event) ⇒ Object
41 42 43 44 45 |
# File 'lib/logstash/inputs/elasticsearch/cursor_tracker.rb', line 41 def record_last_value(event) value = event.get(@tracking_field) logger.trace? && logger.trace("storing last_value if #{@tracking_field} for #{Thread.current.object_id}: #{value}") @last_value_hashmap.put(Thread.current.object_id, value) end |