Class: Logtide::Transport::Batcher

Inherits:
Object
  • Object
show all
Defined in:
lib/logtide/transport/batcher.rb

Overview

Owns the buffer and the background dispatcher (spec 002 sections 5-8). Capture appends to a bounded buffer and returns immediately; a worker thread flushes on batch size or interval, applying the retry policy and circuit breaker. flush/close are best-effort and never raise.

Instance Method Summary collapse

Constructor Details

#initialize(sender:, metrics:, circuit_breaker:, retry_policy:, batch_size: 100, max_buffer_size: 10_000, flush_interval: 5, flush_timeout: 10, sleeper: ->(seconds) { sleep(seconds) }, logger: nil) ⇒ Batcher

Returns a new instance of Batcher.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/logtide/transport/batcher.rb', line 12

def initialize(sender:, metrics:, circuit_breaker:, retry_policy:,
               batch_size: 100, max_buffer_size: 10_000,
               flush_interval: 5, flush_timeout: 10,
               sleeper: ->(seconds) { sleep(seconds) }, logger: nil)
  @sender = sender
  @metrics = metrics
  @circuit_breaker = circuit_breaker
  @retry_policy = retry_policy
  @batch_size = batch_size
  @flush_interval = flush_interval
  @flush_timeout = flush_timeout
  @sleeper = sleeper
  @logger = logger

  @buffer = Buffer.new(max_size: max_buffer_size)
  @mutex = Mutex.new
  @work = ConditionVariable.new
  @flushed = ConditionVariable.new
  @stop = false
  @closed = false
  @delivering = false
  @auth_warned = false
  @worker = Thread.new { run }
end

Instance Method Details

#close(timeout = @flush_timeout) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/logtide/transport/batcher.rb', line 62

def close(timeout = @flush_timeout)
  return if @closed

  flush(timeout)
  @mutex.synchronize do
    @stop = true
    @work.broadcast
  end
  @worker.join(timeout)
  @closed = true
end

#enqueue(entry) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/logtide/transport/batcher.rb', line 37

def enqueue(entry)
  return if stopped?

  if @buffer.push(entry)
    signal_worker if @buffer.size >= @batch_size
  else
    @metrics.increment(:logs_dropped)
    log("buffer full, dropping entry")
  end
end

#flush(timeout = @flush_timeout) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/logtide/transport/batcher.rb', line 48

def flush(timeout = @flush_timeout)
  deadline = monotonic + timeout
  @mutex.synchronize do
    @work.signal
    until drained?
      remaining = deadline - monotonic
      break if remaining <= 0

      @flushed.wait(@mutex, remaining)
    end
    drained?
  end
end