Class: Vivarium::OtelSpanStreamer

Inherits:
Object
  • Object
show all
Defined in:
lib/vivarium/otel_stream.rb

Overview

Reconstructs method-call spans live from the event stream and enqueues each completed span to an OtelHttpExporter. Each observation session becomes one trace with a “vivarium session” root span, per-thread child spans, and method spans nested under the current thread/method span.

Constant Summary collapse

SESSION_ROOT_SALT =
0x766976617269756d
THREAD_ROOT_SALT =
0x7468726561640000
PROCESS_EXIT_EVENT_NAME =
"proc_exit"

Instance Method Summary collapse

Constructor Details

#initialize(exporter:, session_start_iso:, session_start_ktime:, observer_pid:, main_tid:) ⇒ OtelSpanStreamer

Returns a new instance of OtelSpanStreamer.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/vivarium/otel_stream.rb', line 109

def initialize(exporter:, session_start_iso:, session_start_ktime:, observer_pid:, main_tid:)
  @exporter = exporter
  start_unix = Vivarium::OtelExporter.iso_to_unix_ns(session_start_iso)
  start_ktime = session_start_ktime.to_i
  @to_unix = ->(k) { (start_unix + (k.to_i - start_ktime)).to_s }
  @session_start_ktime = start_ktime
  @observer_pid = observer_pid
  @main_tid = main_tid
  @trace_hi, @trace_lo = Vivarium.synth_trace_id(observer_pid, main_tid, SESSION_ROOT_SALT, start_ktime)
  @session_span_id = Vivarium.synth_span_id(@trace_hi, @trace_lo, observer_pid, start_ktime ^ SESSION_ROOT_SALT)
  @stacks = Hash.new { |h, k| h[k] = [] }
  @thread_spans = {}
  @bpf_thread_span_ids = {}
end

Instance Method Details

#finalize(stop_ktime:) ⇒ Object

Close any still-open spans (dangling) at end of observation.



135
136
137
138
139
140
141
# File 'lib/vivarium/otel_stream.rb', line 135

def finalize(stop_ktime:)
  @stacks.each_value do |stack|
    emit_method_span(stack.pop, stop_ktime) until stack.empty?
  end
  @thread_spans.each_value { |rec| emit_thread_span(rec, stop_ktime) unless rec[:emitted] }
  emit_session_span(stop_ktime)
end

#on_event(ev) ⇒ Object



124
125
126
127
128
129
130
131
132
# File 'lib/vivarium/otel_stream.rb', line 124

def on_event(ev)
  return if Vivarium::OtelExporter.internal_comm?(ev.comm)

  case ev.event_name
  when "span_start" then handle_start(ev)
  when "span_stop" then handle_stop(ev)
  else handle_event(ev)
  end
end