Class: OpenTrace::RingBuffer
- Inherits:
-
Object
- Object
- OpenTrace::RingBuffer
- Defined in:
- lib/opentrace/ring_buffer.rb
Constant Summary collapse
- DEFAULT_CAPACITY =
1024- DEFAULT_MAX_BYTES =
10 MB
10_485_760- DEFAULT_ITEM_BYTE_SIZE =
1 KB estimate per item
1024
Instance Attribute Summary collapse
-
#capacity ⇒ Object
readonly
Returns the value of attribute capacity.
-
#max_bytes ⇒ Object
readonly
Returns the value of attribute max_bytes.
Instance Method Summary collapse
-
#byte_size ⇒ Object
Total estimated bytes of items currently in the buffer.
-
#clear ⇒ Object
Empties the buffer and resets byte tracking.
-
#close ⇒ Object
Signals consumers to stop.
- #closed? ⇒ Boolean
- #empty? ⇒ Boolean
- #full? ⇒ Boolean
-
#initialize(capacity: DEFAULT_CAPACITY, max_bytes: DEFAULT_MAX_BYTES) ⇒ RingBuffer
constructor
A new instance of RingBuffer.
-
#pop ⇒ Object
Blocking pop.
-
#pop_batch(max) ⇒ Object
Drains up to
maxitems at once. -
#push(item, byte_size: DEFAULT_ITEM_BYTE_SIZE) ⇒ Object
Non-blocking push.
-
#size ⇒ Object
Current number of items in the buffer.
Constructor Details
#initialize(capacity: DEFAULT_CAPACITY, max_bytes: DEFAULT_MAX_BYTES) ⇒ RingBuffer
Returns a new instance of RingBuffer.
11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/opentrace/ring_buffer.rb', line 11 def initialize(capacity: DEFAULT_CAPACITY, max_bytes: DEFAULT_MAX_BYTES) @capacity = capacity @max_bytes = max_bytes @buffer = Array.new(capacity) @sizes = Array.new(capacity, 0) @head = 0 # next write position @tail = 0 # next read position @count = 0 @byte_size = 0 @closed = false @mutex = Mutex.new @not_empty = ConditionVariable.new end |
Instance Attribute Details
#capacity ⇒ Object (readonly)
Returns the value of attribute capacity.
9 10 11 |
# File 'lib/opentrace/ring_buffer.rb', line 9 def capacity @capacity end |
#max_bytes ⇒ Object (readonly)
Returns the value of attribute max_bytes.
9 10 11 |
# File 'lib/opentrace/ring_buffer.rb', line 9 def max_bytes @max_bytes end |
Instance Method Details
#byte_size ⇒ Object
Total estimated bytes of items currently in the buffer.
96 97 98 |
# File 'lib/opentrace/ring_buffer.rb', line 96 def byte_size @mutex.synchronize { @byte_size } end |
#clear ⇒ Object
Empties the buffer and resets byte tracking.
109 110 111 112 113 114 115 116 117 118 |
# File 'lib/opentrace/ring_buffer.rb', line 109 def clear @mutex.synchronize do @buffer.fill(nil) @sizes.fill(0) @head = 0 @tail = 0 @count = 0 @byte_size = 0 end end |
#close ⇒ Object
Signals consumers to stop. After close, push returns false and pop/ pop_batch return nil/[] once the buffer is drained.
122 123 124 125 126 127 |
# File 'lib/opentrace/ring_buffer.rb', line 122 def close @mutex.synchronize do @closed = true @not_empty.broadcast end end |
#closed? ⇒ Boolean
129 130 131 |
# File 'lib/opentrace/ring_buffer.rb', line 129 def closed? @mutex.synchronize { @closed } end |
#empty? ⇒ Boolean
104 105 106 |
# File 'lib/opentrace/ring_buffer.rb', line 104 def empty? @mutex.synchronize { @count == 0 } end |
#full? ⇒ Boolean
100 101 102 |
# File 'lib/opentrace/ring_buffer.rb', line 100 def full? @mutex.synchronize { @count >= @capacity } end |
#pop ⇒ Object
Blocking pop. Waits until an item is available. Returns nil when closed and the buffer is empty.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/opentrace/ring_buffer.rb', line 46 def pop @mutex.synchronize do while @count == 0 return nil if @closed @not_empty.wait(@mutex) end item = @buffer[@tail] item_size = @sizes[@tail] @buffer[@tail] = nil @sizes[@tail] = 0 @tail = (@tail + 1) % @capacity @count -= 1 @byte_size -= item_size item end end |
#pop_batch(max) ⇒ Object
Drains up to max items at once. Returns an empty array when closed and the buffer is empty.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/opentrace/ring_buffer.rb', line 67 def pop_batch(max) @mutex.synchronize do while @count == 0 return [] if @closed @not_empty.wait(@mutex) end n = [max, @count].min items = Array.new(n) n.times do |i| items[i] = @buffer[@tail] @byte_size -= @sizes[@tail] @buffer[@tail] = nil @sizes[@tail] = 0 @tail = (@tail + 1) % @capacity @count -= 1 end items end end |
#push(item, byte_size: DEFAULT_ITEM_BYTE_SIZE) ⇒ Object
Non-blocking push. Returns true if enqueued, false if full, closed, or max_bytes would be exceeded.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/opentrace/ring_buffer.rb', line 27 def push(item, byte_size: DEFAULT_ITEM_BYTE_SIZE) @mutex.synchronize do return false if @closed return false if @count >= @capacity return false if @byte_size + byte_size > @max_bytes @buffer[@head] = item @sizes[@head] = byte_size @head = (@head + 1) % @capacity @count += 1 @byte_size += byte_size @not_empty.signal true end end |
#size ⇒ Object
Current number of items in the buffer.
91 92 93 |
# File 'lib/opentrace/ring_buffer.rb', line 91 def size @mutex.synchronize { @count } end |