Class: Schematic::EventBuffer
- Inherits:
-
Object
- Object
- Schematic::EventBuffer
- Defined in:
- lib/schematic/event_buffer.rb
Constant Summary collapse
- DEFAULT_FLUSH_INTERVAL =
seconds (canonical Go value)
5.0- DEFAULT_MAX_BATCH_SIZE =
100- DEFAULT_MAX_RETRIES =
3- DEFAULT_INITIAL_RETRY_DELAY =
seconds
1.0- JITTER_FACTOR =
0.25- DEFAULT_CAPTURE_BASE_URL =
"https://c.schematichq.com"
Instance Method Summary collapse
- #flush ⇒ Object
-
#initialize(api_key:, logger:, interval: DEFAULT_FLUSH_INTERVAL, max_batch_size: DEFAULT_MAX_BATCH_SIZE, offline: false, capture_base_url: DEFAULT_CAPTURE_BASE_URL) ⇒ EventBuffer
constructor
A new instance of EventBuffer.
- #push(event) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(api_key:, logger:, interval: DEFAULT_FLUSH_INTERVAL, max_batch_size: DEFAULT_MAX_BATCH_SIZE, offline: false, capture_base_url: DEFAULT_CAPTURE_BASE_URL) ⇒ EventBuffer
Returns a new instance of EventBuffer.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/schematic/event_buffer.rb', line 16 def initialize(api_key:, logger:, interval: DEFAULT_FLUSH_INTERVAL, max_batch_size: DEFAULT_MAX_BATCH_SIZE, offline: false, capture_base_url: DEFAULT_CAPTURE_BASE_URL) @api_key = api_key @logger = logger @interval = interval @max_batch_size = max_batch_size @offline = offline @capture_url = URI.parse("#{capture_base_url}/batch") @events = [] @mutex = Mutex.new @stopped = false @flushing = false @flush_done = ConditionVariable.new start_periodic_flush unless @offline end |
Instance Method Details
#flush ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/schematic/event_buffer.rb', line 47 def flush events_to_send = nil did_set_flushing = false @mutex.synchronize do return if @flushing || @events.empty? @flushing = true did_set_flushing = true events_to_send = @events.dup @events.clear end return unless events_to_send&.any? send_batch(events_to_send) # Events may have accumulated while we were sending. Drain them so # a size-triggered flush that lost the race doesn't have to wait for # the next periodic interval. drain_pending ensure if did_set_flushing @mutex.synchronize do @flushing = false @flush_done.broadcast end end end |
#push(event) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/schematic/event_buffer.rb', line 33 def push(event) return if @offline should_flush = false @mutex.synchronize do return if @stopped @events << event should_flush = @events.size >= @max_batch_size end flush if should_flush end |
#stop ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/schematic/event_buffer.rb', line 76 def stop @mutex.synchronize do @stopped = true # Wait for any in-flight flush to complete before our final flush, # so we don't skip events that arrived during the in-flight batch. @flush_done.wait(@mutex, 30) if @flushing end flush @flush_thread&.join(5) end |