Class: Quonfig::Telemetry::TelemetryReporter

Inherits:
Object
  • Object
show all
Defined in:
lib/quonfig/telemetry/telemetry_reporter.rb

Overview

Owns the background thread that periodically drains the context aggregators and POSTs a JSON telemetry batch to <telemetry_destination>/api/v1/telemetry/.

Wire shape matches api-telemetry’s TelemetryEventsSchema:

{
  "instanceHash": "...",
  "events": [
    { "summaries":       { "start": ..., "end": ..., "summaries": [...] } },
    { "contextShapes":   { "shapes":   [...] } },
    { "exampleContexts": { "examples": [...] } }
  ]
}

Auth is HTTP Basic with username “1” and the SDK key as password (matching sdk-node and sdk-go). The X-Quonfig-SDK-Version header carries the ruby-<VERSION> identifier.

Constant Summary collapse

LOG =
Quonfig::InternalLogger.new(self)
DEFAULT_INITIAL_DELAY_SECONDS =
8
DEFAULT_MAX_DELAY_SECONDS =
600

Instance Method Summary collapse

Constructor Details

#initialize(options:, instance_hash:, context_shape_aggregator: nil, example_contexts_aggregator: nil, evaluation_summaries_aggregator: nil, sync_interval: nil, http_connection: nil) ⇒ TelemetryReporter

Returns a new instance of TelemetryReporter.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 29

# Builds a reporter in the not-yet-started state: no thread is created here
# (@thread is nil) and the at-exit flag starts out false.
#
# @param options [Object] must respond to +sdk_key+ and +telemetry_destination+
# @param instance_hash [Object] stored as-is; sent as "instanceHash" by #sync
# @param context_shape_aggregator [Object, nil] optional aggregator
# @param example_contexts_aggregator [Object, nil] optional aggregator
# @param evaluation_summaries_aggregator [Object, nil] optional aggregator
# @param sync_interval [Object, nil] handed to calculate_sync_interval
# @param http_connection [Object, nil] stored as-is
def initialize(options:, instance_hash:,
               context_shape_aggregator: nil,
               example_contexts_aggregator: nil,
               evaluation_summaries_aggregator: nil,
               sync_interval: nil,
               http_connection: nil)
  # Credentials and destination come straight off the options object.
  @options = options
  @sdk_key = options.sdk_key
  @telemetry_destination = options.telemetry_destination
  @instance_hash = instance_hash

  # Optional collaborators; any of them may be nil.
  @context_shape_aggregator = context_shape_aggregator
  @example_contexts_aggregator = example_contexts_aggregator
  @evaluation_summaries_aggregator = evaluation_summaries_aggregator
  @http_connection = http_connection

  # Resolved last among the inputs, after every collaborator is in place.
  @sync_interval = calculate_sync_interval(sync_interval)

  # Lifecycle state for the background thread.
  @stopped = Concurrent::AtomicBoolean.new(false)
  @thread = nil
  @at_exit_registered = false
end

Instance Method Details

#at_exit_registered? ⇒ Boolean

Visible for tests.

Returns:

  • (Boolean)


140
141
142
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 140

# Exposes the @at_exit_registered flag, which the constructor initialises to
# false. Intended for test visibility.
#
# @return [Boolean] the current value of the flag
def at_exit_registered?
  @at_exit_registered
end

#enabled? ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
52
53
54
55
56
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 49

# Whether this reporter has what it needs to do anything: a non-blank SDK
# key, a non-blank telemetry destination, and at least one aggregator.
#
# @return [Boolean]
def enabled?
  # nil.to_s is "", so a single emptiness check covers both nil and "".
  required = [@sdk_key, @telemetry_destination]
  return false if required.any? { |value| value.to_s.empty? }

  aggregators = [
    @context_shape_aggregator,
    @example_contexts_aggregator,
    @evaluation_summaries_aggregator
  ]
  !aggregators.all?(&:nil?)
end

#record(context) ⇒ Object

Record a context across the context-driven aggregators. Evaluation summaries are recorded separately via record_evaluation(…) since they require the evaluation result.



61
62
63
64
65
66
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 61

# Feeds one context to the context-driven aggregators: +push+ on the shape
# aggregator, then +record+ on the example-contexts aggregator. Aggregators
# that are nil are skipped, and a nil context is ignored entirely.
#
# @param context [Object, nil] the context to record
# @return [Object, nil] whatever the example-contexts aggregator's +record+
#   returns, or nil when nothing was recorded there
def record(context)
  unless context.nil?
    @context_shape_aggregator&.push(context)
    @example_contexts_aggregator&.record(context)
  end
end

#record_evaluation(**kwargs) ⇒ Object



68
69
70
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 68

# Forwards the keyword arguments unchanged to the evaluation-summaries
# aggregator's +record+. Does nothing when that aggregator is nil.
#
# @param kwargs [Hash] keyword arguments passed straight through
# @return [Object, nil] the aggregator's return value, or nil without one
def record_evaluation(**kwargs)
  aggregator = @evaluation_summaries_aggregator
  return if aggregator.nil?

  aggregator.record(**kwargs)
end

#start ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 72

# Launches the background reporting thread. A no-op when a live thread
# already exists or when the reporter is not enabled.
#
# The thread repeats until @stopped becomes true: ask @sync_interval for the
# next delay, wait that long, then call #sync. Any StandardError raised in
# an iteration is logged at warn level and the loop carries on.
#
# @return [Thread, nil] the newly created thread, or nil when nothing started
def start
  return if @thread&.alive? || !enabled?

  @stopped.make_false
  register_at_exit_handler
  @thread = Thread.new do
    Thread.current.name = 'quonfig-telemetry-reporter'
    LOG.debug "Telemetry reporter started instance_hash=#{@instance_hash} destination=#{@telemetry_destination}"

    until @stopped.true?
      begin
        interval = @sync_interval.call
        elapsed = 0.0
        tick = 0.5
        # Wait in short slices so a stop request is noticed within one tick
        # instead of after the whole interval.
        while elapsed < interval && !@stopped.true?
          remaining = interval - elapsed
          sleep(remaining < tick ? remaining : tick)
          elapsed += tick
        end
        break if @stopped.true?

        sync
      rescue StandardError => e
        LOG.warn "[quonfig] Telemetry reporter error: #{e.class}: #{e.message}"
      end
    end
  end
end

#stop ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 101

# Signals the background loop to stop and makes one last drain attempt.
#
# Sets @stopped, forgets the thread reference, wakes the thread if it is
# still alive so it notices the flag promptly, then calls #sync on the
# calling thread. The background thread is not joined here.
#
# @return [Object, nil] the result of the final #sync, or the logger's
#   return value when that sync raised a StandardError
def stop
  @stopped.make_true
  thread = @thread
  @thread = nil
  begin
    thread.wakeup if thread&.alive?
  rescue ThreadError
    # The thread can exit between the alive? check and wakeup (it has just
    # been told to stop); Thread#wakeup raises on a dead thread. Nothing is
    # left to wake, so carry on to the final drain instead of aborting.
  end
  # Final drain attempt on stop so tests / short-lived processes
  # don't silently drop pending telemetry.
  begin
    sync
  rescue StandardError => e
    LOG.debug "[quonfig] Final telemetry sync failed: #{e.class}: #{e.message}"
  end
end

#sync ⇒ Object

Drain all aggregators and POST the batch. Public so tests can trigger a sync without waiting for the background loop.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/quonfig/telemetry/telemetry_reporter.rb', line 117

# Drains every configured aggregator and posts the collected events as one
# batch. Aggregators are drained in a fixed order: evaluation summaries,
# context shapes, example contexts. Nil aggregators and falsy drain results
# are skipped. Nothing is posted when no events were produced.
#
# @return [Object, nil] the result of +post+, or nil when there was nothing
#   to send
def sync
  sources = [
    @evaluation_summaries_aggregator,
    @context_shape_aggregator,
    @example_contexts_aggregator
  ]
  events = sources.map { |aggregator| aggregator&.drain_event }.select { |event| event }
  return if events.empty?

  batch = {
    'instanceHash' => @instance_hash,
    'events' => events
  }
  post(batch)
end