Class: NewRelic::Agent::InfiniteTracing::StreamingBuffer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable, Constants
Defined in:
lib/infinite_tracing/streaming_buffer.rb

Constant Summary collapse

DEFAULT_QUEUE_SIZE =
10_000
FLUSH_DELAY =
0.005
MAX_FLUSH_WAIT =

three seconds

3

Constants included from Constants

Constants::GRPC_ERROR_NAME_METRIC, Constants::GRPC_OTHER_ERROR_METRIC, Constants::QUEUE_DUMPED_METRIC, Constants::RESPONSE_ERROR_METRIC, Constants::SPANS_SEEN_METRIC, Constants::SPANS_SENT_METRIC, Constants::SUPPORTABILITY_PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer

Returns a new instance of StreamingBuffer.



24
25
26
27
28
29
# File 'lib/infinite_tracing/streaming_buffer.rb', line 24

def initialize max_size = DEFAULT_QUEUE_SIZE
  @max_size = max_size
  @lock = Mutex.new
  @queue = Queue.new
  @batch = Array.new
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



22
23
24
# File 'lib/infinite_tracing/streaming_buffer.rb', line 22

def queue
  @queue
end

Instance Method Details

#<<(segment) ⇒ Object

Pushes the segment given onto the queue.

If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.

When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.



48
49
50
51
52
53
54
# File 'lib/infinite_tracing/streaming_buffer.rb', line 48

def << segment
  @lock.synchronize do
    clear_queue if @queue.size >= @max_size
    NewRelic::Agent.increment_metric SPANS_SEEN_METRIC
    @queue.push segment
  end
end

#batch_enumeratorObject

Returns the blocking enumerator that will pop items off the queue while any items are present

yielding is deferred until batch_size spans is reached.

If nil is popped, the queue is closing. A final yield on non-empty batch is fired.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/infinite_tracing/streaming_buffer.rb', line 119

def batch_enumerator
  return enum_for(:enumerator) unless block_given?
  loop do
    if proc_or_segment = @queue.pop(false)
      NewRelic::Agent.increment_metric SPANS_SENT_METRIC
      @batch << transform(proc_or_segment)
      if @batch.size >= BATCH_SIZE
        yield SpanBatch.new(spans: @batch)
        @batch.clear
      end

    else
      yield SpanBatch.new(spans: @batch) unless @batch.empty?
      raise ClosedQueueError
    end
  end
end

#clear_queueObject

Drops all segments from the queue and records a supportability metric for the event.



58
59
60
61
# File 'lib/infinite_tracing/streaming_buffer.rb', line 58

def clear_queue
  @queue.clear
  NewRelic::Agent.increment_metric QUEUE_DUMPED_METRIC
end

#close_queueObject



81
82
83
# File 'lib/infinite_tracing/streaming_buffer.rb', line 81

def close_queue
  @lock.synchronize { @queue.close }
end

#enumeratorObject

Returns the blocking enumerator that will pop items off the queue while any items are present If nil is popped, the queue is closing.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/infinite_tracing/streaming_buffer.rb', line 93

def enumerator
  return enum_for(:enumerator) unless block_given?
  loop do
    if segment = @queue.pop(false)
      NewRelic::Agent.increment_metric SPANS_SENT_METRIC
      yield transform(segment)

    else
      raise ClosedQueueError
    end
  end
end

#flush_queueObject

Waits for the queue to be fully consumed or for the waiting consumers to release.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/infinite_tracing/streaming_buffer.rb', line 65

def flush_queue
  @queue.num_waiting.times { @queue.push nil }
  close_queue

  # Logs if we're throwing away spans because nothing's
  # waiting to take them off the queue.
  if @queue.num_waiting == 0 && !@queue.empty?
    NewRelic::Agent.logger.warn "Discarding #{@queue.size} segments on Streaming Buffer"
    return
  end

  # Only wait a short while for queue to flush
  cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + MAX_FLUSH_WAIT
  until @queue.empty? || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= cutoff do sleep(FLUSH_DELAY) end
end

#transfer(new_buffer) ⇒ Object

Dumps the contents of this streaming buffer onto the given buffer and closes the queue



33
34
35
36
37
38
# File 'lib/infinite_tracing/streaming_buffer.rb', line 33

def transfer new_buffer
  @lock.synchronize do
    until @queue.empty? do new_buffer.push @queue.pop end
    @queue.close
  end
end