Class: Beacon::Flusher

Inherits:
Object
  • Object
show all
Defined in:
lib/beacon/flusher.rb

Overview

Background flusher: drains the bounded queue, builds a JSON batch, and POSTs it through the transport. Implements the retry, circuit breaker, and Idempotency-Key behavior from .doc/definition/05-clients.md.

The flusher is rescue-all. Its loop crashing is itself an event we log but never re-raise — the host’s request cycle keeps running.

Constant Summary collapse

BACKOFF_SECONDS =
[0.2, 0.4, 0.8, 1.6, 3.2].freeze
CIRCUIT_OPEN_SECONDS =
30.0
CIRCUIT_OPEN_THRESHOLD =
5
DRAIN_BATCH =
1_000
SHUTDOWN_FLUSH_TIMEOUT =
2.0
BATCH_MAX_BYTES =

Split serialized batches at this byte count. The Beacon server’s /events endpoint limits any single request body to roughly 1 MB; 800 KB is a conservative ceiling that leaves headroom for headers and JSON framing. When a single flush produces more, we issue multiple POSTs rather than failing with 413.

800 * 1024

Instance Method Summary collapse

Constructor Details

#initialize(client, transport:, backoff: BACKOFF_SECONDS, log_throttle: nil) ⇒ Flusher

Returns a new instance of Flusher.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/beacon/flusher.rb', line 24

def initialize(client, transport:, backoff: BACKOFF_SECONDS, log_throttle: nil)
  @client    = client
  @config    = client.config
  @transport = transport
  @backoff   = backoff
  @log_throttle = log_throttle || Beacon::LogThrottle.new

  @stop                 = false
  @thread               = nil
  @consecutive_failures = 0
  @circuit_open_until   = nil

  # Observability counters — read by Beacon.stats. Events written
  # under @stats_mutex so concurrent reads (from Beacon.stats on
  # the main thread) see a consistent snapshot.
  @stats_mutex       = Mutex.new
  @sent              = 0
  @last_flush_at     = nil
  @last_flush_status = nil
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/beacon/flusher.rb', line 72

def alive?
  @thread&.alive? ? true : false
end

#flush_nowObject



76
77
78
79
80
81
82
83
84
# File 'lib/beacon/flusher.rb', line 76

def flush_now
  loop do
    events = @client.queue.drain(DRAIN_BATCH)
    break if events.empty?
    send_events(events)
  end
rescue => e
  log_rescue(e)
end

#startObject



57
58
59
60
61
62
# File 'lib/beacon/flusher.rb', line 57

def start
  @stop   = false
  @thread = Thread.new { run_loop }
  @thread.name = "beacon-flusher" if @thread.respond_to?(:name=)
  @thread
end

#statsObject



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/beacon/flusher.rb', line 45

def stats
  @stats_mutex.synchronize do
    {
      sent:                 @sent,
      last_flush_at:        @last_flush_at,
      last_flush_status:    @last_flush_status,
      consecutive_failures: @consecutive_failures,
      circuit_open:         !@circuit_open_until.nil?,
    }
  end
end

#stopObject



64
65
66
67
68
69
70
# File 'lib/beacon/flusher.rb', line 64

def stop
  @stop = true
  @client.queue.signal_all  # wake wait_and_drain from its condvar
  @thread&.join(SHUTDOWN_FLUSH_TIMEOUT)
  flush_now
  nil
end