Class: Quonfig::Telemetry::TelemetryReporter
- Inherits: Object
  - Object
  - Quonfig::Telemetry::TelemetryReporter
- Defined in: lib/quonfig/telemetry/telemetry_reporter.rb
Overview
Owns the background thread that periodically drains the context aggregators and POSTs a JSON telemetry batch to <telemetry_destination>/api/v1/telemetry/.
Wire shape matches api-telemetry’s TelemetryEventsSchema:
{
  "instanceHash": "...",
  "events": [
    { "summaries": { "start": ..., "end": ..., "summaries": [...] } },
    { "contextShapes": { "shapes": [...] } },
    { "exampleContexts": { "examples": [...] } }
  ]
}
Auth is HTTP Basic with username “1” and the SDK key as password (matching sdk-node and sdk-go). The X-Quonfig-SDK-Version header carries the ruby-<VERSION> identifier.
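The auth and header conventions above can be sketched with Ruby's standard Net::HTTP. This is a minimal illustration, not the reporter's actual post implementation: the helper name build_telemetry_request, the EXAMPLE_SDK_VERSION constant, the example host, and the Content-Type header are all assumptions made for the example. The request is only built, never sent.

```ruby
require 'net/http'
require 'uri'
require 'json'

# Hypothetical version string; the real SDK supplies its own VERSION.
EXAMPLE_SDK_VERSION = '0.0.0'

# Hypothetical helper: builds (but does not send) a telemetry POST request.
def build_telemetry_request(destination:, sdk_key:, payload:)
  # Normalize the trailing slash so the path joins cleanly.
  uri = URI.join("#{destination.chomp('/')}/", 'api/v1/telemetry/')
  request = Net::HTTP::Post.new(uri)
  # HTTP Basic auth: username "1", SDK key as the password.
  request.basic_auth('1', sdk_key)
  # Assumed content type for a JSON body.
  request['Content-Type'] = 'application/json'
  request['X-Quonfig-SDK-Version'] = "ruby-#{EXAMPLE_SDK_VERSION}"
  request.body = JSON.generate(payload)
  request
end

request = build_telemetry_request(
  destination: 'https://telemetry.example.com',
  sdk_key: 'example-sdk-key',
  payload: { 'instanceHash' => 'abc123', 'events' => [] }
)
```

Sending the request would then be a matter of passing it to a Net::HTTP session opened against the same host with TLS enabled.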
Constant Summary
- LOG = Quonfig::InternalLogger.new(self)
- DEFAULT_INITIAL_DELAY_SECONDS = 8
- DEFAULT_MAX_DELAY_SECONDS = 600
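This page does not show how the two delay constants are used. One plausible reading is that they bound a growing sync interval, and the reporter does call @sync_interval as a callable. The sketch below assumes a simple doubling schedule; doubling_interval is a hypothetical helper, not the library's calculate_sync_interval.

```ruby
DEFAULT_INITIAL_DELAY_SECONDS = 8
DEFAULT_MAX_DELAY_SECONDS = 600

# Hypothetical helper (assumption: the delay doubles on each call).
# Returns a callable that yields the next delay, capped at `max`.
def doubling_interval(initial: DEFAULT_INITIAL_DELAY_SECONDS, max: DEFAULT_MAX_DELAY_SECONDS)
  delay = nil
  lambda do
    # First call returns the initial delay; later calls double it up to the cap.
    delay = delay.nil? ? initial : [delay * 2, max].min
  end
end

interval = doubling_interval
delays = Array.new(8) { interval.call }
# delays is [8, 16, 32, 64, 128, 256, 512, 600]
```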
Instance Method Summary
- #at_exit_registered? ⇒ Boolean
  Visible for tests.
- #enabled? ⇒ Boolean
- #initialize(options:, instance_hash:, context_shape_aggregator: nil, example_contexts_aggregator: nil, evaluation_summaries_aggregator: nil, sync_interval: nil, http_connection: nil) ⇒ TelemetryReporter (constructor)
  A new instance of TelemetryReporter.
- #record(context) ⇒ Object
  Record a context across the context-driven aggregators.
- #record_evaluation(**kwargs) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #sync ⇒ Object
  Drain all aggregators and POST the batch.
Constructor Details
#initialize(options:, instance_hash:, context_shape_aggregator: nil, example_contexts_aggregator: nil, evaluation_summaries_aggregator: nil, sync_interval: nil, http_connection: nil) ⇒ TelemetryReporter
Returns a new instance of TelemetryReporter.
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 29
def initialize(options:, instance_hash:, context_shape_aggregator: nil,
               example_contexts_aggregator: nil, evaluation_summaries_aggregator: nil,
               sync_interval: nil, http_connection: nil)
  @options = options
  @instance_hash = instance_hash
  @sdk_key = options.sdk_key
  @telemetry_destination = options.telemetry_destination
  @context_shape_aggregator = context_shape_aggregator
  @example_contexts_aggregator = example_contexts_aggregator
  @evaluation_summaries_aggregator = evaluation_summaries_aggregator
  @http_connection = http_connection
  @sync_interval = calculate_sync_interval(sync_interval)
  @stopped = Concurrent::AtomicBoolean.new(false)
  @thread = nil
  @at_exit_registered = false
end
Instance Method Details
#at_exit_registered? ⇒ Boolean
Visible for tests.
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 140
def at_exit_registered?
  @at_exit_registered
end
#enabled? ⇒ Boolean
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 49
def enabled?
  return false if @sdk_key.nil? || @sdk_key.to_s.empty?
  return false if @telemetry_destination.nil? || @telemetry_destination.to_s.empty?

  !@context_shape_aggregator.nil? ||
    !@example_contexts_aggregator.nil? ||
    !@evaluation_summaries_aggregator.nil?
end
#record(context) ⇒ Object
Record a context across the context-driven aggregators. Evaluation summaries are recorded separately via record_evaluation(…) since they require the evaluation result.
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 61
def record(context)
  return if context.nil?

  @context_shape_aggregator&.push(context)
  @example_contexts_aggregator&.record(context)
end
#record_evaluation(**kwargs) ⇒ Object
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 68
def record_evaluation(**kwargs)
  @evaluation_summaries_aggregator&.record(**kwargs)
end
#start ⇒ Object
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 72
def start
  return if @thread&.alive?
  return unless enabled?

  @stopped.make_false
  register_at_exit_handler

  @thread = Thread.new do
    Thread.current.name = 'quonfig-telemetry-reporter'
    LOG.debug "Telemetry reporter started instance_hash=#{@instance_hash} destination=#{@telemetry_destination}"

    until @stopped.true?
      begin
        sleep_duration = @sync_interval.call
        slept = 0.0
        step = 0.5
        while slept < sleep_duration && !@stopped.true?
          sleep([step, sleep_duration - slept].min)
          slept += step
        end
        break if @stopped.true?

        sync
      rescue StandardError => e
        LOG.warn "[quonfig] Telemetry reporter error: #{e.class}: #{e.message}"
      end
    end
  end
end
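The inner loop of #start sleeps in short steps so that a stop request is noticed within about half a second rather than after the full interval. A standalone sketch of that pattern follows. The interruptible_sleep helper and the plain boolean flag are illustrative assumptions; the reporter itself uses a Concurrent::AtomicBoolean.

```ruby
# Hypothetical helper: sleeps up to `duration` seconds in `step`-sized chunks,
# returning early once `stopped.call` is true. Returns the seconds requested so far.
def interruptible_sleep(duration, stopped:, step: 0.5)
  slept = 0.0
  while slept < duration && !stopped.call
    chunk = [step, duration - slept].min
    sleep(chunk)
    slept += chunk
  end
  slept
end

stop_requested = false
# Simulate another thread requesting a stop shortly after the sleep begins.
stopper = Thread.new do
  sleep 0.1
  stop_requested = true
end

elapsed = interruptible_sleep(5, step: 0.05, stopped: -> { stop_requested })
stopper.join
```

Here the call returns long before the five seconds elapse because the flag is rechecked after every chunk. A smaller step shortens shutdown latency at the cost of more wakeups.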
#stop ⇒ Object
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 101
def stop
  @stopped.make_true
  thread = @thread
  @thread = nil
  thread&.wakeup if thread&.alive?
  # Final drain attempt on stop so tests / short-lived processes
  # don't silently drop pending telemetry.
  begin
    sync
  rescue StandardError => e
    LOG.debug "[quonfig] Final telemetry sync failed: #{e.class}: #{e.message}"
  end
end
#sync ⇒ Object
Drain all aggregators and POST the batch. Public so tests can trigger a sync without waiting for the background loop.
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 117
def sync
  events = []

  if (summaries_event = @evaluation_summaries_aggregator&.drain_event)
    events << summaries_event
  end

  if (shape_event = @context_shape_aggregator&.drain_event)
    events << shape_event
  end

  if (example_event = @example_contexts_aggregator&.drain_event)
    events << example_event
  end

  return if events.empty?

  payload = {
    'instanceHash' => @instance_hash,
    'events' => events
  }

  post(payload)
end
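The drain-and-batch step in #sync can be exercised in isolation. The sketch below is an illustration under assumptions: FakeAggregator and build_payload are hypothetical stand-ins, and the only behavior taken from the source is that drain_event returns an event or nil, that missing aggregators are skipped, and that an empty batch is not sent.

```ruby
# Hypothetical stand-in for an aggregator: drain_event hands back the pending
# event once, then nil until something new is recorded.
class FakeAggregator
  def initialize(event)
    @event = event
  end

  def drain_event
    event = @event
    @event = nil
    event
  end
end

# Hypothetical helper mirroring the batch assembly: collects non-nil events
# and returns nil when there is nothing to send.
def build_payload(instance_hash, aggregators)
  events = aggregators.filter_map { |aggregator| aggregator&.drain_event }
  return nil if events.empty?

  { 'instanceHash' => instance_hash, 'events' => events }
end

aggregators = [
  FakeAggregator.new({ 'contextShapes' => { 'shapes' => [] } }),
  nil,                     # aggregator not configured
  FakeAggregator.new(nil)  # configured, but nothing pending
]

first = build_payload('abc123', aggregators)
second = build_payload('abc123', aggregators)
```

The first call yields a payload with a single contextShapes event. The second returns nil because the aggregators were already drained, which corresponds to #sync returning before any POST is made.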