Class: Vivarium::OtelSpanStreamer
- Inherits:
-
Object
- Object
- Vivarium::OtelSpanStreamer
- Defined in:
- lib/vivarium/otel_stream.rb
Overview
Reconstructs method-call spans live from the event stream and enqueues each completed span to an OtelHttpExporter. Each observation session becomes one trace with a “vivarium session” root span, per-thread child spans, and method spans nested under the current thread/method span.
Constant Summary collapse
- SESSION_ROOT_SALT =
0x766976617269756d- THREAD_ROOT_SALT =
0x7468726561640000- PROCESS_EXIT_EVENT_NAME =
"proc_exit"
Instance Method Summary collapse
-
#finalize(stop_ktime:) ⇒ Object
Close any still-open spans (dangling) at end of observation.
-
#initialize(exporter:, session_start_iso:, session_start_ktime:, observer_pid:, main_tid:) ⇒ OtelSpanStreamer
constructor
A new instance of OtelSpanStreamer.
- #on_event(ev) ⇒ Object
Constructor Details
#initialize(exporter:, session_start_iso:, session_start_ktime:, observer_pid:, main_tid:) ⇒ OtelSpanStreamer
Returns a new instance of OtelSpanStreamer.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/vivarium/otel_stream.rb', line 109 def initialize(exporter:, session_start_iso:, session_start_ktime:, observer_pid:, main_tid:) @exporter = exporter start_unix = Vivarium::OtelExporter.iso_to_unix_ns(session_start_iso) start_ktime = session_start_ktime.to_i @to_unix = ->(k) { (start_unix + (k.to_i - start_ktime)).to_s } @session_start_ktime = start_ktime @observer_pid = observer_pid @main_tid = main_tid @trace_hi, @trace_lo = Vivarium.synth_trace_id(observer_pid, main_tid, SESSION_ROOT_SALT, start_ktime) @session_span_id = Vivarium.synth_span_id(@trace_hi, @trace_lo, observer_pid, start_ktime ^ SESSION_ROOT_SALT) @stacks = Hash.new { |h, k| h[k] = [] } @thread_spans = {} @bpf_thread_span_ids = {} end |
Instance Method Details
#finalize(stop_ktime:) ⇒ Object
Close any still-open spans (dangling) at end of observation.
135 136 137 138 139 140 141 |
# File 'lib/vivarium/otel_stream.rb', line 135 def finalize(stop_ktime:) @stacks.each_value do |stack| emit_method_span(stack.pop, stop_ktime) until stack.empty? end @thread_spans.each_value { |rec| emit_thread_span(rec, stop_ktime) unless rec[:emitted] } emit_session_span(stop_ktime) end |
#on_event(ev) ⇒ Object
124 125 126 127 128 129 130 131 132 |
# File 'lib/vivarium/otel_stream.rb', line 124 def on_event(ev) return if Vivarium::OtelExporter.internal_comm?(ev.comm) case ev.event_name when "span_start" then handle_start(ev) when "span_stop" then handle_stop(ev) else handle_event(ev) end end |