Class: Salopulse::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/salopulse/buffer.rb

Constant Summary collapse

DEFAULT_MAX_SIZE =
10_000

Instance Method Summary collapse

Constructor Details

#initialize(max_size: DEFAULT_MAX_SIZE) ⇒ Buffer

Returns a new instance of Buffer.



7
8
9
10
11
12
# File 'lib/salopulse/buffer.rb', line 7

def initialize(max_size: DEFAULT_MAX_SIZE)
  @queue = Queue.new
  @max_size = max_size
  @mutex = Mutex.new
  @dropped = 0
end

Instance Method Details

#drain(max: 100) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/salopulse/buffer.rb', line 27

def drain(max: 100)
  events = []
  max.times do
    break if @queue.empty?
    begin
      events << @queue.pop(true)
    rescue ThreadError
      break
    end
  end
  events
end

#dropped_countObject



48
49
50
# File 'lib/salopulse/buffer.rb', line 48

def dropped_count
  @mutex.synchronize { @dropped }
end

#empty?Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/salopulse/buffer.rb', line 44

def empty?
  @queue.empty?
end

#push(event) ⇒ Object



14
15
16
17
18
19
20
21
# File 'lib/salopulse/buffer.rb', line 14

def push(event)
  if @queue.size >= @max_size
    @mutex.synchronize { @dropped += 1 }
    return false
  end
  @queue.push(event)
  true
end

#push_many(events) ⇒ Object



23
24
25
# File 'lib/salopulse/buffer.rb', line 23

def push_many(events)
  events.each { |e| push(e) }
end

#reset_dropped!Object



52
53
54
# File 'lib/salopulse/buffer.rb', line 52

def reset_dropped!
  @mutex.synchronize { @dropped = 0 }
end

#sizeObject



40
41
42
# File 'lib/salopulse/buffer.rb', line 40

def size
  @queue.size
end