Class: GetFluxly::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/getfluxly/batch.rb

Overview

Bounded thread-safe event queue + flush bookkeeping.

Defined Under Namespace

Classes: FlushResult

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size:) ⇒ Batch

Returns a new instance of Batch.



21
22
23
24
25
# File 'lib/getfluxly/batch.rb', line 21

def initialize(max_size:)
  @max_size = max_size
  @queue = []
  @mutex = Mutex.new
end

Class Method Details

.empty_flush_resultObject



17
18
19
# File 'lib/getfluxly/batch.rb', line 17

def self.empty_flush_result
  FlushResult.new(0, 0, 0, [])
end

Instance Method Details

#drain(max_events) ⇒ Object



46
47
48
49
50
51
# File 'lib/getfluxly/batch.rb', line 46

def drain(max_events)
  @mutex.synchronize do
    out = @queue.shift([max_events, @queue.size].min)
    out
  end
end

#enqueue(payload) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/getfluxly/batch.rb', line 31

def enqueue(payload)
  @mutex.synchronize do
    if @queue.size >= @max_size
      raise GetFluxly::Error.new(
        "event queue is full (#{@max_size}); flush before enqueueing more",
        code: "queue_overflow",
        retryable: false,
        details: { "max_size" => @max_size }
      )
    end
    @queue.push(payload)
    @queue.size
  end
end

#requeue_front(payloads) ⇒ Object



53
54
55
56
57
# File 'lib/getfluxly/batch.rb', line 53

def requeue_front(payloads)
  @mutex.synchronize do
    @queue = payloads + @queue
  end
end

#sizeObject



27
28
29
# File 'lib/getfluxly/batch.rb', line 27

def size
  @mutex.synchronize { @queue.size }
end