Class: NewRelic::Agent::InfiniteTracing::StreamingBuffer
- Inherits:
- Object
  - Object
    - NewRelic::Agent::InfiniteTracing::StreamingBuffer
- Extended by:
- Forwardable
- Includes:
- Enumerable, Constants
- Defined in:
- lib/infinite_tracing/streaming_buffer.rb
Constant Summary collapse
- DEFAULT_QUEUE_SIZE = 10_000
- FLUSH_DELAY = 0.005
- MAX_FLUSH_WAIT = 3 (three seconds)
Constants included from Constants
Constants::GRPC_ERROR_NAME_METRIC, Constants::GRPC_OTHER_ERROR_METRIC, Constants::QUEUE_DUMPED_METRIC, Constants::RESPONSE_ERROR_METRIC, Constants::SPANS_SEEN_METRIC, Constants::SPANS_SENT_METRIC, Constants::SUPPORTABILITY_PREFIX
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#<<(segment) ⇒ Object
Pushes the segment given onto the queue.
-
#batch_enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present.
-
#clear_queue ⇒ Object
Drops all segments from the queue and records a supportability metric for the event.
- #close_queue ⇒ Object
-
#enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present. If nil is popped, the queue is closing.
-
#flush_queue ⇒ Object
Waits for the queue to be fully consumed or for the waiting consumers to release.
-
#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer
constructor
A new instance of StreamingBuffer.
-
#transfer(new_buffer) ⇒ Object
Dumps the contents of this streaming buffer onto the given buffer and closes the queue.
Constructor Details
#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer
Returns a new instance of StreamingBuffer.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 23
def initialize(max_size = DEFAULT_QUEUE_SIZE)
  @max_size = max_size
  @lock = Mutex.new
  @queue = Queue.new
  @batch = Array.new
end
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 21
def queue
  @queue
end
Instance Method Details
#<<(segment) ⇒ Object
Pushes the segment given onto the queue.
If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.
When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 47
def <<(segment)
  @lock.synchronize do
    clear_queue if @queue.size >= @max_size
    NewRelic::Agent.increment_metric(SPANS_SEEN_METRIC)
    @queue.push(segment)
  end
end
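The dump-on-overflow behaviour described above can be illustrated without the agent loaded. The following is a minimal sketch only: `DumpingBuffer` and its `dumps` counter are hypothetical stand-ins (the counter plays the role of the supportability metric), not part of the New Relic API.

```ruby
# Hypothetical stand-in for illustration; not the agent's StreamingBuffer.
class DumpingBuffer
  attr_reader :queue, :dumps

  def initialize(max_size)
    @max_size = max_size
    @lock = Mutex.new
    @queue = Queue.new
    @dumps = 0 # stands in for the queue-dumped supportability metric
  end

  def <<(item)
    @lock.synchronize do
      # At capacity: drop everything queued so far, then accept the new item.
      if @queue.size >= @max_size
        @queue.clear
        @dumps += 1
      end
      @queue.push(item)
    end
    self
  end
end

buffer = DumpingBuffer.new(3)
5.times { |i| buffer << i }
```

With a capacity of 3, the fourth push finds the queue full and clears it first, so only the two most recent items remain queued and one dump is recorded.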
#batch_enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present. Yielding is deferred until BATCH_SIZE spans have been collected.
If nil is popped, the queue is closing, and a final yield is fired for any non-empty batch.
The segment is transformed into a serializable span here so that processing takes place within the gRPC call's thread rather than in the main application thread.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 118
def batch_enumerator
  return enum_for(:enumerator) unless block_given?

  loop do
    if proc_or_segment = @queue.pop(false)
      NewRelic::Agent.increment_metric(SPANS_SENT_METRIC)
      @batch << transform(proc_or_segment)
      if @batch.size >= BATCH_SIZE
        yield(SpanBatch.new(spans: @batch))
        @batch.clear
      end
    else
      yield(SpanBatch.new(spans: @batch)) unless @batch.empty?
      raise ClosedQueueError
    end
  end
end
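The batching pattern can be tried in isolation with a plain `Queue`. This is a sketch under stated assumptions: `BatchingReader`, `each_batch`, and the batch size of 3 are hypothetical and chosen for illustration; the agent's own BATCH_SIZE value and its `SpanBatch` and `transform` helpers are not reproduced here.

```ruby
# Hypothetical stand-in for illustration; not the agent's StreamingBuffer.
class BatchingReader
  def initialize(queue, batch_size)
    @queue = queue
    @batch_size = batch_size # assumed value, for the example only
    @batch = []
  end

  def each_batch
    return enum_for(:each_batch) unless block_given?

    # Queue#pop blocks while the queue is open and empty, and returns nil
    # once the queue has been closed and drained.
    while (item = @queue.pop)
      @batch << item
      next if @batch.size < @batch_size

      yield @batch.dup
      @batch.clear
    end

    # Final yield for a partially filled batch.
    yield @batch.dup unless @batch.empty?
  end
end

queue = Queue.new
7.times { |i| queue.push(i) }
queue.close

batches = BatchingReader.new(queue, 3).each_batch.to_a
```

Seven items with a batch size of 3 produce two full batches followed by one final batch holding the remaining item.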
#clear_queue ⇒ Object
Drops all segments from the queue and records a supportability metric for the event.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 57
def clear_queue
  @queue.clear
  NewRelic::Agent.increment_metric(QUEUE_DUMPED_METRIC)
end
#close_queue ⇒ Object
# File 'lib/infinite_tracing/streaming_buffer.rb', line 80
def close_queue
  @lock.synchronize { @queue.close }
end
#enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present. If nil is popped, the queue is closing.
The segment is transformed into a serializable span here so that processing takes place within the gRPC call's thread rather than in the main application thread.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 92
def enumerator
  return enum_for(:enumerator) unless block_given?

  loop do
    if segment = @queue.pop(false)
      NewRelic::Agent.increment_metric(SPANS_SENT_METRIC)
      yield(transform(segment))
    else
      raise ClosedQueueError
    end
  end
end
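One detail worth seeing in isolation is why raising ClosedQueueError inside `loop` ends the enumeration quietly: in Ruby, ClosedQueueError is a subclass of StopIteration, which `Kernel#loop` rescues. The sketch below uses a hypothetical `QueueReader` class with a plain `Queue`; it omits the agent's metrics and `transform` step.

```ruby
# Hypothetical stand-in for illustration; not the agent's StreamingBuffer.
class QueueReader
  def initialize(queue)
    @queue = queue
  end

  def each
    return enum_for(:each) unless block_given?

    loop do
      # Blocks until an item is available; returns nil once the queue
      # is closed and empty.
      item = @queue.pop
      # ClosedQueueError is a StopIteration, so Kernel#loop rescues it
      # and the method returns normally.
      raise ClosedQueueError if item.nil?

      yield item
    end
  end
end

queue = Queue.new
%i[a b c].each { |item| queue.push(item) }
queue.close

items = QueueReader.new(queue).each.to_a
```

Called without a block, `each` returns an Enumerator, so a consumer such as a streaming client can pull items lazily and will simply see the stream end once the queue is closed.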
#flush_queue ⇒ Object
Waits for the queue to be fully consumed or for the waiting consumers to release.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 64
def flush_queue
  @queue.num_waiting.times { @queue.push(nil) }
  close_queue

  # Logs if we're throwing away spans because nothing's
  # waiting to take them off the queue.
  if @queue.num_waiting == 0 && !@queue.empty?
    NewRelic::Agent.logger.warn("Discarding #{@queue.size} segments on Streaming Buffer")
    return
  end

  # Only wait a short while for queue to flush
  cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + MAX_FLUSH_WAIT
  until @queue.empty? || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= cutoff do sleep(FLUSH_DELAY) end
end
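The bounded wait at the end of `flush_queue` is a general polling pattern: compute a deadline from the monotonic clock, then sleep in short intervals until the queue drains or the deadline passes. A minimal sketch follows; `wait_until_empty` and the timing values are hypothetical, and, unlike the method above, it returns a boolean so the outcome is visible.

```ruby
# Hypothetical helper for illustration; not part of the agent.
# Returns true if the queue drained before the deadline, false otherwise.
def wait_until_empty(queue, max_wait, delay)
  # The monotonic clock is unaffected by wall-clock adjustments.
  cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_wait
  until queue.empty?
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= cutoff

    sleep(delay)
  end
  true
end

# A queue with an active consumer drains well within the deadline.
queue = Queue.new
3.times { |i| queue.push(i) }
consumer = Thread.new { 3.times { queue.pop } }
drained = wait_until_empty(queue, 1.0, 0.005)
consumer.join

# A queue with no consumer never drains, so the wait gives up at the cutoff.
stuck = Queue.new
stuck.push(:orphan)
timed_out = !wait_until_empty(stuck, 0.05, 0.005)
```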
#transfer(new_buffer) ⇒ Object
Dumps the contents of this streaming buffer onto the given buffer and closes the queue.
# File 'lib/infinite_tracing/streaming_buffer.rb', line 32
def transfer(new_buffer)
  @lock.synchronize do
    until @queue.empty? do new_buffer.push(@queue.pop) end
    @queue.close
  end
end
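The drain-then-close handoff can be sketched with two plain `Queue` objects. The helper name `transfer_items` and its explicit `lock` argument are assumptions made for the example; in the class above, the lock is the buffer's own mutex and the target is another StreamingBuffer.

```ruby
# Hypothetical helper for illustration; not part of the agent.
def transfer_items(source, target, lock)
  lock.synchronize do
    # Move items across in FIFO order, then refuse further pushes.
    target.push(source.pop) until source.empty?
    source.close
  end
end

old_queue = Queue.new
new_queue = Queue.new
%i[a b c].each { |item| old_queue.push(item) }

transfer_items(old_queue, new_queue, Mutex.new)

# Pushing to a closed queue raises ClosedQueueError.
rejected = begin
  old_queue.push(:late)
  false
rescue ClosedQueueError
  true
end
```

Holding the lock for the whole transfer means a concurrent push guarded by the same lock cannot slip an item into the old queue between the drain and the close.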