Class: Featureflip::Events::EventProcessor
- Inherits: Object
- Inheritance chain: Object → Featureflip::Events::EventProcessor
- Defined in:
- lib/featureflip/events/event_processor.rb
Instance Method Summary
- #flush ⇒ Object
- #initialize(http_client, flush_interval: 30, flush_batch_size: 100) ⇒ EventProcessor (constructor): a new instance of EventProcessor.
- #queue_event(event) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(http_client, flush_interval: 30, flush_batch_size: 100) ⇒ EventProcessor
Returns a new instance of EventProcessor.
4 5 6 7 8 9 10 11 12 |
# File 'lib/featureflip/events/event_processor.rb', line 4
# Sets up an event processor with an empty in-memory queue and no
# background thread running yet.
#
# @param http_client [#post_events] object that receives queued events
#   through +post_events+ when the queue is flushed
# @param flush_interval [Integer] number of one-second ticks the background
#   loop counts before it flushes
# @param flush_batch_size [Integer] queue length at which a flush is triggered
def initialize(http_client, flush_interval: 30, flush_batch_size: 100)
  # Background worker state: nothing started, no stop requested.
  @thread = nil
  @stop_flag = false

  # Event buffer and the lock that guards it.
  @mutex = Mutex.new
  @queue = []

  # Caller-supplied collaborator and tuning knobs.
  @flush_batch_size = flush_batch_size
  @flush_interval = flush_interval
  @http_client = http_client
end
Instance Method Details
#flush ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/featureflip/events/event_processor.rb', line 23
# Drains the queue and hands the drained events to the HTTP client in a
# single +post_events+ call.
#
# The queue is emptied while holding the mutex; the +post_events+ call is
# made after the lock has been released.
#
# @return [Object, nil] the result of +post_events+, or nil when there was
#   nothing to send or when an error was swallowed
def flush
  # Array#shift(n) removes the first n elements and returns them as a new
  # array, so this takes everything and leaves @queue empty in one step.
  batch = @mutex.synchronize { @queue.shift(@queue.length) }
  return unless batch.any?

  @http_client.post_events(batch)
rescue StandardError
  # Events are best-effort — drop on failure
end
#queue_event(event) ⇒ Object
14 15 16 17 18 19 20 21 |
# File 'lib/featureflip/events/event_processor.rb', line 14
# Appends an event to the queue and flushes right away once the queue has
# reached the configured batch size.
#
# @param event [Object] the event to buffer
# @return [Object, nil] the result of +flush+ when a flush was triggered,
#   otherwise nil
def queue_event(event)
  batch_full = @mutex.synchronize do
    @queue.push(event)
    @queue.size >= @flush_batch_size
  end

  # The flush is issued only after the lock has been released.
  flush if batch_full
end
#start ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/featureflip/events/event_processor.rb', line 38
# Starts the background thread that periodically flushes the queue.
#
# The worker wakes roughly once per second, counts ticks, and calls +flush+
# when either +@flush_interval+ ticks have passed or the queue has reached
# +@flush_batch_size+. It exits once +@stop_flag+ is set.
#
# Calling +start+ while a worker is already alive is a no-op that returns
# the existing thread; previously each call spawned an additional polling
# thread and lost track of the earlier one.
#
# @return [Thread] the running worker thread
def start
  return @thread if @thread&.alive?

  @stop_flag = false
  @thread = Thread.new do
    elapsed = 0
    until @stop_flag
      sleep(1)
      elapsed += 1
      queue_size = @mutex.synchronize { @queue.length }
      if elapsed >= @flush_interval || queue_size >= @flush_batch_size
        elapsed = 0
        # Skip the flush if a stop was requested while this tick was in
        # progress.
        flush unless @stop_flag
      end
    end
  end
end
#stop ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/featureflip/events/event_processor.rb', line 54
# Signals the background thread to stop, waits up to five seconds for it to
# finish, and then performs one last flush from the calling thread.
#
# @return [Object, nil] the result of the final +flush+
def stop
  @stop_flag = true

  if @thread
    begin
      # Interrupt the worker's sleep so it notices the stop flag promptly.
      @thread.wakeup
    rescue ThreadError
      # Thread#wakeup raises ThreadError when the thread is already dead;
      # there is nothing to wake in that case. Only this error is ignored,
      # instead of the blanket `rescue nil` modifier.
    end
    # Bounded wait so a stuck worker cannot block shutdown indefinitely.
    @thread.join(5)
    @thread = nil
  end

  flush
end