Class: NewRelic::Agent::InfiniteTracing::StreamingBuffer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable, Constants
Defined in:
lib/infinite_tracing/streaming_buffer.rb

Constant Summary collapse

DEFAULT_QUEUE_SIZE =
10_000
FLUSH_DELAY =
0.005
MAX_FLUSH_WAIT =

three seconds

3

Constants included from Constants

Constants::GRPC_ERROR_NAME_METRIC, Constants::GRPC_OTHER_ERROR_METRIC, Constants::QUEUE_DUMPED_METRIC, Constants::RESPONSE_ERROR_METRIC, Constants::SPANS_SEEN_METRIC, Constants::SPANS_SENT_METRIC, Constants::SUPPORTABILITY_PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer

Returns a new instance of StreamingBuffer.



24
25
26
27
28
29
# File 'lib/infinite_tracing/streaming_buffer.rb', line 24

def initialize(max_size = DEFAULT_QUEUE_SIZE)
  @max_size = max_size
  @lock = Mutex.new
  @queue = Queue.new
  @batch = Array.new
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



22
23
24
# File 'lib/infinite_tracing/streaming_buffer.rb', line 22

def queue
  @queue
end

Instance Method Details

#<<(segment) ⇒ Object

Pushes the segment given onto the queue.

If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.

When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.



48
49
50
51
52
53
54
# File 'lib/infinite_tracing/streaming_buffer.rb', line 48

def <<(segment)
  @lock.synchronize do
    clear_queue if @queue.size >= @max_size
    NewRelic::Agent.increment_metric(SPANS_SEEN_METRIC)
    @queue.push(segment)
  end
end

#batch_enumeratorObject

Returns the blocking enumerator that will pop items off the queue while any items are present

yielding is deferred until batch_size spans is reached.

If nil is popped, the queue is closing. A final yield on non-empty batch is fired.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/infinite_tracing/streaming_buffer.rb', line 119

def batch_enumerator
  return enum_for(:enumerator) unless block_given?
  loop do
    if proc_or_segment = @queue.pop(false)
      NewRelic::Agent.increment_metric(SPANS_SENT_METRIC)
      @batch << transform(proc_or_segment)
      if @batch.size >= BATCH_SIZE
        yield(SpanBatch.new(spans: @batch))
        @batch.clear
      end

    else
      yield(SpanBatch.new(spans: @batch)) unless @batch.empty?
      raise ClosedQueueError
    end
  end
end

#clear_queueObject

Drops all segments from the queue and records a supportability metric for the event.



58
59
60
61
# File 'lib/infinite_tracing/streaming_buffer.rb', line 58

def clear_queue
  @queue.clear
  NewRelic::Agent.increment_metric(QUEUE_DUMPED_METRIC)
end

#close_queueObject



81
82
83
# File 'lib/infinite_tracing/streaming_buffer.rb', line 81

def close_queue
  @lock.synchronize { @queue.close }
end

#enumeratorObject

Returns the blocking enumerator that will pop items off the queue while any items are present If nil is popped, the queue is closing.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/infinite_tracing/streaming_buffer.rb', line 93

def enumerator
  return enum_for(:enumerator) unless block_given?
  loop do
    if segment = @queue.pop(false)
      NewRelic::Agent.increment_metric(SPANS_SENT_METRIC)
      yield(transform(segment))

    else
      raise ClosedQueueError
    end
  end
end

#flush_queueObject

Waits for the queue to be fully consumed or for the waiting consumers to release.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/infinite_tracing/streaming_buffer.rb', line 65

def flush_queue
  @queue.num_waiting.times { @queue.push(nil) }
  close_queue

  # Logs if we're throwing away spans because nothing's
  # waiting to take them off the queue.
  if @queue.num_waiting == 0 && !@queue.empty?
    NewRelic::Agent.logger.warn("Discarding #{@queue.size} segments on Streaming Buffer")
    return
  end

  # Only wait a short while for queue to flush
  cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + MAX_FLUSH_WAIT
  until @queue.empty? || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= cutoff do sleep(FLUSH_DELAY) end
end

#transfer(new_buffer) ⇒ Object

Dumps the contents of this streaming buffer onto the given buffer and closes the queue



33
34
35
36
37
38
# File 'lib/infinite_tracing/streaming_buffer.rb', line 33

def transfer(new_buffer)
  @lock.synchronize do
    until @queue.empty? do new_buffer.push(@queue.pop) end
    @queue.close
  end
end