Class: NewRelic::Agent::InfiniteTracing::StreamingBuffer
- Inherits:
-
Object
- Object
- NewRelic::Agent::InfiniteTracing::StreamingBuffer
- Extended by:
- Forwardable
- Includes:
- Enumerable, Constants
- Defined in:
- lib/infinite_tracing/streaming_buffer.rb
Constant Summary collapse
- DEFAULT_QUEUE_SIZE =
10_000- FLUSH_DELAY =
0.005- MAX_FLUSH_WAIT =
three seconds
3
Constants included from Constants
Constants::GRPC_ERROR_NAME_METRIC, Constants::GRPC_OTHER_ERROR_METRIC, Constants::QUEUE_DUMPED_METRIC, Constants::RESPONSE_ERROR_METRIC, Constants::SPANS_SEEN_METRIC, Constants::SPANS_SENT_METRIC, Constants::SUPPORTABILITY_PREFIX
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#<<(segment) ⇒ Object
Pushes the segment given onto the queue.
-
#batch_enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present.
-
#clear_queue ⇒ Object
Drops all segments from the queue and records a supportability metric for the event.
- #close_queue ⇒ Object
-
#enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present If
nilis popped, the queue is closing. -
#flush_queue ⇒ Object
Waits for the queue to be fully consumed or for the waiting consumers to release.
-
#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer
constructor
A new instance of StreamingBuffer.
-
#transfer(new_buffer) ⇒ Object
Dumps the contents of this streaming buffer onto the given buffer and closes the queue.
Constructor Details
#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer
Returns a new instance of StreamingBuffer.
24 25 26 27 28 29 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 24 def initialize(max_size = DEFAULT_QUEUE_SIZE) @max_size = max_size @lock = Mutex.new @queue = Queue.new @batch = Array.new end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
22 23 24 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 22 def queue @queue end |
Instance Method Details
#<<(segment) ⇒ Object
Pushes the segment given onto the queue.
If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.
When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.
48 49 50 51 52 53 54 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 48 def <<(segment) @lock.synchronize do clear_queue if @queue.size >= @max_size NewRelic::Agent.increment_metric(SPANS_SEEN_METRIC) @queue.push(segment) end end |
#batch_enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present
yielding is deferred until batch_size spans is reached.
If nil is popped, the queue is closing. A final yield on non-empty batch is fired.
The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 119 def batch_enumerator return enum_for(:enumerator) unless block_given? loop do if proc_or_segment = @queue.pop(false) NewRelic::Agent.increment_metric(SPANS_SENT_METRIC) @batch << transform(proc_or_segment) if @batch.size >= BATCH_SIZE yield(SpanBatch.new(spans: @batch)) @batch.clear end else yield(SpanBatch.new(spans: @batch)) unless @batch.empty? raise ClosedQueueError end end end |
#clear_queue ⇒ Object
Drops all segments from the queue and records a supportability metric for the event.
58 59 60 61 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 58 def clear_queue @queue.clear NewRelic::Agent.increment_metric(QUEUE_DUMPED_METRIC) end |
#close_queue ⇒ Object
81 82 83 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 81 def close_queue @lock.synchronize { @queue.close } end |
#enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present If nil is popped, the queue is closing.
The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 93 def enumerator return enum_for(:enumerator) unless block_given? loop do if segment = @queue.pop(false) NewRelic::Agent.increment_metric(SPANS_SENT_METRIC) yield(transform(segment)) else raise ClosedQueueError end end end |
#flush_queue ⇒ Object
Waits for the queue to be fully consumed or for the waiting consumers to release.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 65 def flush_queue @queue.num_waiting.times { @queue.push(nil) } close_queue # Logs if we're throwing away spans because nothing's # waiting to take them off the queue. if @queue.num_waiting == 0 && !@queue.empty? NewRelic::Agent.logger.warn("Discarding #{@queue.size} segments on Streaming Buffer") return end # Only wait a short while for queue to flush cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + MAX_FLUSH_WAIT until @queue.empty? || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= cutoff do sleep(FLUSH_DELAY) end end |
#transfer(new_buffer) ⇒ Object
Dumps the contents of this streaming buffer onto the given buffer and closes the queue
33 34 35 36 37 38 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 33 def transfer(new_buffer) @lock.synchronize do until @queue.empty? do new_buffer.push(@queue.pop) end @queue.close end end |