Class: Logtide::Transport::Batcher
- Inherits:
-
Object
- Object
- Logtide::Transport::Batcher
- Defined in:
- lib/logtide/transport/batcher.rb
Overview
Owns the buffer and the background dispatcher (spec 002 sections 5-8). Capture appends to a bounded buffer and returns immediately; a worker thread flushes on batch size or interval, applying the retry policy and circuit breaker. flush/close are best-effort and never raise.
Instance Method Summary collapse
- #close(timeout = @flush_timeout) ⇒ Object
- #enqueue(entry) ⇒ Object
- #flush(timeout = @flush_timeout) ⇒ Object
-
#initialize(sender:, metrics:, circuit_breaker:, retry_policy:, batch_size: 100, max_buffer_size: 10_000, flush_interval: 5, flush_timeout: 10, sleeper: ->(seconds) { sleep(seconds) }, logger: nil) ⇒ Batcher
constructor
A new instance of Batcher.
Constructor Details
#initialize(sender:, metrics:, circuit_breaker:, retry_policy:, batch_size: 100, max_buffer_size: 10_000, flush_interval: 5, flush_timeout: 10, sleeper: ->(seconds) { sleep(seconds) }, logger: nil) ⇒ Batcher
Returns a new instance of Batcher.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/logtide/transport/batcher.rb', line 12 def initialize(sender:, metrics:, circuit_breaker:, retry_policy:, batch_size: 100, max_buffer_size: 10_000, flush_interval: 5, flush_timeout: 10, sleeper: ->(seconds) { sleep(seconds) }, logger: nil) @sender = sender @metrics = metrics @circuit_breaker = circuit_breaker @retry_policy = retry_policy @batch_size = batch_size @flush_interval = flush_interval @flush_timeout = flush_timeout @sleeper = sleeper @logger = logger @buffer = Buffer.new(max_size: max_buffer_size) @mutex = Mutex.new @work = ConditionVariable.new @flushed = ConditionVariable.new @stop = false @closed = false @delivering = false @auth_warned = false @worker = Thread.new { run } end |
Instance Method Details
#close(timeout = @flush_timeout) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/logtide/transport/batcher.rb', line 62 def close(timeout = @flush_timeout) return if @closed flush(timeout) @mutex.synchronize do @stop = true @work.broadcast end @worker.join(timeout) @closed = true end |
#enqueue(entry) ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/logtide/transport/batcher.rb', line 37 def enqueue(entry) return if stopped? if @buffer.push(entry) signal_worker if @buffer.size >= @batch_size else @metrics.increment(:logs_dropped) log("buffer full, dropping entry") end end |
#flush(timeout = @flush_timeout) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/logtide/transport/batcher.rb', line 48 def flush(timeout = @flush_timeout) deadline = monotonic + timeout @mutex.synchronize do @work.signal until drained? remaining = deadline - monotonic break if remaining <= 0 @flushed.wait(@mutex, remaining) end drained? end end |