Class: NewRelic::Agent::InfiniteTracing::Client
- Inherits:
-
Object
- Object
- NewRelic::Agent::InfiniteTracing::Client
show all
- Includes:
- Constants
- Defined in:
- lib/infinite_tracing/client.rb
Constant Summary
Constants included
from Constants
NewRelic::Agent::InfiniteTracing::Constants::GRPC_ERROR_NAME_METRIC, NewRelic::Agent::InfiniteTracing::Constants::GRPC_OTHER_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::QUEUE_DUMPED_METRIC, NewRelic::Agent::InfiniteTracing::Constants::RESPONSE_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SEEN_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SENT_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SUPPORTABILITY_PREFIX
Instance Method Summary
collapse
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
18
19
20
21
22
|
# File 'lib/infinite_tracing/client.rb', line 18
# Sets up a client in its initial state: not suspended, with no response
# handler assigned yet, and with a mutex for guarding later state changes.
def initialize
  @lock = Mutex.new
  @response_handler = nil
  @suspended = false
end
|
Instance Method Details
#<<(segment) ⇒ Object
24
25
26
|
# File 'lib/infinite_tracing/client.rb', line 24
# Appends a segment to the client's current streaming buffer.
#
# @param segment the item handed to the buffer's `<<`
# @return whatever the buffer's `<<` returns
def <<(segment)
  streaming_buffer = buffer
  streaming_buffer << segment
end
|
#batching_enabled? ⇒ Boolean
28
29
30
|
# File 'lib/infinite_tracing/client.rb', line 28
# Looks up the `infinite_tracing.batching` setting in the agent configuration.
#
# @return the value stored under :'infinite_tracing.batching'
def batching_enabled?
  agent_config = NewRelic::Agent.config
  agent_config[:'infinite_tracing.batching']
end
|
#buffer ⇒ Object
47
48
49
|
# File 'lib/infinite_tracing/client.rb', line 47
# Returns the client's streaming buffer, building it with
# #new_streaming_buffer the first time it is requested.
#
# @return the memoized streaming buffer
def buffer
  return @buffer if @buffer

  @buffer = new_streaming_buffer
end
|
#flush ⇒ Object
51
52
53
|
# File 'lib/infinite_tracing/client.rb', line 51
# Flushes the client's streaming buffer by calling `flush_queue` on it.
#
# @return whatever the buffer's `flush_queue` returns
def flush
buffer.flush_queue
end
|
Turns the camel-case base class name into its upper snake case equivalent (for example, FailedPrecondition becomes FAILED_PRECONDITION).
56
57
58
59
|
# File 'lib/infinite_tracing/client.rb', line 56
# Converts a (possibly namespaced) camel-case class name into the upper
# snake case form of its last segment, e.g. "GRPC::FailedPrecondition"
# becomes "FAILED_PRECONDITION".
#
# @param class_name [String] class name, optionally qualified with "::"
# @return [String] upper snake case version of the unqualified name
def formatted_class_name(class_name)
  base_name = class_name.split(":").last
  # An underscore goes between any character and the capital that follows it.
  base_name.gsub(/(.)([A-Z])/, '\1_\2').upcase
end
|
#grpc_error_metric_name(error) ⇒ Object
Literal codes are all mapped to unique class names, so we can deduce the name of the error to report in the metric from the error's class name.
63
64
65
|
# File 'lib/infinite_tracing/client.rb', line 63
# Builds the metric name for a specific gRPC error by substituting the
# formatted class name of the error into GRPC_ERROR_NAME_METRIC.
#
# @param error the error whose class name identifies the metric
# @return [String] the metric name for this error's class
def grpc_error_metric_name(error)
  error_name = formatted_class_name(error.class.name)
  format(GRPC_ERROR_NAME_METRIC, error_name)
end
|
#handle_close ⇒ Object
This method is called when the server closes the record status stream without raising an error. The Channel/Connection is not closed or reset in this case. We simply start streaming again, which will reuse the channel/connection to the server and re-establish the gRPC bi-directional stream. Useful for the server to initiate a load-balancing scheme.
100
101
102
103
104
|
# File 'lib/infinite_tracing/client.rb', line 100
# Handles the server closing the stream without an error: logs a debug
# message and starts streaming again.
#
# @return the result of #start_streaming
def handle_close
  message = "The gRPC Trace Observer closed the stream with OK response. " \
    "Restarting the stream."
  NewRelic::Agent.logger.debug(message)
  start_streaming
end
|
#handle_error(error) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/infinite_tracing/client.rb', line 78
# Records metrics and a log entry for a streaming error, then picks the
# recovery action from the error's class:
#   * GRPC::Unavailable / GRPC::FailedPrecondition - restart
#   * GRPC::Unimplemented                          - suspend
#   * anything else                                - start_streaming(false)
#
# @param error the error raised while streaming
# @return the result of the recovery action that was chosen
def handle_error(error)
  record_error_metrics_and_log(error)

  case error
  when GRPC::Unavailable, GRPC::FailedPrecondition
    restart
  when GRPC::Unimplemented
    suspend
  else
    start_streaming(false)
  end
end
|
#new_streaming_buffer ⇒ Object
Provides the correct streaming buffer instance, depending on whether the client is currently suspended.
#record_error_metrics_and_log(error) ⇒ Object
Reports AND logs the general response metric along with a more specific error metric.
#record_span_batches(exponential_backoff) ⇒ Object
#record_spans(exponential_backoff) ⇒ Object
#reset_connection ⇒ Object
119
120
121
122
123
|
# File 'lib/infinite_tracing/client.rb', line 119
# Resets the shared Connection while holding the client's lock.
#
# @return the result of Connection.reset
def reset_connection
  @lock.synchronize { Connection.reset }
end
|
#response_handler(backoff) ⇒ Object
162
163
164
|
# File 'lib/infinite_tracing/client.rb', line 162
# Creates and stores the response handler for a new stream, choosing
# between batched and single-span recording based on #batching_enabled?.
#
# @param backoff passed through to #record_span_batches or #record_spans
# @return the handler that was stored in @response_handler
def response_handler(backoff)
  @response_handler =
    if batching_enabled?
      record_span_batches(backoff)
    else
      record_spans(backoff)
    end
end
|
#restart ⇒ Object
133
134
135
136
137
|
# File 'lib/infinite_tracing/client.rb', line 133
# Restarts streaming; #handle_error calls this for GRPC::Unavailable and
# GRPC::FailedPrecondition errors. The steps run in this order: the
# connection is reset, the buffer is replaced via #transfer_buffer, and
# streaming is started again.
#
# @return the result of #start_streaming
def restart
reset_connection
transfer_buffer
start_streaming
end
|
#start_streaming(exponential_backoff = true) ⇒ Object
147
148
149
150
151
152
|
# File 'lib/infinite_tracing/client.rb', line 147
# Starts (or restarts) streaming unless the client is suspended. It waits
# for the agent to connect and then, under the client's lock, installs a
# new response handler.
#
# @param exponential_backoff passed through to #response_handler
# @return the new response handler, or nil when the client is suspended
def start_streaming(exponential_backoff = true)
  return if suspended?

  connection = Connection.instance
  connection.wait_for_agent_connect
  @lock.synchronize do
    response_handler(exponential_backoff)
  end
end
|
#stop ⇒ Object
139
140
141
142
143
144
145
|
# File 'lib/infinite_tracing/client.rb', line 139
# Stops the current response handler, if any, and clears it.
#
# The unlocked check is a fast path that avoids taking the lock when there
# is nothing to stop. The handler is read again under the lock because
# another thread may have stopped and cleared it between the first check
# and the lock being acquired.
#
# @return [nil]
def stop
  return unless @response_handler

  @lock.synchronize do
    handler = @response_handler
    next unless handler # already stopped by another thread

    handler.stop
    @response_handler = nil
  end
end
|
#suspend ⇒ Object
Places the client into a suspended state, in which it will no longer attempt to reconnect to the gRPC server or send span events. The Suspended Streaming Buffer is installed in this state.
109
110
111
112
113
114
115
116
117
|
# File 'lib/infinite_tracing/client.rb', line 109
# Puts the client into the suspended state: sets the suspended flag,
# replaces the buffer with a new one from #new_streaming_buffer, and logs a
# warning.
#
# The unlocked check is a fast path. The state is checked again under the
# lock so that two threads calling this at once cannot both swap the
# buffer and log the warning.
#
# @return nil when the client was already suspended, otherwise the result
#   of the logger call
def suspend
  return if suspended?

  @lock.synchronize do
    next if suspended? # another thread suspended the client first

    # The flag is set before the new buffer is built.
    @suspended = true
    @buffer = new_streaming_buffer
    NewRelic::Agent.logger.warn("The Trace Observer host signaled to suspend streaming span events. " \
      "No more span events will be sent during this session.")
  end
end
|
#suspended? ⇒ Boolean
91
92
93
|
# File 'lib/infinite_tracing/client.rb', line 91
# Reports whether the client is in the suspended state.
#
# @return the current value of @suspended: false after #initialize, true
#   once #suspend has run
def suspended?
@suspended
end
|
#transfer(previous_client) ⇒ Object
Transfers spans in the streaming buffer from the previous client (if any) and returns self (so that calls can be chained).
34
35
36
37
38
|
# File 'lib/infinite_tracing/client.rb', line 34
# Moves the contents of a previous client's buffer into this client's
# buffer when a previous client is given.
#
# @param previous_client the client to take over from, or nil
# @return self, so the call can be chained
def transfer(previous_client)
  previous_client.buffer.transfer(buffer) if previous_client
  self
end
|
#transfer_buffer ⇒ Object
125
126
127
128
129
130
131
|
# File 'lib/infinite_tracing/client.rb', line 125
# Replaces the current buffer with a new one from #new_streaming_buffer
# and, under the client's lock, transfers the old buffer's contents into it.
#
# @return the result of the old buffer's `transfer`, or nil when there was
#   no previous buffer
def transfer_buffer
  @lock.synchronize do
    old_buffer = @buffer
    @buffer = new_streaming_buffer
    # @buffer is created lazily by #buffer, so there may be nothing to
    # carry over yet.
    old_buffer.transfer(@buffer) if old_buffer
  end
end
|