Class: Salopulse::Flusher
- Inherits:
-
Object
- Object
- Salopulse::Flusher
- Defined in:
- lib/salopulse/flusher.rb
Instance Method Summary collapse
- #flush_all(timeout: 5) ⇒ Object
- #flush_once ⇒ Object
-
#initialize(buffer:, transport:, interval:, batch_size:, logger:) ⇒ Flusher
constructor
A new instance of Flusher.
- #start ⇒ Object
- #stop(timeout: 5) ⇒ Object
Constructor Details
#initialize(buffer:, transport:, interval:, batch_size:, logger:) ⇒ Flusher
Returns a new instance of Flusher.
3 4 5 6 7 8 9 10 11 |
# File 'lib/salopulse/flusher.rb', line 3 def initialize(buffer:, transport:, interval:, batch_size:, logger:) @buffer = buffer @transport = transport @interval = interval @batch_size = batch_size @logger = logger @stop = false @thread = nil end |
Instance Method Details
#flush_all(timeout: 5) ⇒ Object
37 38 39 40 41 42 43 44 |
# File 'lib/salopulse/flusher.rb', line 37 def flush_all(timeout: 5) deadline = monotonic_now + timeout total = 0 while !@buffer.empty? && monotonic_now < deadline total += flush_once end total end |
#flush_once ⇒ Object
30 31 32 33 34 35 |
# File 'lib/salopulse/flusher.rb', line 30 def flush_once events = @buffer.drain(max: @batch_size) return 0 if events.empty? @transport.send_batch(events) events.size end |
#start ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/salopulse/flusher.rb', line 13 def start return if @thread&.alive? @stop = false @thread = Thread.new do Thread.current.name = "salopulse-flusher" if Thread.current.respond_to?(:name=) loop do break if @stop begin flush_once rescue StandardError => e @logger.error("[Salopulse] flusher error: #{e.class}: #{e.}") end sleep_with_interrupt(@interval) end end end |
#stop(timeout: 5) ⇒ Object
46 47 48 49 50 |
# File 'lib/salopulse/flusher.rb', line 46 def stop(timeout: 5) @stop = true flush_all(timeout: timeout) @thread&.join(timeout) end |