Class: OpenTrace::RingBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/opentrace/ring_buffer.rb

Constant Summary collapse

DEFAULT_CAPACITY =
1024
DEFAULT_MAX_BYTES =

10 MB

10_485_760
DEFAULT_ITEM_BYTE_SIZE =

1 KB estimate per item

1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capacity: DEFAULT_CAPACITY, max_bytes: DEFAULT_MAX_BYTES) ⇒ RingBuffer

Returns a new instance of RingBuffer.



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/opentrace/ring_buffer.rb', line 11

# Builds an empty, open ring buffer.
#
# capacity  - number of slots allocated for items.
# max_bytes - ceiling on the summed estimated byte size of buffered items.
def initialize(capacity: DEFAULT_CAPACITY, max_bytes: DEFAULT_MAX_BYTES)
  @capacity = capacity
  @max_bytes = max_bytes

  # Parallel fixed-size arrays: slot i holds an item and its byte estimate.
  @buffer = Array.new(capacity)
  @sizes = Array.new(capacity, 0)

  # @head is the next write slot, @tail the next read slot; both start at 0,
  # as do the item count and the running byte total.
  @head = @tail = @count = @byte_size = 0
  @closed = false

  # One lock guards all state; the condition variable is what blocked
  # consumers wait on until an item arrives or the buffer is closed.
  @mutex = Mutex.new
  @not_empty = ConditionVariable.new
end

Instance Attribute Details

#capacity ⇒ Object (readonly)

Returns the value of attribute capacity.



9
10
11
# File 'lib/opentrace/ring_buffer.rb', line 9

# Read-only accessor for the configured slot count (@capacity).
#
# @return [Object] the value of @capacity
def capacity
  @capacity
end

#max_bytes ⇒ Object (readonly)

Returns the value of attribute max_bytes.



9
10
11
# File 'lib/opentrace/ring_buffer.rb', line 9

# Read-only accessor for the configured byte ceiling (@max_bytes).
#
# @return [Object] the value of @max_bytes
def max_bytes
  @max_bytes
end

Instance Method Details

#byte_size ⇒ Object

Total estimated bytes of items currently in the buffer.



96
97
98
# File 'lib/opentrace/ring_buffer.rb', line 96

# Running total of the byte estimates of all buffered items, read under the
# buffer's lock.
def byte_size
  @mutex.synchronize do
    @byte_size
  end
end

#clear ⇒ Object

Empties the buffer and resets byte tracking.



109
110
111
112
113
114
115
116
117
118
# File 'lib/opentrace/ring_buffer.rb', line 109

# Discards every buffered item and resets all bookkeeping (cursors, item
# count, byte total) to zero. The closed flag is left untouched.
def clear
  @mutex.synchronize do
    # Wipe the slots in place so the existing arrays are reused.
    @sizes.fill(0)
    @buffer.fill(nil)

    # Rewind both cursors and zero the counters in one go.
    @head = @tail = @count = @byte_size = 0
  end
end

#close ⇒ Object

Signals consumers to stop. After close, push returns false, and pop/pop_batch return nil/[] once the buffer is drained.



122
123
124
125
126
127
# File 'lib/opentrace/ring_buffer.rb', line 122

# Marks the buffer as closed and wakes every thread waiting on the
# not-empty condition so it can re-check the closed flag.
def close
  @mutex.lock
  begin
    @closed = true
    @not_empty.broadcast
  ensure
    @mutex.unlock
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/opentrace/ring_buffer.rb', line 129

# Reports the closed flag, read under the buffer's lock.
def closed?
  @mutex.synchronize do
    @closed
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/opentrace/ring_buffer.rb', line 104

# True when no items are buffered; the count is read under the lock.
def empty?
  @mutex.synchronize do
    @count.zero?
  end
end

#full? ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/opentrace/ring_buffer.rb', line 100

# True when the item count has reached the slot capacity. The byte ceiling
# is not considered here.
def full?
  @mutex.synchronize do
    @count >= @capacity
  end
end

#pop ⇒ Object

Blocking pop. Waits until an item is available. Returns nil when closed and the buffer is empty.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/opentrace/ring_buffer.rb', line 46

# Removes and returns the oldest buffered item, blocking while the buffer is
# empty and still open.
#
# @return [Object, nil] the dequeued item, or nil when the buffer is closed
#   and has nothing left
def pop
  @mutex.synchronize do
    # The emptiness check is repeated after every wakeup before proceeding.
    while @count.zero?
      return nil if @closed

      @not_empty.wait(@mutex)
    end

    slot = @tail
    value = @buffer[slot]
    bytes = @sizes[slot]

    # Reset the vacated slot so it no longer references the item.
    @buffer[slot] = nil
    @sizes[slot] = 0

    # Advance the read cursor, wrapping at the end of the array.
    @tail = slot.succ % @capacity
    @count -= 1
    @byte_size -= bytes

    value
  end
end

#pop_batch(max) ⇒ Object

Drains up to max items at once. Returns an empty array when closed and the buffer is empty.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/opentrace/ring_buffer.rb', line 67

# Drains up to +max+ items in FIFO order under a single lock acquisition.
#
# Blocks while the buffer is empty and still open. A request for zero items
# returns immediately instead of waiting for an item it would never take.
#
# @param max [Integer] upper bound on the number of items to remove
# @return [Array] the removed items, oldest first; empty when +max+ is zero,
#   or when the buffer is closed and has nothing left
# @raise [ArgumentError] if +max+ is negative
def pop_batch(max)
  # Validate before waiting: a negative count would otherwise surface only
  # after blocking, as a "negative array size" error from Array.new.
  raise ArgumentError, "max must be non-negative (got #{max})" if max.negative?
  return [] if max.zero?

  @mutex.synchronize do
    # The emptiness check is repeated after every wakeup before proceeding.
    while @count.zero?
      return [] if @closed

      @not_empty.wait(@mutex)
    end

    # The batch size is fixed up front; each block call dequeues one item.
    Array.new([max, @count].min) do
      item = @buffer[@tail]
      @byte_size -= @sizes[@tail]

      # Reset the vacated slot so it no longer references the item.
      @buffer[@tail] = nil
      @sizes[@tail] = 0
      @tail = (@tail + 1) % @capacity
      @count -= 1

      item
    end
  end
end

#push(item, byte_size: DEFAULT_ITEM_BYTE_SIZE) ⇒ Object

Non-blocking push. Returns true if enqueued, false if full, closed, or max_bytes would be exceeded.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/opentrace/ring_buffer.rb', line 27

# Attempts to enqueue +item+ without blocking.
#
# @param item [Object] the value to store
# @param byte_size [Integer] estimated size of the item, counted against
#   the buffer's byte ceiling
# @return [Boolean] true when stored; false when the buffer is closed, has
#   no free slot, or the item would push the byte total past the ceiling
def push(item, byte_size: DEFAULT_ITEM_BYTE_SIZE)
  @mutex.synchronize do
    # Checks run in this order and stop at the first one that applies.
    rejected = @closed ||
               @count >= @capacity ||
               @byte_size + byte_size > @max_bytes
    next false if rejected

    slot = @head
    @buffer[slot] = item
    @sizes[slot] = byte_size

    # Advance the write cursor, wrapping at the end of the array.
    @head = (slot + 1) % @capacity
    @count += 1
    @byte_size += byte_size

    # Wake one waiting consumer, if any.
    @not_empty.signal
    true
  end
end

#size ⇒ Object

Current number of items in the buffer.



91
92
93
# File 'lib/opentrace/ring_buffer.rb', line 91

# Number of items currently buffered, read under the buffer's lock.
def size
  @mutex.synchronize do
    @count
  end
end