Class: Beacon::Flusher
- Inherits:
- Object
- Defined in:
- lib/beacon/flusher.rb
Overview
Background flusher: drains the bounded queue, builds a JSON batch, and POSTs it through the transport. Implements the retry, circuit breaker, and Idempotency-Key behavior from .doc/definition/05-clients.md.
The flusher is rescue-all. Its loop crashing is itself an event we log but never re-raise — the host’s request cycle keeps running.
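The retry and circuit-breaker behavior mentioned above is specified in a document not reproduced here, so the following is only a minimal sketch of how the two might combine. The class name `SketchBreaker`, the injectable `clock` and `sleeper`, and the choice to count one failure per batch after all retries are exhausted are assumptions for illustration, not Beacon's actual implementation. The constant values mirror the ones listed on this page.

```ruby
# Hypothetical sketch: retry with backoff plus a simple circuit breaker.
# SketchBreaker and its collaborators are illustrative, not Beacon's API.
class SketchBreaker
  BACKOFF_SECONDS = [0.2, 0.4, 0.8, 1.6, 3.2].freeze
  CIRCUIT_OPEN_SECONDS = 30.0
  CIRCUIT_OPEN_THRESHOLD = 5

  attr_reader :consecutive_failures

  def initialize(backoff: BACKOFF_SECONDS,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                 sleeper: ->(seconds) { sleep(seconds) })
    @backoff = backoff
    @clock = clock
    @sleeper = sleeper
    @consecutive_failures = 0
    @circuit_open_until = nil
  end

  def circuit_open?
    return false if @circuit_open_until.nil?
    return true if @clock.call < @circuit_open_until

    @circuit_open_until = nil # cool-down elapsed; allow traffic again
    false
  end

  # Runs the block, retrying once per backoff entry.
  # Returns :ok, :failed, or :skipped; never raises StandardError to the caller.
  def call
    return :skipped if circuit_open?

    attempts = 0
    begin
      yield
      @consecutive_failures = 0
      :ok
    rescue StandardError
      if attempts < @backoff.size
        @sleeper.call(@backoff[attempts])
        attempts += 1
        retry
      end
      # Assumption: a batch counts as one failure once retries are exhausted.
      @consecutive_failures += 1
      if @consecutive_failures >= CIRCUIT_OPEN_THRESHOLD
        @circuit_open_until = @clock.call + CIRCUIT_OPEN_SECONDS
      end
      :failed
    end
  end
end
```

A caller would wrap each POST in `breaker.call { ... }` and treat `:skipped` as "the circuit is open, try again later". Because every `StandardError` is swallowed and reported through the return value, the sketch keeps the rescue-all property described above.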
Constant Summary
- BACKOFF_SECONDS =
[0.2, 0.4, 0.8, 1.6, 3.2].freeze
- CIRCUIT_OPEN_SECONDS =
30.0
- CIRCUIT_OPEN_THRESHOLD =
5
- DRAIN_BATCH =
1_000
- SHUTDOWN_FLUSH_TIMEOUT =
2.0
- BATCH_MAX_BYTES =
Split serialized batches at this byte count. The Beacon server’s /events endpoint limits any single request body to roughly 1 MB; 800 KB is a conservative ceiling that leaves headroom for headers and JSON framing. When a single flush produces more, we issue multiple POSTs rather than failing with 413.
800 * 1024
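The splitting rule behind BATCH_MAX_BYTES can be sketched as a greedy grouping by serialized size. The helper name `split_batches` is hypothetical, and the sketch assumes the request body is a bare JSON array of events; the real payload envelope is not shown on this page and may differ.

```ruby
require "json"

BATCH_MAX_BYTES = 800 * 1024

# Hypothetical helper: groups events so that each batch, serialized as a
# JSON array, stays at or under max_bytes. An event that is larger than
# max_bytes on its own still gets a batch to itself.
def split_batches(events, max_bytes: BATCH_MAX_BYTES)
  batches = []
  current = []
  current_bytes = 2 # the enclosing "[" and "]"

  events.each do |event|
    size = JSON.generate(event).bytesize
    extra = current.empty? ? size : size + 1 # +1 for the separating comma

    if !current.empty? && current_bytes + extra > max_bytes
      batches << current
      current = []
      current_bytes = 2
      extra = size
    end

    current << event
    current_bytes += extra
  end

  batches << current unless current.empty?
  batches
end
```

Each returned batch would then be sent as its own POST. Measuring `bytesize` rather than `length` matters here because the server limit applies to bytes, and multi-byte characters would otherwise be undercounted.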
Instance Method Summary
- #alive? ⇒ Boolean
- #flush_now ⇒ Object
- #initialize(client, transport:, backoff: BACKOFF_SECONDS, log_throttle: nil) ⇒ Flusher (constructor)
A new instance of Flusher.
- #start ⇒ Object
- #stats ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(client, transport:, backoff: BACKOFF_SECONDS, log_throttle: nil) ⇒ Flusher
Returns a new instance of Flusher.
# File 'lib/beacon/flusher.rb', line 24
def initialize(client, transport:, backoff: BACKOFF_SECONDS, log_throttle: nil)
  @client = client
  @config = client.config
  @transport = transport
  @backoff = backoff
  @log_throttle = log_throttle || Beacon::LogThrottle.new
  @stop = false
  @thread = nil
  @consecutive_failures = 0
  @circuit_open_until = nil
  # Observability counters — read by Beacon.stats. Events written
  # under @stats_mutex so concurrent reads (from Beacon.stats on
  # the main thread) see a consistent snapshot.
  @stats_mutex = Mutex.new
  @sent = 0
  @last_flush_at = nil
  @last_flush_status = nil
end
Instance Method Details
#alive? ⇒ Boolean
# File 'lib/beacon/flusher.rb', line 72
def alive?
  @thread&.alive? ? true : false
end
#flush_now ⇒ Object
# File 'lib/beacon/flusher.rb', line 76
def flush_now
  loop do
    events = @client.queue.drain(DRAIN_BATCH)
    break if events.empty?
    send_events(events)
  end
rescue => e
  log_rescue(e)
end
#start ⇒ Object
# File 'lib/beacon/flusher.rb', line 57
def start
  @stop = false
  @thread = Thread.new { run_loop }
  @thread.name = "beacon-flusher" if @thread.respond_to?(:name=)
  @thread
end
#stats ⇒ Object
# File 'lib/beacon/flusher.rb', line 45
def stats
  @stats_mutex.synchronize do
    {
      sent: @sent,
      last_flush_at: @last_flush_at,
      last_flush_status: @last_flush_status,
      consecutive_failures: @consecutive_failures,
      circuit_open: !@circuit_open_until.nil?,
    }
  end
end
#stop ⇒ Object
# File 'lib/beacon/flusher.rb', line 64
def stop
  @stop = true
  @client.queue.signal_all # wake wait_and_drain from its condvar
  @thread&.join(SHUTDOWN_FLUSH_TIMEOUT)
  flush_now
  nil
end
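The shutdown sequence in #stop (set the stop flag, join the thread with a bounded timeout, then drain once more on the caller's thread) can be tried out in isolation with a stand-in worker. `SketchWorker` below is hypothetical: it uses Ruby's built-in `Queue` and a short sleep in place of Beacon's bounded queue and condition variable, and it records items in memory instead of POSTing them.

```ruby
# Hypothetical stand-in for the flusher's lifecycle; not Beacon's code.
class SketchWorker
  SHUTDOWN_FLUSH_TIMEOUT = 2.0

  def initialize
    @queue = Queue.new
    @delivered = []
    @lock = Mutex.new
    @stop = false
    @thread = nil
  end

  def push(item)
    @queue.push(item)
  end

  # Snapshot of everything delivered so far.
  def delivered
    @lock.synchronize { @delivered.dup }
  end

  def start
    @stop = false
    @thread = Thread.new do
      until @stop
        flush_now
        sleep 0.01 # stand-in for waiting on a condition variable
      end
    end
  end

  # Drains until the queue is empty; logs instead of raising.
  def flush_now
    loop do
      item = begin
        @queue.pop(true) # non-blocking; raises ThreadError when empty
      rescue ThreadError
        break
      end
      @lock.synchronize { @delivered << item }
    end
  rescue StandardError => e
    warn("flush failed: #{e.class}: #{e.message}")
  end

  def stop
    @stop = true
    @thread&.join(SHUTDOWN_FLUSH_TIMEOUT) # bounded wait, never blocks forever
    flush_now # final synchronous drain on the caller's thread
    nil
  end
end
```

The final `flush_now` in `stop` is what picks up items enqueued after the background thread's last pass, and it also makes `stop` safe to call when `start` was never invoked.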