Class: Schematic::EventBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/schematic/event_buffer.rb

Constant Summary collapse

DEFAULT_FLUSH_INTERVAL =

seconds (canonical Go value)

5.0
DEFAULT_MAX_BATCH_SIZE =
100
DEFAULT_MAX_RETRIES =
3
DEFAULT_INITIAL_RETRY_DELAY =

seconds

1.0
JITTER_FACTOR =
0.25
DEFAULT_CAPTURE_BASE_URL =
"https://c.schematichq.com"

Instance Method Summary collapse

Constructor Details

#initialize(api_key:, logger:, interval: DEFAULT_FLUSH_INTERVAL, max_batch_size: DEFAULT_MAX_BATCH_SIZE, offline: false, capture_base_url: DEFAULT_CAPTURE_BASE_URL) ⇒ EventBuffer

Returns a new instance of EventBuffer.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/schematic/event_buffer.rb', line 16

# Sets up the in-memory event buffer and, unless offline, starts the
# periodic flush.
#
# @param api_key [Object] stored in @api_key; not used by this method itself
# @param logger [Object] stored in @logger; not used by this method itself
# @param interval [Numeric] flush interval, stored in @interval (the default
#   constant is documented as seconds)
# @param max_batch_size [Integer] buffered-event count at which #push
#   triggers a flush
# @param offline [Boolean] when true, the periodic flush is not started
# @param capture_base_url [String, #to_s] base URL; "/batch" is appended to
#   form the capture endpoint
# @raise [URI::InvalidURIError] if the resulting endpoint is not a valid URI
def initialize(api_key:, logger:, interval: DEFAULT_FLUSH_INTERVAL, max_batch_size: DEFAULT_MAX_BATCH_SIZE,
               offline: false, capture_base_url: DEFAULT_CAPTURE_BASE_URL)
  @api_key = api_key
  @logger = logger
  @interval = interval
  @max_batch_size = max_batch_size
  @offline = offline

  # Drop trailing slashes so a base URL such as "https://host/" does not
  # produce "https://host//batch".
  base_url = capture_base_url.to_s.sub(%r{/+\z}, "")
  @capture_url = URI.parse("#{base_url}/batch")

  @events = []
  @mutex = Mutex.new # guards @events, @stopped and @flushing
  @stopped = false
  @flushing = false
  @flush_done = ConditionVariable.new # signalled when a flush finishes

  start_periodic_flush unless @offline
end

Instance Method Details

#flush ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/schematic/event_buffer.rb', line 47

# Sends everything currently buffered as one batch, then drains whatever
# was pushed while that batch was in flight.
#
# Does nothing when another flush is already running or the buffer is
# empty. Only the call that set @flushing resets it and wakes waiters.
def flush
  owns_flush = false

  batch = @mutex.synchronize do
    return if @flushing || @events.empty?

    @flushing = true
    owns_flush = true
    snapshot = @events.dup
    @events.clear
    snapshot
  end

  return unless batch&.any?

  send_batch(batch)

  # New events may have arrived during the send; pick them up now so a
  # size-triggered flush that was turned away above is not left waiting
  # for the next periodic tick.
  drain_pending
ensure
  if owns_flush
    @mutex.synchronize do
      @flushing = false
      @flush_done.broadcast
    end
  end
end

#push(event) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/schematic/event_buffer.rb', line 33

# Appends an event to the buffer and triggers a flush once the buffer
# holds at least @max_batch_size events.
#
# The event is ignored when the buffer is offline or already stopped.
#
# @param event [Object] the event to enqueue
def push(event)
  return if @offline

  batch_full = @mutex.synchronize do
    return if @stopped

    @events << event
    @events.size >= @max_batch_size
  end

  # Flush outside the lock so the send does not block other producers.
  flush if batch_full
end

#stop ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/schematic/event_buffer.rb', line 76

# Shuts the buffer down: marks it stopped, waits for any in-flight flush,
# performs a final flush, then joins @flush_thread for up to 5 seconds.
#
# @return [Object, nil] the result of `@flush_thread&.join(5)`
def stop
  @mutex.synchronize do
    @stopped = true

    # Wait for any in-flight flush to complete before our final flush,
    # so we don't skip events that arrived during the in-flight batch.
    # ConditionVariable#wait can return while @flushing is still set
    # (early wakeup, or a new flush starting right after the broadcast),
    # so re-check the flag in a loop, bounded by one 30-second deadline.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 30
    while @flushing
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining.positive?

      @flush_done.wait(@mutex, remaining)
    end
  end

  flush

  @flush_thread&.join(5)
end