Class: NewRelic::Agent::InfiniteTracing::Client

Inherits:
Object
  • Object
show all
Includes:
Constants
Defined in:
lib/infinite_tracing/client.rb

Constant Summary

Constants included from Constants

NewRelic::Agent::InfiniteTracing::Constants::GRPC_ERROR_NAME_METRIC, NewRelic::Agent::InfiniteTracing::Constants::GRPC_OTHER_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::QUEUE_DUMPED_METRIC, NewRelic::Agent::InfiniteTracing::Constants::RESPONSE_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SEEN_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SENT_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SUPPORTABILITY_PREFIX

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Client

Returns a new instance of Client.



19
20
21
22
23
# File 'lib/infinite_tracing/client.rb', line 19

# Builds a client in its starting state: a new Mutex in @lock, no response
# handler, and the suspended flag set to false.
def initialize
  @lock = Mutex.new
  @response_handler = nil
  @suspended = false
end

Instance Method Details

#<<(segment) ⇒ Object



25
26
27
# File 'lib/infinite_tracing/client.rb', line 25

# Appends the given segment to the current streaming buffer and returns
# whatever the buffer's own << returns.
def <<(segment)
  target = buffer
  target << segment
end

#buffer ⇒ Object



44
45
46
# File 'lib/infinite_tracing/client.rb', line 44

# Returns the memoized streaming buffer, building it with
# new_streaming_buffer the first time it is requested.
def buffer
  return @buffer if @buffer
  @buffer = new_streaming_buffer
end

#flush ⇒ Object



48
49
50
# File 'lib/infinite_tracing/client.rb', line 48

# Asks the current streaming buffer to flush its queue and returns the
# result of that call.
def flush
  current = buffer
  current.flush_queue
end

#formatted_class_name(class_name) ⇒ Object

Turns camelcase base class name into upper snake case version of the name.



53
54
55
56
# File 'lib/infinite_tracing/client.rb', line 53

# Converts a (possibly namespaced) camel-case class name to upper snake case.
#
# Only the text after the last ":" is kept. An underscore is then inserted
# between any character and an uppercase letter that follows it, and the
# result is upcased. The argument itself is not modified.
def formatted_class_name(class_name)
  base_name = class_name.split(":").last
  base_name.gsub(/(.)([A-Z])/, '\1_\2').upcase
end

#grpc_error_metric_name(error) ⇒ Object

Literal codes are all mapped to unique class names, so we can deduce the name of the error to report in the metric from the error's class name.



60
61
62
# File 'lib/infinite_tracing/client.rb', line 60

# Builds the metric name for a gRPC error by formatting the error's class
# name with formatted_class_name and substituting it into
# GRPC_ERROR_NAME_METRIC.
def grpc_error_metric_name(error)
  error_label = formatted_class_name(error.class.name)
  GRPC_ERROR_NAME_METRIC % error_label
end

#handle_close ⇒ Object

This method is called when the server closes the record status stream without raising an error. The Channel/Connection is not closed or reset in this case. We simply start streaming again, which will reuse the channel/connection to the server and re-establish the gRPC bi-directional stream. Useful for the server to initiate a load-balancing scheme.



97
98
99
100
101
# File 'lib/infinite_tracing/client.rb', line 97

# Logs at debug level that the stream was closed with an OK response, then
# calls start_streaming with its default argument and returns its result.
def handle_close
  notice = "The gRPC Trace Observer closed the stream with OK response. " \
    "Restarting the stream."
  NewRelic::Agent.logger.debug(notice)
  start_streaming
end

#handle_error(error) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/infinite_tracing/client.rb', line 75

# Records metrics and a log entry for the error, then chooses a recovery
# action from the error's class:
#   * GRPC::Unavailable or GRPC::FailedPrecondition -> restart
#   * GRPC::Unimplemented                           -> suspend
#   * anything else                                 -> start_streaming(false)
def handle_error(error)
  record_error_metrics_and_log(error)

  case error
  when GRPC::Unavailable, GRPC::FailedPrecondition
    restart
  when GRPC::Unimplemented
    suspend
  else
    # Set exponential backoff to false so we'll reconnect at periodic (15 second) intervals instead
    start_streaming(false)
  end
end

#new_streaming_buffer ⇒ Object

Provides the correct streaming buffer instance based on whether the client is currently suspended.



39
40
41
42
# File 'lib/infinite_tracing/client.rb', line 39

# Creates a new buffer sized by Config.span_events_queue_size: a
# SuspendedStreamingBuffer when the client is suspended, otherwise a
# StreamingBuffer.
def new_streaming_buffer
  if suspended?
    SuspendedStreamingBuffer.new(Config.span_events_queue_size)
  else
    StreamingBuffer.new(Config.span_events_queue_size)
  end
end

#record_error_metrics_and_log(error) ⇒ Object

Reports AND logs general response metric along with a more specific error metric



65
66
67
68
69
70
71
72
73
# File 'lib/infinite_tracing/client.rb', line 65

# Records the general response-error metric, then one more specific metric
# (named after the error class for GRPC::BadStatus errors, otherwise the
# catch-all GRPC_OTHER_ERROR_METRIC), and finally logs the error as a warning.
def record_error_metrics_and_log(error)
  NewRelic::Agent.record_metric(RESPONSE_ERROR_METRIC, 0.0)

  specific_metric =
    error.is_a?(GRPC::BadStatus) ? grpc_error_metric_name(error) : GRPC_OTHER_ERROR_METRIC
  NewRelic::Agent.record_metric(specific_metric, 0.0)

  NewRelic::Agent.logger.warn("gRPC response error received.", error)
end

#record_span_batches(exponential_backoff) ⇒ Object



154
155
156
# File 'lib/infinite_tracing/client.rb', line 154

# Passes this client, the buffer's batch enumerator and the backoff flag to
# Connection.record_span_batches, and wraps the result in a new
# RecordStatusHandler bound to this client.
def record_span_batches(exponential_backoff)
  batches = buffer.batch_enumerator
  stream = Connection.record_span_batches(self, batches, exponential_backoff)
  RecordStatusHandler.new(self, stream)
end

#record_spans(exponential_backoff) ⇒ Object



150
151
152
# File 'lib/infinite_tracing/client.rb', line 150

# Passes this client, the buffer's enumerator and the backoff flag to
# Connection.record_spans, and wraps the result in a new RecordStatusHandler
# bound to this client.
def record_spans(exponential_backoff)
  spans = buffer.enumerator
  stream = Connection.record_spans(self, spans, exponential_backoff)
  RecordStatusHandler.new(self, stream)
end

#reset_connection ⇒ Object



116
117
118
119
120
# File 'lib/infinite_tracing/client.rb', line 116

# Calls Connection.reset while holding @lock and returns its result.
def reset_connection
  @lock.synchronize { Connection.reset }
end

#restart ⇒ Object



130
131
132
133
134
# File 'lib/infinite_tracing/client.rb', line 130

# Runs the restart steps in sequence: reset_connection, then transfer_buffer,
# then start_streaming with its default argument. Returns whatever
# start_streaming returns.
def restart
  reset_connection
  transfer_buffer
  start_streaming
end

#start_streaming(exponential_backoff = true) ⇒ Object



144
145
146
147
148
# File 'lib/infinite_tracing/client.rb', line 144

# Starts streaming unless the client is suspended.
#
# Waits on Connection.instance.wait_for_agent_connect, then, while holding
# @lock, stores the handler returned by record_spans in @response_handler.
# Returns that handler, or nil when the client is suspended.
def start_streaming(exponential_backoff = true)
  unless suspended?
    Connection.instance.wait_for_agent_connect
    @lock.synchronize do
      @response_handler = record_spans(exponential_backoff)
    end
  end
end

#stop ⇒ Object



136
137
138
139
140
141
142
# File 'lib/infinite_tracing/client.rb', line 136

# Stops the active response handler, if there is one, and clears it.
#
# The unlocked check up front keeps the no-handler case from touching the
# lock. The handler is then checked again while holding @lock, because
# another caller may have cleared it between the first check and the lock
# being acquired; without the second check that would call #stop on nil.
#
# Returns nil.
def stop
  return unless @response_handler
  @lock.synchronize do
    next unless @response_handler
    @response_handler.stop
    @response_handler = nil
  end
end

#suspend ⇒ Object

Places the client into suspended state whereby client will no longer attempt to reconnect to the gRPC server nor will it attempt to send span events henceforth. The Suspended Streaming Buffer will be installed in this state.



106
107
108
109
110
111
112
113
114
# File 'lib/infinite_tracing/client.rb', line 106

# Moves the client into the suspended state, unless it is already suspended.
#
# While holding @lock it sets the suspended flag, replaces @buffer with a
# buffer from new_streaming_buffer (built after the flag is set), and logs a
# warning. Returns nil when the client was already suspended.
def suspend
  unless suspended?
    @lock.synchronize do
      @suspended = true
      @buffer = new_streaming_buffer
      notice = "The Trace Observer host signaled to suspend streaming span events. " \
        "No more span events will be sent during this session."
      NewRelic::Agent.logger.warn(notice)
    end
  end
end

#suspended? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/infinite_tracing/client.rb', line 88

# Reports whether the client is suspended by returning @suspended as-is.
def suspended?
  @suspended
end

#transfer(previous_client) ⇒ Object

Transfers spans in the streaming buffer from the previous client (if any) and returns self so that the call can be chained.



31
32
33
34
35
# File 'lib/infinite_tracing/client.rb', line 31

# Hands the previous client's buffer this client's buffer via
# Buffer#transfer when a previous client is given. Always returns self so
# the call can be chained.
def transfer(previous_client)
  previous_client.buffer.transfer(buffer) if previous_client
  self
end

#transfer_buffer ⇒ Object



122
123
124
125
126
127
128
# File 'lib/infinite_tracing/client.rb', line 122

# Replaces @buffer with a new buffer from new_streaming_buffer while holding
# @lock, and asks the previous buffer to transfer itself into the new one.
#
# @buffer is only created on demand by #buffer, so it can still be nil here.
# In that case there is nothing to carry over: the new buffer is installed
# and the transfer step is skipped instead of calling #transfer on nil.
#
# Returns the result of the previous buffer's #transfer, or nil when there
# was no previous buffer.
def transfer_buffer
  @lock.synchronize do
    old_buffer = @buffer
    @buffer = new_streaming_buffer
    old_buffer.transfer(@buffer) if old_buffer
  end
end