Class: Salopulse::Flusher

Inherits:
Object
  • Object
show all
Defined in:
lib/salopulse/flusher.rb

Instance Method Summary collapse

Constructor Details

#initialize(buffer:, transport:, interval:, batch_size:, logger:) ⇒ Flusher

Returns a new instance of Flusher.



3
4
5
6
7
8
9
10
11
# File 'lib/salopulse/flusher.rb', line 3

def initialize(buffer:, transport:, interval:, batch_size:, logger:)
  @buffer = buffer
  @transport = transport
  @interval = interval
  @batch_size = batch_size
  @logger = logger
  @stop = false
  @thread = nil
end

Instance Method Details

#flush_all(timeout: 5) ⇒ Object



37
38
39
40
41
42
43
44
# File 'lib/salopulse/flusher.rb', line 37

def flush_all(timeout: 5)
  deadline = monotonic_now + timeout
  total = 0
  while !@buffer.empty? && monotonic_now < deadline
    total += flush_once
  end
  total
end

#flush_onceObject



30
31
32
33
34
35
# File 'lib/salopulse/flusher.rb', line 30

def flush_once
  events = @buffer.drain(max: @batch_size)
  return 0 if events.empty?
  @transport.send_batch(events)
  events.size
end

#startObject



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/salopulse/flusher.rb', line 13

def start
  return if @thread&.alive?
  @stop = false
  @thread = Thread.new do
    Thread.current.name = "salopulse-flusher" if Thread.current.respond_to?(:name=)
    loop do
      break if @stop
      begin
        flush_once
      rescue StandardError => e
        @logger.error("[Salopulse] flusher error: #{e.class}: #{e.message}")
      end
      sleep_with_interrupt(@interval)
    end
  end
end

#stop(timeout: 5) ⇒ Object



46
47
48
49
50
# File 'lib/salopulse/flusher.rb', line 46

def stop(timeout: 5)
  @stop = true
  flush_all(timeout: timeout)
  @thread&.join(timeout)
end