Class: NewRelic::Agent::InfiniteTracing::Client

Inherits:
Object
  • Object
show all
Includes:
Constants
Defined in:
lib/infinite_tracing/client.rb

Constant Summary

Constants included from Constants

NewRelic::Agent::InfiniteTracing::Constants::GRPC_ERROR_NAME_METRIC, NewRelic::Agent::InfiniteTracing::Constants::GRPC_OTHER_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::QUEUE_DUMPED_METRIC, NewRelic::Agent::InfiniteTracing::Constants::RESPONSE_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SEEN_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SENT_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SUPPORTABILITY_PREFIX

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Client

Returns a new instance of Client.



18
19
20
21
22
# File 'lib/infinite_tracing/client.rb', line 18

# Sets up the client's starting state: not suspended, no response handler
# yet, and a new Mutex kept in @lock.
def initialize
  @lock = Mutex.new
  @response_handler = nil
  @suspended = false
end

Instance Method Details

#<<(segment) ⇒ Object



24
25
26
# File 'lib/infinite_tracing/client.rb', line 24

# Appends +segment+ to the current streaming buffer and returns whatever the
# buffer's own << returns.
def <<(segment)
  target = buffer
  target << segment
end

#batching_enabled? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/infinite_tracing/client.rb', line 28

# Looks up the :'infinite_tracing.batching' setting in the agent config and
# returns the stored value as-is.
def batching_enabled?
  agent_config = NewRelic::Agent.config
  agent_config[:'infinite_tracing.batching']
end

#buffer ⇒ Object



47
48
49
# File 'lib/infinite_tracing/client.rb', line 47

# Returns the memoized streaming buffer, building one with
# new_streaming_buffer the first time it is needed.
def buffer
  return @buffer if @buffer

  @buffer = new_streaming_buffer
end

#flush ⇒ Object



51
52
53
# File 'lib/infinite_tracing/client.rb', line 51

# Delegates to the current buffer's flush_queue and returns its result.
def flush
  current = buffer
  current.flush_queue
end

#formatted_class_name(class_name) ⇒ Object

Turns camelcase base class name into upper snake case version of the name.



56
57
58
59
# File 'lib/infinite_tracing/client.rb', line 56

# Converts a (possibly namespaced) camel-case class name into upper snake
# case, e.g. "GRPC::DeadlineExceeded" becomes "DEADLINE_EXCEEDED".
# Only the part after the last ":" is used.
def formatted_class_name(class_name)
  base_name = class_name.split(":").last
  # Insert "_" between any character and the capital letter that follows it.
  base_name.gsub(/(.)([A-Z])/, '\1_\2').upcase
end

#grpc_error_metric_name(error) ⇒ Object

Literal codes are all mapped to unique class names, so we can deduce the name of the error to report in the metric from the error's class name.



63
64
65
# File 'lib/infinite_tracing/client.rb', line 63

# Builds the metric name for a gRPC error by substituting the formatted
# class name of +error+ into the GRPC_ERROR_NAME_METRIC format string.
def grpc_error_metric_name(error)
  error_name = formatted_class_name(error.class.name)
  GRPC_ERROR_NAME_METRIC % error_name
end

#handle_close ⇒ Object

This method is called when the server closes the record status stream without raising an error. The Channel/Connection is not closed or reset in this case. We simply start streaming again, which will reuse the channel/connection to the server and re-establish the gRPC bi-directional stream. Useful for the server to initiate a load-balancing scheme.



100
101
102
103
104
# File 'lib/infinite_tracing/client.rb', line 100

# Logs at debug level that the stream was closed with an OK response, then
# starts streaming again and returns start_streaming's result.
def handle_close
  message = "The gRPC Trace Observer closed the stream with OK response. " \
    "Restarting the stream."
  NewRelic::Agent.logger.debug(message)
  start_streaming
end

#handle_error(error) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/infinite_tracing/client.rb', line 78

# Records metrics and logs for +error+, then picks a recovery action from the
# error's class: restart, suspend, or resume streaming without backoff.
def handle_error(error)
  record_error_metrics_and_log(error)

  case error
  when GRPC::Unavailable, GRPC::FailedPrecondition
    restart
  when GRPC::Unimplemented
    suspend
  else
    # Set exponential backoff to false so we'll reconnect at periodic (15 second) intervals instead
    start_streaming(false)
  end
end

#new_streaming_buffer ⇒ Object

Provides the correct streaming buffer instance based on whether the client is currently suspended.



42
43
44
45
# File 'lib/infinite_tracing/client.rb', line 42

# Creates a new buffer sized by Config.span_events_queue_size: a
# SuspendedStreamingBuffer while suspended, otherwise a StreamingBuffer.
def new_streaming_buffer
  if suspended?
    SuspendedStreamingBuffer.new(Config.span_events_queue_size)
  else
    StreamingBuffer.new(Config.span_events_queue_size)
  end
end

#record_error_metrics_and_log(error) ⇒ Object

Reports AND logs general response metric along with a more specific error metric



68
69
70
71
72
73
74
75
76
# File 'lib/infinite_tracing/client.rb', line 68

# Records the general response-error metric, then a second metric that is
# specific to the error's class for GRPC::BadStatus errors and the generic
# "other" metric for anything else. Finally logs a warning with the error.
def record_error_metrics_and_log(error)
  NewRelic::Agent.record_metric(RESPONSE_ERROR_METRIC, 0.0)

  specific_metric = error.is_a?(GRPC::BadStatus) ? grpc_error_metric_name(error) : GRPC_OTHER_ERROR_METRIC
  NewRelic::Agent.record_metric(specific_metric, 0.0)

  NewRelic::Agent.logger.warn("gRPC response error received.", error)
end

#record_span_batches(exponential_backoff) ⇒ Object



158
159
160
# File 'lib/infinite_tracing/client.rb', line 158

# Passes the buffer's batch enumerator to Connection.record_span_batches and
# wraps the result in a RecordStatusHandler bound to this client.
def record_span_batches(exponential_backoff)
  batches = buffer.batch_enumerator
  stream = Connection.record_span_batches(self, batches, exponential_backoff)
  RecordStatusHandler.new(self, stream)
end

#record_spans(exponential_backoff) ⇒ Object



154
155
156
# File 'lib/infinite_tracing/client.rb', line 154

# Passes the buffer's enumerator to Connection.record_spans and wraps the
# result in a RecordStatusHandler bound to this client.
def record_spans(exponential_backoff)
  spans = buffer.enumerator
  stream = Connection.record_spans(self, spans, exponential_backoff)
  RecordStatusHandler.new(self, stream)
end

#reset_connection ⇒ Object



119
120
121
122
123
# File 'lib/infinite_tracing/client.rb', line 119

# Calls Connection.reset while holding @lock and returns its result.
def reset_connection
  @lock.synchronize { Connection.reset }
end

#response_handler(backoff) ⇒ Object



162
163
164
# File 'lib/infinite_tracing/client.rb', line 162

# Creates the response handler for the configured mode (batched or
# span-by-span), stores it in @response_handler and returns it.
def response_handler(backoff)
  @response_handler =
    if batching_enabled?
      record_span_batches(backoff)
    else
      record_spans(backoff)
    end
end

#restart ⇒ Object



133
134
135
136
137
# File 'lib/infinite_tracing/client.rb', line 133

# Runs the three restart steps in order: reset the connection, replace the
# buffer via transfer_buffer, then start streaming again. Returns whatever
# start_streaming returns.
def restart
  reset_connection
  transfer_buffer
  start_streaming
end

#start_streaming(exponential_backoff = true) ⇒ Object



147
148
149
150
151
152
# File 'lib/infinite_tracing/client.rb', line 147

# Starts (or restarts) streaming unless the client is suspended. Waits for
# the agent to connect, then builds the response handler while holding @lock.
#
# exponential_backoff is forwarded to response_handler unchanged.
# Returns nil when suspended, otherwise the new response handler.
def start_streaming(exponential_backoff = true)
  return if suspended?

  connection = Connection.instance
  connection.wait_for_agent_connect
  @lock.synchronize do
    response_handler(exponential_backoff)
  end
end

#stop ⇒ Object



139
140
141
142
143
144
145
# File 'lib/infinite_tracing/client.rb', line 139

# Stops the current response handler, if any, and clears it. Always returns
# nil.
#
# The unlocked check is only a fast path. The handler is re-checked after
# @lock is acquired because another thread may have stopped and cleared it in
# between, which would otherwise raise NoMethodError on nil.
def stop
  return unless @response_handler

  @lock.synchronize do
    next unless @response_handler

    @response_handler.stop
    @response_handler = nil
  end
end

#suspend ⇒ Object

Places the client into suspended state whereby client will no longer attempt to reconnect to the gRPC server nor will it attempt to send span events henceforth. The Suspended Streaming Buffer will be installed in this state.



109
110
111
112
113
114
115
116
117
# File 'lib/infinite_tracing/client.rb', line 109

# Moves the client into the suspended state; does nothing if it is already
# suspended. Under @lock it sets the flag, replaces @buffer with a new one
# from new_streaming_buffer, and logs a warning.
def suspend
  return if suspended?

  notice = "The Trace Observer host signaled to suspend streaming span events. " \
    "No more span events will be sent during this session."
  @lock.synchronize do
    # The flag must be set first: new_streaming_buffer reads it to pick the class.
    @suspended = true
    @buffer = new_streaming_buffer
    NewRelic::Agent.logger.warn(notice)
  end
end

#suspended? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/infinite_tracing/client.rb', line 91

# Returns the current value of @suspended (false after initialize, set to
# true by suspend).
def suspended?
  @suspended
end

#transfer(previous_client) ⇒ Object

Transfers spans in the streaming buffer from the previous client (if any) and returns self (so the call can be chained).



34
35
36
37
38
# File 'lib/infinite_tracing/client.rb', line 34

# When +previous_client+ is given, asks its buffer to transfer into this
# client's buffer. Always returns self so the call can be chained.
def transfer(previous_client)
  previous_client.buffer.transfer(buffer) if previous_client
  self
end

#transfer_buffer ⇒ Object



125
126
127
128
129
130
131
# File 'lib/infinite_tracing/client.rb', line 125

# Replaces @buffer with a new buffer from new_streaming_buffer while holding
# @lock, and asks the old buffer to transfer into the new one.
#
# @buffer is only created lazily by #buffer, so it can still be nil here. In
# that case there is nothing to transfer and the new buffer is simply
# installed.
def transfer_buffer
  @lock.synchronize do
    old_buffer = @buffer
    @buffer = new_streaming_buffer
    old_buffer.transfer(@buffer) if old_buffer
  end
end